@m_seki の

I like ruby tooから引っ越し

BartenderのAPIで練習するFiberの使い方

Fiberの使い道を考えるために、selectと組み合わせて処理を単純に記述できる(のではないか、ということを確かめるために)部品を作ってみました。

GitHub - seki/bartender: Async I/O using Ruby Fiber

selectのラッパー

まずFiberと関係ない部分です。selectの簡単なラッパーです。今後、Ruby本体にもっと良いものが載り、このレイヤーは後で捨てられるだろう、と考えています。

module Bartender
  class Context
    def []=(event, fd, callback)
      # イベント種(:read, :write)とfdに、コールバックを登録します。
      # 一つの組みに、一つのコールバックだけ登録できます。
      # コールバックされても消えません
    end

    def delete(event, fd)
      # コールバックを削除します
    end

    def alarm(time, callback)
      # 指定時刻を過ぎたら呼ばれるコールバックを登録します
      # コールバックされると消えます
      # 戻り値はdelete_alaram()で使います
    end

    def delete_alarm(entry)
      # alarmを削除します
    end

    def run
      # selectのイベントループを回します
      # コールバックがなくなったら止まります
    end

    def stop
      # selectのイベントループを止めます
    end
  end
end

Xtのころの経験から一つの事象に一つだけコールバックを登録できるようにしました。複数でもいいんだけど、複数入れられるものはこの上に被せれば良いだろうから、コア部分は一つだけにしました。
複数登録できる場合の発火する順序などにまつわるイヤなバグの思い出があったので、アプリケーション層で管理してもらう方がいいかなって判断しました。

サーバーのコールバックは次のように登録します。

  context[:read, soc] = Proc.new do
    client = soc.accept
    # いろいろ
  end

  context.run

Fiberと組み合わせたAPI

それぞれのコールバックにFiber.current.method(:resume)を与えることで、Fiberの実行権をブロックしそうな時に手放すことができるだろう、というのを確認するAPIです。
少しだけ応用層になります。

module Bartender
  class Context
   
    def sleep(sec)
      # sec秒間、止まります。スレッドは止まらず、別のFiberが実行されます。
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

    def wait_io(event, fd)
      # fdがreadable, あるいは writableになるまで止まります。
      # スレッドは止まらず、別のFiberが実行されます。
      self[event, fd] = Fiber.current.method(:resume)
      Fiber.yield
    ensure
      delete(event, fd)
    end

    def wait_io_timeout(event, fd, timeout)
      # timeoutがあるwait_ioです。
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end

    def wait_readable(fd); wait_io(:read, fd); end
    def wait_writable(fd); wait_io(:write, fd); end

    def _read(fd, sz)
      # ブロッキング風、ノンブロッキングなreadです。
      return fd.read_nonblock(sz)
    rescue IO::WaitReadable
      wait_readable(fd)
      retry
    end

    def _write(fd, buf)
      # ブロッキング風、ノンブロッキングなwriteです。
      return fd.write_nonblock(buf)
    rescue IO::WaitWritable
      wait_writable(fd)
      retry
    end
  end
end

しつこいけど、sleepの実装を説明します。sleepはalarmでできています。

    def sleep(sec)
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

2行でできています。

  1. sec秒後にFiber.current.method(:resume)を呼ぶようにalarmを登録します。
  2. Fiber.yieldで実行権を手放します。

時刻を過ぎたらresumeが呼ばれて、yieldしところから再開します。結果、sec秒間止まることができます。


その他のAPIの実装もほとんど一緒です。
なお、_read, _writeにtimeoutがないのは実装した時期によるもの(alarm, sleepは後から作ったから)なので、そのうちtimeoutが指定できるようにするかもしれません。

最後に、この中で一番長いwait_io_timeoutを説明します。一番自信がないAPIです。

    def wait_io_timeout(event, fd, timeout)
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end
  1. sleepと同様にalarmを設定します。ただし、登録するコールバックはresumeに引数:timeoutを与えるProcです
  2. [event, fd]の組みにresumeを登録
  3. Fiber.yieldします。もしも戻り値が:timeoutだったら例外を発生させます
  4. ensureでそれぞれのコールバックを削除します。

alarmのコールバックは発火した後は削除されるのだけど、IO待ちのコールバックで再開したあとに、またalarmのコールバックが発生すると困るので明示的に削除してます。すでに削除してあったら何も起きないので雑にdelete_alarmしてます。

もしもスレッドだったら、IO待ち起因のコールバックとalarm起因のコールバックが同時に発生したらどうしよう、とか、イベント取り外し中にいやなこと起きないかなあ、とか、心配するところですが、Fiberだから大丈夫。


あわせて読みたい

改訂2版 パーフェクトRubydRubyによる分散・WebプログラミングThe dRuby Book: Distributed and Parallel Computing with Ruby

パーフェクトRubyの改訂版でもFiberの記述は増えていないとのこと。