id:ita-wasaさんにRubyKaigi2007の宴会で質問された気がするのだけど、メモしなかったのでちゃんと思い出せない。AP4Rでなにが欲しいかわからないので勘で書く。チャネルはパターンでなく、直接指定することにするので、状態変数はチャネルの数だけ作ることにする。
チャネルをパターンで待てるようにすると、Rindaと同じことになってしまって、速度を稼ぎにくくなる。deqするとき、マッチする可能性のあるチャネルそれぞれを検査しなくてはならないし、enqするときは、待っている人たちそれぞれにマッチするか聞かなくちゃならない。こっちの作戦では待っている人ごとに状態変数を用意する。(Rindaの実装と同じ)
この例では@bagはRubyのHashとArrayの組み合わせだけど、ここをRDBと取り替えると永続化、ログ付きのQueueになる。そういう構成もDBI, Pg(とPRbに同梱のDBIPool)を使って実装済み。
require 'monitor' class MailBox include MonitorMixin def initialize super() @arrive = Hash.new { |h, k| h[k] = new_cond } @bag = Hash.new { |h, k| h[k] = [] } end def deq(channel, timeout=nil) synchronize do while true return @bag[channel].shift unless @bag[channel].empty? raise MailBoxTimeout unless @arrive[channel].wait(timeout) end end end def enq(channel, value) synchronize do @bag[channel].push(value) @arrive[channel].broadcast end end class MailBoxTimeout < ThreadError; end end
実験するならこんな感じ。
mb = MailBox.new t0 = Thread.new do p [:t0, mb.deq('ruby')] p [:t0, mb.deq('ruby')] p [:t0, mb.deq('rails')] mb.enq('done', 't1') end t1 = Thread.new do begin p [:t1, mb.deq('rails', 1)] rescue p $! retry end p [:t1, mb.deq('ruby')] p [:t1, mb.deq('rails')] mb.enq('done', 't2') end mb.enq('ruby', 'ruby 1') mb.enq('ruby', 'ruby 2') mb.enq('ruby', 'ruby 3') sleep(2) mb.enq('rails', 'rails 1') mb.enq('rails', 'rails 2') mb.enq('rails', 'rails 3') p ['done', mb.deq('done')] p ['done', mb.deq('done')]