WEBrickのレスポンスでchunkedする話
いろいろ探したけどchunkedしてる例が見つからなかったので書いた。
res.chunked=()
WEBrickのHTTPServerの話です。レスポンスにchunkedを指定してイベントを延々と返す方法を調べました。
転送をchunkedで行うにはレスポンスに次の操作をすればよい。
res.chunked = true
問題はbodyの方。bodyにStringなど完成したバッファを渡してしまうと、結局一気に送ることになってしまって意味がない。
res.body = "返信だよー"
ダラダラ送るにはIOぽいなにかを与える必要がある。bodyにreadpartialメソッドを持つオブジェクトを与えるとIOぽいと判断してくれる。(IOのサブクラスとか限定しないの、WEBrickさいこー!)
res.body = MyStream.new(queue) # ← readpatial と close メソッド持つオブジェクト
雑に言うとこんなの。(ほんとのデータはQueueからくると想定してる場合の擬似コード)
class MyStream def initialize(queue) @queue = queue end def next_chunk @queue.pop rescue raise EOFError end def close # nop end def readpartial(size, buf='') buf.clear buf << next_chunk buf end end
ちゃんと書くならreadpartialのバッファサイズの指定があるので、それも考慮しなくてはならない。
Tofuに追加したChunkedStreamは次のように実装した。
class ChunkedStream def initialize(stream) @data = nil @cursor = 0 @stream = stream end def next_chunk @stream.pop rescue raise EOFError end def close @stream.close end def readpartial(size, buf='') buf.clear unless @data @cursor = 0 @data = next_chunk @data.force_encoding("ascii-8bit") end if @data.bytesize <= size buf << @data @data = nil else slice = @data.byteslice(@cursor, size) @cursor += slice.bytesize buf << slice if @data.bytesize <= @cursor @data = nil end end buf end end
WEBrickのレスポンス ⇄ ChunkedStream ⇄ 本来アプリで表現したかったストリーム(@stream)、のように中間に入って使う。
WEBrickはレスポンスのチャンクを作るために、ChunkedStreamをreadpartialする。送っていないバッファがあればすぐに返却する。送るものがなければ、@streamからpopを試みる。
@streamがQueueのような待ち合わせ可能な同期メカニズムであれば、データがなければブロックするであろう。ブロックするということは、WEBrickのそのクライアント担当スレッドが止まるわけだけど、その他のスレッドには関係ないので問題ない。他のクライアントに対してはちゃんと仕事してくれる。WEBrickちゃんとしてる。
QueueのデータをServer Sent Eventsぽく返すなら次のようにする。(自分ならDripを使う)
class EventStream def initialize(queue) @queue = queue end def pop value = @queue.pop "data: #{value.to_json}\n\n" end def close; end end
res.body = Tofu::ChunkedStream.new(EventStream.new(queue))
追記: もう少し親切な例
require 'webrick' require 'tofu' require 'drb' require 'json' class EventStream def initialize(queue) @queue = queue end def pop value = @queue.pop p value "data: #{value.to_json}\n\n" end def close; end end queue = Queue.new DRb.start_service('druby://localhost:54321', queue) s = WEBrick::HTTPServer.new(:Port => 8086) s.mount_proc('/stream') {|req, res| res.content_type = 'text/event-stream' res.chunked = true res.body = Tofu::ChunkedStream.new(EventStream.new(queue)) } s.start
別の端末からirbでデータ送る
% irb -r drb irb(main):001:0> ro = DRbObject.new_with_uri('druby://localhost:54321') irb(main):002:0> ro.push('hello') irb(main):003:0> ro.push('world')
http://localhost:8086 表示してブラウザのコンソールで実験
var sse = new EventSource("/stream") sse.onmessage = function(e) { console.log(e) }
そうそう
さて土曜日はとちぎRuby会議08ですね。
体調が悪くて参加できない方がいるようです。今から参加したくなった人はTL検索してチケットを譲ってもらおう!