Rubyのスレッドは素晴らしいと思うのですが、なぜか人気がありません。
Fiberで書けば人気が出ると思うので、ときどき調べています。多くのサンプルは次のようなジェネレーターっぽいものです。
fib = Fiber.new do a, b = 1, 1 while true Fiber.yield(a) a, b = b, a + b end end 10.times do |n| p [n, fib.resume] end
Enumeratorなどの実装もこんな感じでできてるんでしょうね。
二つの実行主体(fibのFiberと10.times..してるメイン部分)はメイン部分がFiberを利用しているような構図です(??)。
resumeするべき相手のことを知ってるし、むしろFiberが資源みたいな感じ(???)。
マルチスレッドでのプログラミングのように、複数のFiber間で連携する雰囲気のコードを書くにはどうしたらよいでしょう。
スレッドでいうと次のようなQueueを介して同期するようなものです。
require 'thread' q = SizedQueue.new(1) Thread.new do 10.times do |n| sleep(rand) q.push(n) p [:push, n] end end Thread.new do 10.times do |n| sleep(rand) p [:pop, q.pop] end end.join
スレッド間はQueueだけの付き合いで、相手の実行主体(この場合はThread)については興味がありません。
かっこいい!
こういった同期機構をFiberで書くにはどうしたらいいんでしょう。
プリミティブ自体をFiberで管理したりとかいろいろ試行錯誤したんだけど、うまくいきませんでした。(ヘタクソ)
require 'fiber'すると使用できる、Fiber.currentを見つけてからは簡単に記述できることがわかりました。
同期プリミティブを利用した実行主体をメモしてyield, resumeできるので、事前に関係者(関係するFiber)を知り合っておく必要がありません。*1
次の例はpushをするFiberとpopをするFiberが揃ったら、オブジェクトを一つpush側からpop側へ渡して、両者を再開させる同期機構(ランデブー)です。
require 'fiber' class Rdv def initialize @reader = [] @queue = [] end def push(it) if @reader.empty? @queue << [it, Fiber.current.method(:resume)] return Fiber.yield end @reader.shift.call(it) end def pop if @queue.empty? @reader << Fiber.current.method(:resume) return Fiber.yield end value, fiber = @queue.shift fiber.call return value end end if __FILE__ == $0 rdv = Rdv.new Fiber.new do 10.times do |n| rdv.push(n) p [:push, n] end end.resume Fiber.new do 10.times do |n| p [:pop, rdv.pop] end end.resume end
一般的な同期機構を作るイディオムは次のようになります。
- 呼び出したFIberをメモ
- Fiber.yield
- 再開されたらメモしておいたFiberにresume
1の前、あるいは、3の前に完了条件をチェックすると思います。
pushの場合はこういう意味です。
- 待っている人(pop側)がいないなら、Fiberをメモしてyield。このとき一緒に渡されたオブジェクトもメモします。
- 待っている人がいる場合、または再開されたとき、一つ取り出してresume
pop側はこういう気持ちです。
- データがないなら(push側がいないなら)、Fiberをメモしてyield
- データがある場合、または再開されたとき、一つ取り出してresume。このとき一緒にメモしたオブジェクトを渡します。
Threadのときと違うのは、条件を検査する間をロックする必要がないところです。Threadの場合は、待ってる人を全員を一斉に再開して、ロックを獲得できた実行主体だけが先に進みますが、Fiberでは同時に一つしか動かないのがわかっているので、そういった管理は必要ありません。ドキドキするけど。Fiberで書くときのキモはもしかすると、Thread脳で「これはやばい」と思う部分を「Fiberでは関係なかった」と押し殺すところかもしれません。
Bartender
https://github.com/seki/bartender
Bartenderというselectを抽象化するライブラリを書いています。これは突き詰めるとfdとreadable, writableの組みに一つだけコールバックを設定できるだけのライブラリです。
しかし、Fiberと組み合わせて、Fiber.current.method(:resume)をコールバックとして登録することで面白いもの、ブロッキングのread/write風にアプリケーションを書くと、ブロックしそうなときは他のFiberに実行権を渡しブロックしない、ブロッキング風ノンブロッキングIOを実現できます。
def wait_io(event, fd) self[event, fd] = Fiber.current.method(:resume) Fiber.yield ensure delete(event, fd) end def wait_readable(fd); wait_io(:read, fd); end def _read(fd, sz) return fd.read_nonblock(sz) rescue IO::WaitReadable wait_readable(fd) retry end
read周りの実装を抜き出すとこうなります。(SSLだとreadでもWaitWritableが来るそうなので対応しないと)
アプリケーションの実際はまたあとで。
*1:この例ではFiber.current.method(:resume)を覚えていますが、単にFiber.currentでも大丈夫。