こわいのでタイトルからFiberを抜いてみました。
Bartenderの続きです。
selectぽいソケットの待ち合わせとFiber間のRdvによる同期を持つ例です。dRubyのプロトコルを話しますが、特定のメソッドにしか対応できません。dRubyのプロトコルにするメリットはクライアントを書かなくてよいところです。irb -rdrbでですぐに実験できるからです。
bartender/tiny_drb.rb at master · seki/bartender · GitHub
通常のdRubyはRMIごとに別スレッドで動きますが、このサンプルはFiber / Bartenderのサンプルなので、無理やりシングルスレッドで動かしてます。
require 'bartender/bartender' class Rdv def initialize @reader = [] @queue = [] end def push(it) if @reader.empty? @queue << [it, Fiber.current.method(:resume)] return Fiber.yield end @reader.shift.call(it) end def pop if @queue.empty? @reader << Fiber.current.method(:resume) return Fiber.yield end value, fiber = @queue.shift fiber.call return value end end module DRb class DRbUnknown def initialize(err, buf) @data = buf end def _dump(lv) @data end end end class DRbEchoServer def initialize(port) @rdv = Rdv.new Bartender::Server.new(port) do |soc| begin reader = Bartender::Reader.new(soc) writer = Bartender::Writer.new(soc) while true _, msg, argv = req_drb(reader) case msg when 'push' value = @rdv.push(argv) when 'pop' value = @rdv.pop else value = msg end reply_drb(writer, true, value) end rescue p $! end end end def req_drb(reader) ref = load(reader, false) msg = load(reader) argc = load(reader) argv = argc.times.collect { load(reader) } block = load(reader, false) [ref, msg, argv] end def reply_drb(writer, succ, result) writer.write(dump(succ), true) writer.write(dump(result), true) writer.flush end def dump(obj) str = Marshal.dump(obj) rescue Marshal.dump(nil) [str.size].pack('N') + str end def load(reader, marshal=true) sz = reader.read(4) sz = sz.unpack('N')[0] data = reader.read(sz) return data unless marshal begin Marshal.load(data) rescue DRb::DRbUnknown.new($!, data) end end end DRbEchoServer.new(12345) Bartender.run
Rdvは前回説明したのと同じなので省略。このdRubyサーバーは druby://localhost:12345 のURIでサービスを提供します。
ro = DRbObject.new_with_uri('druby://localhost:12345') ro.hello
こんな感じで利用できます。
このオブジェクトはメソッドを呼ぶと、その名前を返す、Echoのように働きます。
ただし、pushとpopだけはRdvへ委譲されます。
- push(obj) -- オブジェクトをRdvにpushします。popされていなければ、pushはpopが起こるまでブロックします。
- pop -- Rdvにpushされているオブジェクトを返します。pushされていなければ、popはpushされるまでブロックします。
たくさんの端末を用意して、それぞれirbなどでpush, popなどのメソッドを呼び出してください。pushとpopが揃うまでブロックする様子や、その間でもEchoのように動く様子が確認できます。
Bartenderのおまけライブラリのおかげで肝心な部分が見えませんので、そこを少し解説します。
Bartender::Server.new(port) do |soc| ... end
Bartender::ServerはWEBrickのIPv6, v4対応のサーバーポートの用意のコードをパクって作った、TCPServerのちょっと大げさなやつです。
引数はポート番号とブロックです。acceptすると、ブロックを実行するFiberを一つ起こします。acceptして得られたソケットをブロックに渡します。
Serverの内部ではBartenderのselectを使って、ブロックせず、ブロック風にクライアントからの接続を待ちます。
reader = Bartender::Reader.new(soc) writer = Bartender::Writer.new(soc)
ReaderとWriterはバッファ付きのIOを提供します。readでブロックしそうな時は別のFiberに実行権を移します。Readerはnバイト読み込みの他に、一行読み込みなどもできます。
Writerはある程度出力バッファに蓄えてからwriteすることもできます。writeでブロックしそうな時は別のFiberに実行権を移します。
Fiberのスイッチが発生するのは、Serverのaccept待ち、クライアントごとに生成されるFiberでのread/write、呼び出したメソッドがpush/popだった際のRdvでの待ちです。
selectのIO待ちとRdvの制御の待ちが管理されてるのがわかるでしょうか。わかんないか。
私が解きたかったのは、Fiber側のコードが単純な処理の並びになっていること(流れを単純にかけるようになること)でした。この例では、dRubyのリクエストを読み、処理して、レスポンスを返す、
という処理が流れが、分断されずに記述できています(と心の中で思う)。
def req_drb(reader) ref = load(reader, false) msg = load(reader) argc = load(reader) argv = argc.times.collect { load(reader) } block = load(reader, false) [ref, msg, argv] end
こういうところ(loadでブロックするかもしれない)とか、
while true _, msg, argv = req_drb(reader) case msg when 'push' value = @rdv.push(argv) when 'pop' value = @rdv.pop else value = msg end reply_drb(writer, true, value) end
こういうところ(雑なwhileループ内でもブロックするし、Rdv待ちもあるし)とかが、まるでスレッドみたいな大雑把さで記述でき(てるでしょ?)る。