@m_seki の

I like ruby tooから引っ越し

Fiberのための同期機構の作り方

Rubyのスレッドは素晴らしいと思うのですが、なぜか人気がありません。

Fiberで書けば人気が出ると思うので、ときどき調べています。多くのサンプルは次のようなジェネレーターっぽいものです。

fib = Fiber.new do
  a, b = 1, 1
  while true
    Fiber.yield(a)
    a, b = b, a + b
  end
end

10.times do |n|
  p [n, fib.resume]
end

Enumeratorなどの実装もこんな感じでできてるんでしょうね。
二つの実行主体(fibのFiberと10.times..してるメイン部分)はメイン部分がFiberを利用しているような構図です(??)。
resumeするべき相手のことを知ってるし、むしろFiberが資源みたいな感じ(???)。

マルチスレッドでのプログラミングのように、複数のFiber間で連携する雰囲気のコードを書くにはどうしたらよいでしょう。
スレッドでいうと次のようなQueueを介して同期するようなものです。

  require 'thread'

  q = SizedQueue.new(1)
  Thread.new do
    10.times do |n|
      sleep(rand)
      q.push(n)
      p [:push, n]
    end
  end
  Thread.new do
    10.times do |n|
      sleep(rand)
      p [:pop, q.pop]
    end
  end.join

スレッド間はQueueだけの付き合いで、相手の実行主体(この場合はThread)については興味がありません。
かっこいい!

こういった同期機構をFiberで書くにはどうしたらいいんでしょう。
プリミティブ自体をFiberで管理したりとかいろいろ試行錯誤したんだけど、うまくいきませんでした。(ヘタクソ)

require 'fiber'すると使用できる、Fiber.currentを見つけてからは簡単に記述できることがわかりました。
同期プリミティブを利用した実行主体をメモしてyield, resumeできるので、事前に関係者(関係するFiber)を知り合っておく必要がありません。*1

次の例はpushをするFiberとpopをするFiberが揃ったら、オブジェクトを一つpush側からpop側へ渡して、両者を再開させる同期機構(ランデブー)です。

require 'fiber'

class Rdv
  def initialize
    @reader = []
    @queue = []
  end

  def push(it)
    if @reader.empty?
      @queue << [it, Fiber.current.method(:resume)]
      return Fiber.yield
    end
    @reader.shift.call(it)
  end

  def pop
    if @queue.empty?
      @reader << Fiber.current.method(:resume)
      return Fiber.yield
    end

    value, fiber = @queue.shift
    fiber.call
    return value
  end
end


if __FILE__ == $0
  rdv = Rdv.new
  Fiber.new do 
    10.times do |n|
      rdv.push(n)
      p [:push, n]
    end
  end.resume
  Fiber.new do
    10.times do |n|
      p [:pop, rdv.pop]
    end
  end.resume
end

一般的な同期機構を作るイディオムは次のようになります。

  1. 呼び出したFIberをメモ
  2. Fiber.yield
  3. 再開されたらメモしておいたFiberにresume

1の前、あるいは、3の前に完了条件をチェックすると思います。

pushの場合はこういう意味です。

  1. 待っている人(pop側)がいないなら、Fiberをメモしてyield。このとき一緒に渡されたオブジェクトもメモします。
  2. 待っている人がいる場合、または再開されたとき、一つ取り出してresume

pop側はこういう気持ちです。

  1. データがないなら(push側がいないなら)、Fiberをメモしてyield
  2. データがある場合、または再開されたとき、一つ取り出してresume。このとき一緒にメモしたオブジェクトを渡します。

Threadのときと違うのは、条件を検査する間をロックする必要がないところです。Threadの場合は、待ってる人を全員を一斉に再開して、ロックを獲得できた実行主体だけが先に進みますが、Fiberでは同時に一つしか動かないのがわかっているので、そういった管理は必要ありません。ドキドキするけど。Fiberで書くときのキモはもしかすると、Thread脳で「これはやばい」と思う部分を「Fiberでは関係なかった」と押し殺すところかもしれません。

Bartender

https://github.com/seki/bartender

Bartenderというselectを抽象化するライブラリを書いています。これは突き詰めるとfdとreadable, writableの組みに一つだけコールバックを設定できるだけのライブラリです。
しかし、Fiberと組み合わせて、Fiber.current.method(:resume)をコールバックとして登録することで面白いもの、ブロッキングのread/write風にアプリケーションを書くと、ブロックしそうなときは他のFiberに実行権を渡しブロックしない、ブロッキング風ノンブロッキングIOを実現できます。

    def wait_io(event, fd)
      self[event, fd] = Fiber.current.method(:resume)
      Fiber.yield
    ensure
      delete(event, fd)
    end

    def wait_readable(fd); wait_io(:read, fd); end

    def _read(fd, sz)
      return fd.read_nonblock(sz)
    rescue IO::WaitReadable
      wait_readable(fd)
      retry
    end

read周りの実装を抜き出すとこうなります。(SSLだとreadでもWaitWritableが来るそうなので対応しないと)

アプリケーションの実際はまたあとで。

*1:この例ではFiber.current.method(:resume)を覚えていますが、単にFiber.currentでも大丈夫。