Fiberの使い道を考えるために、selectと組み合わせて処理を単純に記述できる(のではないか、ということを確かめるために)部品を作ってみました。
GitHub - seki/bartender: Async I/O using Ruby Fiber
selectのラッパー
まずFiberと関係ない部分です。selectの簡単なラッパーです。今後、Ruby本体にもっと良いものが載り、このレイヤーは後で捨てられるだろう、と考えています。
module Bartender class Context def []=(event, fd, callback) # イベント種(:read, :write)とfdに、コールバックを登録します。 # 一つの組みに、一つのコールバックだけ登録できます。 # コールバックされても消えません end def delete(event, fd) # コールバックを削除します end def alarm(time, callback) # 指定時刻を過ぎたら呼ばれるコールバックを登録します # コールバックされると消えます # 戻り値はdelete_alaram()で使います end def delete_alarm(entry) # alarmを削除します end def run # selectのイベントループを回します # コールバックがなくなったら止まります end def stop # selectのイベントループを止めます end end end
Xtのころの経験から一つの事象に一つだけコールバックを登録できるようにしました。複数でもいいんだけど、複数入れられるものはこの上に被せれば良いだろうから、コア部分は一つだけにしました。
複数登録できる場合の発火する順序などにまつわるイヤなバグの思い出があったので、アプリケーション層で管理してもらう方がいいかなって判断しました。
サーバーのコールバックは次のように登録します。
context[:read, soc] = Proc.new do client = soc.accept # いろいろ end context.run
Fiberと組み合わせたAPI
それぞれのコールバックにFiber.current.method(:resume)を与えることで、Fiberの実行権をブロックしそうな時に手放すことができるだろう、というのを確認するAPIです。
少しだけ応用層になります。
module Bartender class Context def sleep(sec) # sec秒間、止まります。スレッドは止まらず、別のFiberが実行されます。 alarm(Time.now + sec, Fiber.current.method(:resume)) Fiber.yield end def wait_io(event, fd) # fdがreadable, あるいは writableになるまで止まります。 # スレッドは止まらず、別のFiberが実行されます。 self[event, fd] = Fiber.current.method(:resume) Fiber.yield ensure delete(event, fd) end def wait_io_timeout(event, fd, timeout) # timeoutがあるwait_ioです。 method = Fiber.current.method(:resume) entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)}) self[event, fd] = method raise(TimeoutError) if Fiber.yield == :timeout ensure delete(event, fd) delete_alarm(entry) end def wait_readable(fd); wait_io(:read, fd); end def wait_writable(fd); wait_io(:write, fd); end def _read(fd, sz) # ブロッキング風、ノンブロッキングなreadです。 return fd.read_nonblock(sz) rescue IO::WaitReadable wait_readable(fd) retry end def _write(fd, buf) # ブロッキング風、ノンブロッキングなwriteです。 return fd.write_nonblock(buf) rescue IO::WaitWritable wait_writable(fd) retry end end end
しつこいけど、sleepの実装を説明します。sleepはalarmでできています。
def sleep(sec) alarm(Time.now + sec, Fiber.current.method(:resume)) Fiber.yield end
2行でできています。
- sec秒後にFiber.current.method(:resume)を呼ぶようにalarmを登録します。
- Fiber.yieldで実行権を手放します。
時刻を過ぎたらresumeが呼ばれて、yieldしところから再開します。結果、sec秒間止まることができます。
その他のAPIの実装もほとんど一緒です。
なお、_read, _writeにtimeoutがないのは実装した時期によるもの(alarm, sleepは後から作ったから)なので、そのうちtimeoutが指定できるようにするかもしれません。
最後に、この中で一番長いwait_io_timeoutを説明します。一番自信がないAPIです。
def wait_io_timeout(event, fd, timeout) method = Fiber.current.method(:resume) entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)}) self[event, fd] = method raise(TimeoutError) if Fiber.yield == :timeout ensure delete(event, fd) delete_alarm(entry) end
- sleepと同様にalarmを設定します。ただし、登録するコールバックはresumeに引数:timeoutを与えるProcです
- [event, fd]の組みにresumeを登録
- Fiber.yieldします。もしも戻り値が:timeoutだったら例外を発生させます
- ensureでそれぞれのコールバックを削除します。
alarmのコールバックは発火した後は削除されるのだけど、IO待ちのコールバックで再開したあとに、またalarmのコールバックが発生すると困るので明示的に削除してます。すでに削除してあったら何も起きないので雑にdelete_alarmしてます。
もしもスレッドだったら、IO待ち起因のコールバックとalarm起因のコールバックが同時に発生したらどうしよう、とか、イベント取り外し中にいやなこと起きないかなあ、とか、心配するところですが、Fiberだから大丈夫。