@m_seki の

I like ruby tooから引っ越し

WEBrickでchunked

WEBrickのレスポンスでchunkedする話

いろいろ探したけどchunkedしてる例が見つからなかったので書いた。

res.chunked=()

WEBrickのHTTPServerの話です。レスポンスにchunkedを指定してイベントを延々と返す方法を調べました。
転送をchunkedで行うにはレスポンスに次の操作をすればよい。

  res.chunked = true

問題はbodyの方。bodyにStringなど完成したバッファを渡してしまうと、結局一気に送ることになってしまって意味がない。

  res.body = "返信だよー"

ダラダラ送るにはIOぽいなにかを与える必要がある。bodyにreadpartialメソッドを持つオブジェクトを与えるとIOぽいと判断してくれる。(IOのサブクラスとか限定しないの、WEBrickさいこー!)

  res.body = MyStream.new(queue) # ← readpatial と close メソッド持つオブジェクト

雑に言うとこんなの。(ほんとのデータはQueueからくると想定してる場合の擬似コード

  class MyStream
    def initialize(queue)
      @queue = queue
    end

    def next_chunk
      @queue.pop
    rescue
      raise EOFError
    end

    def close
      # nop
    end

    def readpartial(size, buf='')
      buf.clear
      buf << next_chunk
      buf
    end
  end

ちゃんと書くならreadpartialのバッファサイズの指定があるので、それも考慮しなくてはならない。
Tofuに追加したChunkedStreamは次のように実装した。

  class ChunkedStream
    def initialize(stream)
      @data = nil
      @cursor = 0
      @stream = stream
    end

    def next_chunk
      @stream.pop
    rescue
      raise EOFError
    end

    def close
      @stream.close
    end

    def readpartial(size, buf='')
      buf.clear
      unless @data
        @cursor = 0
        @data = next_chunk
        @data.force_encoding("ascii-8bit")
      end
      if @data.bytesize <= size
        buf << @data
        @data = nil
      else
        slice = @data.byteslice(@cursor, size)
        @cursor += slice.bytesize
        buf << slice
        if @data.bytesize <= @cursor
          @data = nil
        end
      end
      buf
    end
  end

WEBrickのレスポンス ⇄ ChunkedStream ⇄ 本来アプリで表現したかったストリーム(@stream)、のように中間に入って使う。
WEBrickはレスポンスのチャンクを作るために、ChunkedStreamをreadpartialする。送っていないバッファがあればすぐに返却する。送るものがなければ、@streamからpopを試みる。
@streamがQueueのような待ち合わせ可能な同期メカニズムであれば、データがなければブロックするであろう。ブロックするということは、WEBrickのそのクライアント担当スレッドが止まるわけだけど、その他のスレッドには関係ないので問題ない。他のクライアントに対してはちゃんと仕事してくれる。WEBrickちゃんとしてる。

QueueのデータをServer Sent Eventsぽく返すなら次のようにする。(自分ならDripを使う)

class EventStream
  def initialize(queue)
    @queue = queue
  end

  def pop
    value = @queue.pop
    "data: #{value.to_json}\n\n"
  end

  def close;  end
end
  res.body = Tofu::ChunkedStream.new(EventStream.new(queue))

追記: もう少し親切な例

require 'webrick'
require 'tofu'
require 'drb'
require 'json'

class EventStream
  def initialize(queue)
    @queue = queue
  end

  def pop
    value = @queue.pop
    p value
    "data: #{value.to_json}\n\n"
  end

  def close; end
end

queue = Queue.new
DRb.start_service('druby://localhost:54321', queue)

s = WEBrick::HTTPServer.new(:Port => 8086)
s.mount_proc('/stream') {|req, res|
  res.content_type = 'text/event-stream'
  res.chunked = true
  res.body = Tofu::ChunkedStream.new(EventStream.new(queue))
}
s.start

別の端末からirbでデータ送る

% irb -r drb
irb(main):001:0> ro = DRbObject.new_with_uri('druby://localhost:54321')
irb(main):002:0> ro.push('hello')
irb(main):003:0> ro.push('world')

http://localhost:8086 表示してブラウザのコンソールで実験

var sse = new EventSource.new("/stream")
sse.onmessage = function(e) { console.log(e) }

そうそう

さて土曜日はとちぎRuby会議08ですね。
体調が悪くて参加できない方がいるようです。今から参加したくなった人はTL検索してチケットを譲ってもらおう!