こわいのでタイトルからFiberを抜いてみました。
Bartenderの続きです。
selectぽいソケットの待ち合わせとFiber間のRdvによる同期を持つ例です。dRubyのプロトコルを話しますが、特定のメソッドにしか対応できません。dRubyのプロトコルにするメリットはクライアントを書かなくてよいところです。irb -rdrbでですぐに実験できるからです。
bartender/tiny_drb.rb at master · seki/bartender · GitHub
通常のdRubyはRMIごとに別スレッドで動きますが、このサンプルはFiber / Bartenderのサンプルなので、無理やりシングルスレッドで動かしてます。
require 'bartender/bartender'
class Rdv
def initialize
@reader = []
@queue = []
end
def push(it)
if @reader.empty?
@queue << [it, Fiber.current.method(:resume)]
return Fiber.yield
end
@reader.shift.call(it)
end
def pop
if @queue.empty?
@reader << Fiber.current.method(:resume)
return Fiber.yield
end
value, fiber = @queue.shift
fiber.call
return value
end
end
module DRb
class DRbUnknown
def initialize(err, buf)
@data = buf
end
def _dump(lv)
@data
end
end
end
class DRbEchoServer
def initialize(port)
@rdv = Rdv.new
Bartender::Server.new(port) do |soc|
begin
reader = Bartender::Reader.new(soc)
writer = Bartender::Writer.new(soc)
while true
_, msg, argv = req_drb(reader)
case msg
when 'push'
value = @rdv.push(argv)
when 'pop'
value = @rdv.pop
else
value = msg
end
reply_drb(writer, true, value)
end
rescue
p $!
end
end
end
def req_drb(reader)
ref = load(reader, false)
msg = load(reader)
argc = load(reader)
argv = argc.times.collect { load(reader) }
block = load(reader, false)
[ref, msg, argv]
end
def reply_drb(writer, succ, result)
writer.write(dump(succ), true)
writer.write(dump(result), true)
writer.flush
end
def dump(obj)
str = Marshal.dump(obj) rescue Marshal.dump(nil)
[str.size].pack('N') + str
end
def load(reader, marshal=true)
sz = reader.read(4)
sz = sz.unpack('N')[0]
data = reader.read(sz)
return data unless marshal
begin
Marshal.load(data)
rescue
DRb::DRbUnknown.new($!, data)
end
end
end
DRbEchoServer.new(12345)
Bartender.run
Rdvは前回説明したのと同じなので省略。このdRubyサーバーは druby://localhost:12345 のURIでサービスを提供します。
ro = DRbObject.new_with_uri('druby://localhost:12345')
ro.hello
こんな感じで利用できます。
このオブジェクトはメソッドを呼ぶと、その名前を返す、Echoのように働きます。
ただし、pushとpopだけはRdvへ委譲されます。
- push(obj) -- オブジェクトをRdvにpushします。popされていなければ、pushはpopが起こるまでブロックします。
- pop -- Rdvにpushされているオブジェクトを返します。pushされていなければ、popはpushされるまでブロックします。
たくさんの端末を用意して、それぞれirbなどでpush, popなどのメソッドを呼び出してください。pushとpopが揃うまでブロックする様子や、その間でもEchoのように動く様子が確認できます。
Bartenderのおまけライブラリのおかげで肝心な部分が見えませんので、そこを少し解説します。
Bartender::Server.new(port) do |soc|
...
end
Bartender::ServerはWEBrickのIPv6, v4対応のサーバーポートの用意のコードをパクって作った、TCPServerのちょっと大げさなやつです。
引数はポート番号とブロックです。acceptすると、ブロックを実行するFiberを一つ起こします。acceptして得られたソケットをブロックに渡します。
Serverの内部ではBartenderのselectを使って、ブロックせず、ブロック風にクライアントからの接続を待ちます。
reader = Bartender::Reader.new(soc)
writer = Bartender::Writer.new(soc)
ReaderとWriterはバッファ付きのIOを提供します。readでブロックしそうな時は別のFiberに実行権を移します。Readerはnバイト読み込みの他に、一行読み込みなどもできます。
Writerはある程度出力バッファに蓄えてからwriteすることもできます。writeでブロックしそうな時は別のFiberに実行権を移します。
Fiberのスイッチが発生するのは、Serverのaccept待ち、クライアントごとに生成されるFiberでのread/write、呼び出したメソッドがpush/popだった際のRdvでの待ちです。
selectのIO待ちとRdvの制御の待ちが管理されてるのがわかるでしょうか。わかんないか。
私が解きたかったのは、Fiber側のコードが単純な処理の並びになっていること(流れを単純にかけるようになること)でした。この例では、dRubyのリクエストを読み、処理して、レスポンスを返す、
という処理が流れが、分断されずに記述できています(と心の中で思う)。
def req_drb(reader)
ref = load(reader, false)
msg = load(reader)
argc = load(reader)
argv = argc.times.collect { load(reader) }
block = load(reader, false)
[ref, msg, argv]
end
こういうところ(loadでブロックするかもしれない)とか、
while true
_, msg, argv = req_drb(reader)
case msg
when 'push'
value = @rdv.push(argv)
when 'pop'
value = @rdv.pop
else
value = msg
end
reply_drb(writer, true, value)
end
こういうところ(雑なwhileループ内でもブロックするし、Rdv待ちもあるし)とかが、まるでスレッドみたいな大雑把さで記述でき(てるでしょ?)る。