@m_seki の

I like ruby tooから引っ越し

Rinda::TupleSpaceとNjet

Njetは===の結果を反転するだけの小さなクラスです。Njetは「ニエット」と読みます。
この文書では、Rinda::TupleSpaceとNjetを組み合わせて作るちょっと便利な同期メカニズムを紹介します。
はじめにタプルとパターンのパターンマッチングについて少しだけ復習し、想定する問題とNjetによる解決を示します。dRubyやRinda::TupleSpaceの基本的な使い方は別の文書を参考にしてください。

タプル、パターン、case equals

「===」(case equals, triple equals)は特殊な等号演算子です。通常、==と同じよう同値を検査するものですが、一部のクラスでは「所属性」を検査する働きを持ちます。たとえば左辺がClassの場合にはis_a?の検査であったり、Rangeの場合には範囲に入っているか否か、Regexpの場合には正規表現によるマッチングの検査であったりします。===は主にcase文での比較に用いられます。
Rinda::TupleSpaceのread()やtake()では、引数でパターンを与えてタプルを選択します。パターンマッチングは次のルールで行われます。パターンの各要素について、タプルの要素と===で比較した結果が全て真となるタプルとマッチします。また、パターンの中にあるnilは全てのオブジェクトにマッチするワイルドカードです。たとえば、パターン[:age, 25..35]はタプル[:age, 26]、[:age, 30]とマッチし、[:age, 20]にはマッチしません。

なにか変わったら教えてね問題

複数の観測者(複数のプロセス)が共有するある情報を見ているとします。それぞれの観測者は最新の情報に興味があります。また、情報に変化がおきたことにも興味があります。たとえば、体重計の値や誰かのツイート、RubyKaigiの懇親会のチケットの発売情報などです。GUIにおけるビューも同様ですね。ビューはモデルの変化をきっかけに、自分のペースでモデルの最新の状態を使って描画するのが使命です。
複数のプロセスがオブジェクトの変化を追いかけるなんて、とてもタプルスペース向きの問題ですね。Rinda::TupleSpaceでこれ解くにはどうしたらよいでしょう。

rd-stream

Lindaを使った分散データ構造のひとつにrd-streamがあります。これはデータを時系列に並べたストリーム構造の分散版です。「rd」とはストリームからデータを取得しても消えないことを示しています。複数の観測者がそれぞれのタイミングでストリームからデータを読み出しても、ストリームからデータが消えることはありません。観測者は視点(カーソル)を動かして順に読んでいくのです。
rd-streamは複数の観測者が読んでもうまく動くイベントキューのように働きます。情報の変化があるたびにrd-streamに記録して、観測者はそのrd-streamを読み続ければ変化を全て知ることができます。
rd-streamの一般的な実装は、先頭のインデックスを示すタプルと位置と要素のタプルを併用します。以下はrd-streamの例です。先頭のインデックスは[:head, 3]というタプルで表現されています。[0, 'zero']以降はストリームの要素です。

[:head, 3]
[0, 'zero']
[1, 1]
[2, :two]
[3, 3.0]

新しいデータが到着すると、[:head, 現在の先頭]のタプルを取り出し、インデックスを一つ進め、要素のタプルを追加します。最後に[:head, 新しいインデックス]を書きます。

def add(obj)
  idx = @ts.take([:head, Integer])[1]
  fwd = idx + 1
  @ts.write[fwd, obj]
  @ts.write[:head, fwd]
end

ストリームを読むプロセスは単に添字を一つずつ大きくしながらreadするだけです。もし、ストリームが尽きてしまったら、新しいデータが到着するまでreadがブロックします。次のコード片はストリームから要素を一つずつ読み出し、yieldするイテレータです。

def reader
  idx = @ts.read([:head, Integer)[1]
  while true
    yield(@ts.read([idx, nil])[1])
    idx += 1
  end
end

最初に述べた通り「変わったら教えてね」問題はrd-streamを使えば解決できます。
しかし、ちょっとした問題があります。rd-streamはその名の通り、要素が減りませんから、過去のオブジェクトの履歴も全て保持ししてしまいます。では、要素を減らすことができるでしょうか?要素を減らす機構を考えてみましょう。古い要素を減らすということは、観測者がread([idx, nil])の操作が失敗する可能性がでてきます。カーソルを一つずつずらしながら読む、というシンプルなしかけでは破綻するということです。これに対応するためにあらたなしかけが必要になるでしょう。
「変わったら教えてね」というシンプルな要求に対して、複雑な回答だと思いませんか?
また、観測者が一人であるならどうでしょう。その場合は要素を減らしながら読むin-stream(つまりQueueと同じ)が適切な解となります。

class Njet

Njetはちょっと変わったクラスです。Njetには===メソッドだけが定義されます。このメソッドは===の結果を反転して返します。実はこの小さなNjetが「変わったら教えて欲しい」問題を解決します。

class Njet
  def initialize(value)
    @value = value
  end

  def ===(other)
    ! (@value === other)
  end
end
['age', Njet.new(23)]

というパターンは最初の要素が'age'、2番目の要素が23でない、タプルとマッチします。
体重計の変化を追ってみましょう。体重計の値は:weightというSymbolと重さを示すFloatのタプルで表現することとします。45.6kgなら次のようなタプルになります。

[:weight, 45.6]

まず、体重を提供する側で考えます。現在の体重を更新するには、まず古いタプルをtakeで削除したのちに、新しい値をwriteします。つまり次の手順となります。

def update(new_value)
  @ts.take([:weight, nil])
  @ts.write([:weight, new_value])
end

次に観測者側の視点で考えます。最新の体重を知るには次のパターンでreadします。

def current
  @ts.read([:weight, nil])[1]
end

:weightではじまる2要素のタプルをreadします。nilは全てのオブジェクトにマッチするパターンです。
では、この操作で知った値から変わったら教えて欲しいときにはどうすればよいでしょう。現在知っている値がknown_valueだとしたら、known_valueでないタプルとマッチすればよさそうです。これをNjetクラスを使って表現してみましょう。Njet.new(knwon_value)とすればその値以外とマッチするようになります。つまり次のような操作です。

def wait(known_value)
  @ts.read([:weight, Njet.new(known_value)])[1]
end

です。更新されていなければreadはマッチしませんから、そこでスレッドはブロックします。その後、値が更新されればブロックは解除され、新しい値が返ります。

実験

TCP/IPを話す体重計は簡単に手に入らないと思うので、標準入力から読んだ行を観測するサービスで実験しましょう。この実験では、情報の取得と情報の公開は同じプロセスが担当します。

require 'rinda/tuplespace'

class Njet
  def initialize(value)
    @value = value
  end

  def ===(other)
    ! (@value === other)
  end
end

class MyModel
  include DRbUndumped

  def initialize
    @ts = Rinda::TupleSpace.new
    @ts.write([:model, nil])
  end

  def update(obj)
    @ts.take([:model, nil])
    @ts.write([:model, obj])
  end
  
  def wait(known=nil)
    @ts.read([:model, Njet.new(known)])[1]
  end
end

model = MyModel.new
DRb.start_service('druby://localhost:55551', model)

while line = gets
  model.update(line.chomp)
end

このスクリプトを実行すると、一行読むたびに共有情報を更新します。
MyModelクラスに注目して下さい。協調のために内部にRinda::TupleSpaceを用意しています。情報の更新はupdateメソッドで、情報の取得はwaitメソッドで行います。waitメソッドには自分が知っている情報を与えます。その情報と異なれば直ちに新しい値を返し、もし同じであれば変化が起こるまでブロックします。MyModelクラス、waitメソッド自体はストリームのカーソルを持っていないので、同時に複数のプロセスから呼び出されても大丈夫なようになっています。
クライアントは次のようなスクリプトになります。30回変化を報告したら終了します。

require 'drb'

ro = DRbObject.new_with_uri('druby://localhost:55551')

it = nil
30.times do
  it = ro.wait(it)
  p it
  sleep(0.1) # ちょっとした処理の代わり
end

サーバ、クライアントの順に起動してサーバの端末でなにか文字を入力すると、行ごとに報告されると思います。すばやくタイプしたり、copy/pasteなどを利用して複数行を連続して入力すると、クライアント側の処理の間に更新が発生することがあります。この場合、どのように振る舞うか考えてみて下さい。

まとめ

Njetを使って「変わったら教えてね」問題を解く方法を示しました。某所向けの簡易つぶやきシステム(Koto)のAjax部分にNjetを使っています。Mの変化につられてVを更新するような仕組みにも便利です。

もうちょっとクドい観察

APIについて、もう少しクドく観察してみます。
こういった通知はObserverパターンでやれよって思う人も多いと思います。コールバックするスタイルの通知は通知側のスレッドでアプリケーションが動いてしまうので、一部のアプリケーションの速度に全体が律速する可能性が高いです。今回示したwaitではアプリケーションが問い合わせるスタイルですから、全体が一部のアプリケーションに律速することはありません。また、速いアプリケーションはより頻繁に更新することができ、遅いアプリケーションはほどほどに間引いて更新することができます。これは謎ストレージで同期メカニズムのDropの開発の動機にもつながります。Dropについては「あとで書く」です。

あわせて読みたい

dRubyによる分散・Webプログラミング