@m_seki の

I like ruby tooから引っ越し

10分で書く複数のチャネルのあるQueue

id:ita-wasaさんにRubyKaigi2007の宴会で質問された気がするのだけど、メモしなかったのでちゃんと思い出せない。AP4Rでなにが欲しいかわからないので勘で書く。チャネルはパターンでなく、直接指定することにするので、状態変数はチャネルの数だけ作ることにする。

チャネルをパターンで待てるようにすると、Rindaと同じことになってしまって、速度を稼ぎにくくなる。deqするとき、マッチする可能性のあるチャネルそれぞれを検査しなくてはならないし、enqするときは、待っている人たちそれぞれにマッチするか聞かなくちゃならない。こっちの作戦では待っている人ごとに状態変数を用意する。(Rindaの実装と同じ)

この例では@bagはRubyのHashとArrayの組み合わせだけど、ここをRDBと取り替えると永続化、ログ付きのQueueになる。そういう構成もDBI, Pg(とPRbに同梱のDBIPool)を使って実装済み。

require 'monitor'

class MailBox
  include MonitorMixin
  def initialize
    super()
    @arrive = Hash.new { |h, k| h[k] = new_cond }
    @bag = Hash.new { |h, k| h[k] = [] }
  end

  def deq(channel, timeout=nil)
    synchronize do
      while true
        return @bag[channel].shift unless @bag[channel].empty?
        raise MailBoxTimeout unless @arrive[channel].wait(timeout)
      end
    end
  end

  def enq(channel, value)
    synchronize do
      @bag[channel].push(value)
      @arrive[channel].broadcast
    end
  end

  class MailBoxTimeout < ThreadError; end
end

実験するならこんな感じ。

  mb = MailBox.new

  t0 = Thread.new do
    p [:t0, mb.deq('ruby')]
    p [:t0, mb.deq('ruby')]
    p [:t0, mb.deq('rails')]
    mb.enq('done', 't1')
  end

  t1 = Thread.new do
    begin
      p [:t1, mb.deq('rails', 1)]
    rescue
      p $!
      retry
    end
    p [:t1, mb.deq('ruby')]
    p [:t1, mb.deq('rails')]
    mb.enq('done', 't2')
  end

  mb.enq('ruby', 'ruby 1')
  mb.enq('ruby', 'ruby 2')
  mb.enq('ruby', 'ruby 3')

  sleep(2)

  mb.enq('rails', 'rails 1')
  mb.enq('rails', 'rails 2')
  mb.enq('rails', 'rails 3')

  p ['done', mb.deq('done')]
  p ['done', mb.deq('done')]