@m_seki の

I like ruby tooから引っ越し

ストリーム指向のストレージDrip

Ruby会議2011の下書き]

  • PTupleSpace
  • 問題点
  • KV(与太話)
  • Drip

TupleSpaceの永続化とその制約

この節ではRinda::TupleSpaceをおさらいしながら、TupleSpaceに永続化したPTupleSpaceの概要を紹介し、その制約について考えます。
PTupleSpaceはTupleSpaceのサブクラスです。タプルの状態の変化を逐次二次記憶にログして、次回の起動に備えます。PTupleSpaceを再起動すると最後の(最新の)タプルの状態のままに復元されます。

タプルは実世界の「伝票」によく似ています。タプルをプロセス間でリレーしながら仕事を進めていく様子は、「伝票」を持ち回って仕事を行うのにそっくりです。Rindaの世界では「伝票」はTupleSpaceを介してプロセスからプロセスへ渡り歩きます。

PTupleSpaceの提供する永続化は、TupleSpaceに蓄えられた伝票の束にのみ作用します。プロセスが持っている伝票をPTupleSpaceが知ることはできず、永続化されません。また、待合せている様子も永続化の対象ではありません。プロセスがある伝票を待っている、という状況までは再現できないのです。

TupleSpaceに期待する機能が伝票の貯蔵庫であると考えた場合には、これで充分と言えるでしょう。PTupleSpaceにwriteした情報は再起動後もそのまま手に入ります。多くのアプリケーションではこれで間に合うかもしれません。ArrayやHashをそのままdRubyで公開する、あるいはログ付きで公開するのに比べて、TupleSpaceはどのくらい便利なのでしょうか。おそらく、RindaのTupleSpaceの強力なパターンマッチングにはある程度のアドバンテージがあるでしょう。そのパターンマッチングと引き換えに、あまり効率のよいデータ構造を使うことができませんでした。実装には線形探索が残っていて、要素数が増えたときに不安があります。

TupleSpaceの本来の役割であるプロセス間の協調についてはどうでしょうか。PTupleSpaceに異常が起きてクラッシュしてしまった、再起動が必要になった、といった状況を想像してみましょう。まず、PTupleSpaceプロセスが停止することにより、readやtakeなどの待合せのRMIを実行していたプロセスではdRubyの例外があがります。PTupleSpaceが再起動されるとタプル群の最後の状態に復元されます。待合せをしていたプロセスは再起動したことを(知るのは難しいのですが)知ったのち、例外が発生した操作をやり直すことになります。しかし、そのように再開するスクリプトを書くのは難しく面倒です。

また、RMIのために抱え込む厄介な問題もあります。writeやtakeなど、タプルの状態を変える操作を考えてみましょう。通常のメソッド呼び出しでは処理が終われば呼び出した側に直ちに制御がもどりますが、RMIではサーバ側のメソッドの終了と、RMIの終了の間にソケット通信が行われます。つまり、処理が終わる前に例外が発生したのか、結果を伝える間に例外が発生したのか知ることができません。PTupleSpaceが二次記憶にタプルの操作をログしたあとに、クライアントにその完了が届く前にクラッシュしてしまう可能性があります。(全てがうまくいってからログする実装を選んでも、クライアントにタプルが届いたのち、ログするまえにクラッシュする可能性があります)

異常終了といえば、プロセス側のクラッシュも考えられますね。PTupleSpaceの対象外ですがちょっと想像してみましょう。伝票をプロセスが取り出したままクラッシュしてしまうと、復元する方法がありません。次の短いスクリプトを見てみましょう。

def succ
  _, count = @ts.take([:count, nil])
  count += 1
  yield(count)
ensure
  @ts.write([:count, count) if count
end

これは[:count, 整数]のタプルを取り出し、一つ大きくしてまた書き込むスクリプトです。伝票を取り出し、カウンタを一つ進め、最後にTupleSpaceに書き戻します。伝票がプロセスにある間は、別のプロセスは伝票をTupleSpaceから読んだり、取り出したりすることはできないので安全にカウンタを操作できます。さて、もしも伝票がプロセスにある間にそのプロセスがクラッシュしたらどうなるでしょう。PTupleSpaceは自身の中にある伝票しか復元できませんから、その伝票は失われたままです。このカウンタを操作するプロセス群は全て停まってしまいます。こういった使い方(協調に使うケースの多くはそうなんだと思うのですが)をする場合、TupleSpaceだけでなく関係するプロセス群も再起動する必要があるだけでなく、TupleSpace内のタプルも初期状態にする必要があります。せっかくタプルの状態を復元できるようにしたというのに‥。

PTupleSpaceはTupleSpace自体の永続化を目的としたもので、それ自体はおそらく期待した通りに動作すると思います(そういうつもりで作ったので)。しかし、それだけでは協調するプロセス群をもとに戻すことはできません。ちょっとだまされた気分ですよね。

PTupleSpaceの使い方

skip

ストレージとしてのTupleSpace

APIの視点からストレージとしてのTupleSpaceをおさらいします。
TupleSpaceはタプル群を扱う集合構造です。同じ情報を複数持つことができるので、Bagと言えるでしょう。
最近の流行言葉にKVSという言葉ありますね。キーと値で表現するなら、同じキーを持つ要素の重複を許すストレージです。キーしかなくて、キーが値、にも見えますが。

これに対してHashは一つのキーに一つの値が関連付けられる辞書です。

TupleSpaceで辞書を模倣するのはやっかいです。[キー, 値]というタプルで辞書を構成仕様とした場合を考えてみましょう。まずデータを読むのは次のように書けそうです。

@ts.read([キー, nil])

では要素の追加はどうでしょう。

@ts.write([キー, 値])

このような単純なwriteでは重複を防ぐことはできません。全体をロックして、そのキーのタプルを削除してからwriteする必要があります。

def []=(key, value)
  lock = @ts.take([:global_lock])
  @ts.take([key, nil], 0) rescue nil
  @ts.write([key, value])
ensure
  @ts.write(lock) if lock
end

このグローバルなロックは実はデータを読むときにも必要です。なぜなら、そのキーの情報を別のスレッドが更新中かもしれないからです。

def [](key)
  lock = @ts.take([:global_lock])
  _, value = @ts.read([key, nil], 0) rescue nil
  return value
ensure
  @ts.write(lock) if lock
end

要素の増減がないケースでは前章で示した通り、グローバルなロックは不要です。だれかが更新中はその要素は取り出せませんが、更新が終わればまた書き戻されるはずです。ですから、単に要素が読めるまでreadで待ってしまえば良いことになり、局所的なロックとなります。

eachはどのように実装したらよいでしょう。TupleSpace全体を順に走査するうまい方法はありません。read_allで全ての要素のArrayを生成して、その配列にeachを委譲することになります。

def each(&blk)
  lock = @ts.take([:global_lock])
  @ts.read_all([nil, nil]).each(&blk)
ensure
  @ts.write(lock) if lock
end

素数が少ないうちは気になりませんが、多くなると損している気がしますね。
分散ハッシュテーブルなどでもeachやkeysを低コストで実装するのは難しいかもしれません。

流行のストレージには、常にキーでソートされているシーケンスを持つものがあります。並んでいることを利用して、大きな空間をブラウズするのが得意です。キーを工夫することでバージョン付きの情報を蓄えることもできます。RindaのTupleSpaceには、タプルを順序付けて並べることはできませんから、これを低コストで模倣するのは難しいです。

ところであなたが欲しかった集合は本当にHashでしたか?

ストリーム指向のストレージDrip

この節で紹介するのは前章で説明したRD stream(消えないストリーム)のようなデータ構造を持つライブラリDripです。Dripはオブジェクトを書き込まれた順に蓄えるだけでなく、イベント通知メカニズムとしてプロセス間の同期にも使えます。Drip自体の機能は非常に小さいものですが、視点によっていろいろなものに見えます。ストレージにもバッチ処理ミドルウェアにも見えます。ひとことで説明するのは難しいですが、流行のサービスではTwitterのタイムラインがよく似ています。Rubyの標準ライブラリでたとえると、消えないQueue、消えないRindaです。たとえてもやっぱりよくわかりませんね。

簡単なスクリプトを使ってDripを使ってみましょう。

Dripをインストールする

(井上さんgems化してくださーい。)

Dripを作る

Dripは二次記憶としてプレーンなファイルを使います。Dripを生成するにはファイルを置くディレクトリを指定します。次のスクリプト(drip_s.rb)はDripを生成しdRubyでサービスするものです。

require 'drip'
require 'drb'

class Drip
  def quit
    Thread.new do
      synchronize do |key|
        exit(0)
      end
    end
  end
end

drip = Drip.new('drip_dir')
DRb.start_service('druby://localhost:54321', drip)
DRb.thread.join

Dripにquitメソッドを追加しています。これはRMI経由でこのプロセスを終了させるもので、Dripが二次記憶への操作をしていないとき(synchronize中)を待ってから終わらせます。

このあとの操作のために起動しておきましょう。

ターミナル1

% ruby drip_s.rb
オブジェクトを覚える

Dripの状態を変更する唯一のメソッドはwriteメソッドです。writeメソッドには覚えたいオブジェクトを指定します。ちょっと書いてみましょう。

ターミナル2

% irb -r drb --simple-prompt
>> drip = DRbObject.new_with_uri('druby://localhost:54321')
=> #<Drip:...>
>> drip.write('Hello, World.')
=> 1308221059579470

ターミナル1で起動したDripサーバへの参照を作り、dripという変数に覚えます。次に'Hello, World.'という文字列をwriteします。writeの戻り値は書き込んだオブジェクトに対応するキーです。キーは単調増加する整数で、たいてい時刻から生成されます。時刻と最新のキーを比べて、最新のキーの方が大きい場合には+1したものをキーとします。多くの場合時刻と相互に変換できると思いますが、「ユーザが時計を設定してしまった問題」や時刻の分解能よりも細かい単位で書き込まれた場合などにはその限りではありません。いずれにしろ、このようなケースでもキーはいつも単調増加となります。
キーがおおよその時刻に変換できるのは、人間があとでデータを調べるときに多少は便利です。Dripには同時にただ一つのオブジェクトだけが書き込めます。さまざまな事象は同時にいくつも発生しますが、Dripがそれを観測するのは同時にはただ一つです。事象が発生した時刻ではなく、Dripがそれを知った時刻と考えて下さい。

writeメソッドはStringに限らず、どんなオブジェクトでも保存できます。ただし、MarshalできないオブジェクトはDRbObjectで保存されますから、取り出して使えるようにするには多少の工夫が必要です。また、あとでオブジェクトを取り出すときのヒントとなる複数のタグを指定できます。タグはStringでなければいけません。

>> drip.write({:text => 'Hello, World', :user => 'm_seki'}, 'greeting', 'test')
=> 1308221460421676
>> drip.write(1308221460421676, 'test')
=> 1308221907348161

この操作では、Hashのオブジェクトに二つのタグ('greeting, 'test')を付けて書き込み、次に整数に'test'というタグを付けて書き込んでいます。writeの際に、一つのオブジェクトに複数のタグをつけることができます。タグはDripで一意なものでなく、同じタグを持つオブジェクトがすでにwriteされていても問題ありません。

read

Dripからデータを読む方法はたくさんありますが、基本となるのはreadです。readの引数は意外なほど多いです。

read(key, n=1, at_least=1, timeout=nil)

戻り値はキーとオブジェクト、タグからできたArrayのArrayです。読みたい要素数が1であってもArrayが返ります。keyよりも大きなキーを持つ要素をn個返します。at_leastは最低限返して欲しい要素数です。読める要素数がat_leastに達するまで、readはブロックします。timeoutはブロックの期限を指定します。最低でもtimeout秒は待ち、それを越えると例外があがります。nilを指定すると無限に待ちます。

先頭から一つずつ読み進めてみましょう。Dripのキーは正の整数ですから、先頭はキー0の次の要素です。0から二つの要素を読んでみましょう。

>> drip.read(0, 2)
=> [[1308221059579470, "Hello, World."], [1308221460421676, {:text=>"Hello, World", :user=>"m_seki"}, "greeting", "test"]]

0の次のキーを持つ要素(つまり先頭、一番旧い要素)から二つ分の要素が集まってできたArrayが返りました。

次に先頭から順に一つずつ読んでみます。注目点のキーをkとして、kをずらしながらreadしていきましょう。

>> k = 0
=> 0
>> k, v, *tag = drip.read(k)[0]
=> [1308221059579470, "Hello, World."]
>> k, v, *tag = drip.read(k)[0]
=> [1308221460421676, {:text=>"Hello, World", :user=>"m_seki"}, "greeting", "test"]
>> k, v, *tag = drip.read(k)[0]
=> [1308221907348161, 1308221460421676, "test"]
>> k, v, *tag = drip.read(k)[0]

どうでしょうか?ひとつずつ順に取り出せている様子がわかりますか?Dripにおけるデータのブラウズは、キーをずらすことで表現します。readは与えられたキーのすぐ後の要素を返しますから、さっき読んだオブジェクトのキーを与えてreadすることで全ての要素を順に辿ることができます。C言語のstdioライブラリでいうとfseek()とfread()を一度に行うイメージですね。

さて、4回目のreadでブロックしてしまいました。これは注目点のキーより新しい要素が存在しないからです。もう一つ端末を用意して、要素を追加してみましょう。

ターミナル3

% irb -r drb --simple-prompt
>> drip = DRbObject.new_with_uri('druby://localhost:54321')
=> #<Drip:0x0000010086b130>
>> drip.write('Hello, Again.', 'test')
=> 1308222915958300

ターミナル2

=> [1308222915958300, "Hello, Again.", "test"]

うん。ターミナル2のブロックは解け、新しい要素が届いたのがわかります。 

ここでdrip_s.rbを停止させたらどうなるか実験してみましょう。まずターミナル2でreadを行ってブロックさせます。

ターミナル2

>> k, v, *tag = drip.read(k)[0]

続いてdrip_s.rbを[control]+Cなどで停止させます。

ターミナル1

% ruby drip_s.rb
^Cdrip_s.rb:16:in `join': Interrupt
	from drip_s.rb:16:in `<main>'

ターミナル2ではreadの最中にdrip_s.rbが終了したので例外があがります。待ち合わせしているときにサーバが終了すると、待合せは解除されることになります。

DRb::DRbConnError: connection closed
         ....

再びdrip_s.rbを起動してから、readを試してみましょう。

ターミナル1

% ruby drip_s.rb

ターミナル2

>> k, v, *tag = drip.read(k)[0]

そしてターミナル3からもう一つ要素を追加します。ターミナル2のreadが完了し、その要素を読み出すことができるはずです。

ターミナル3

>> drip.write('drip drop')
=> 1308304037358423

ターミナル2

=> [1308304037358423, "drip drop"]

Dripは要素を二次記憶にwriteされたオブジェクトをログしており、次回の起動に備えています。これまでの実験の中でwriteされた5つの要素が本当に残っているかためしましょう。先頭から5つの要素をreadします。keyの最小値から5つの要素をreadするには、read(0, 5)とします。

ターミナル2

>> drip.read(0, 5)
=> [[1308221059579470, "Hello, World."], [1308221460421676, {:text=>"Hello, World", :user=>"m_seki"}, "greeting", "test"], [1308221907348161, 1308221460421676, "test"], [1308222915958300, "Hello, Again.", "test"], [1308304037358423, "drip drop"]]

drip_s.rbが一度終了しても内容が失われてないことが確認できます。

APIの設計指針

DripはdRubyと組み合わせて使うのを前提としてAPIを設計しました。dRubyの弱点はいくつかありますが、特に苦手なのはサーバ側のオブジェクトの寿命と排他制御の管理、そしてRMIの遅さです。サーバ側に状態をもつオブジェクトを作らないこと、RMIの回数を減らすことはAPIの選択の指針となります。

さきほどのreadメソッドに与えるキーについて、もう一度よく見てみましょう。readのキーは、データベース中の視点、カーソル、ページといった概念に近いものです。よくあるデータベースのAPIでは「カーソル」はコンテキストの中に隠されています。例えばRubyのFileオブジェクトは現在の位置を覚えていて、ファイル中のその位置から読んだり、その位置へ書いたりします。これに対し、DripではFileオブジェクトのような状態/コンテキストをもつオブジェクトを用いません。Dripへの質問は状態の変化を伴わない、関数のようになっています。位置などのコンテキストを管理するオブジェクトの代わりに、注目点となるキーを使うのです。このAPIを選択した理由は、コンテキストを管理するオブジェクトをDripサーバの中で生成しないためです。DripはdRubyを経由したRMIで利用されることを前提としています。生成と消滅(beginとend、openとclose)があるようなコンテキストを導入すると、その寿命をサーバが気にする必要が生まれます。分散環境でのGCといった難しい問題に向かい合わなくてはなりません。このため、Dripではそのような面倒を嫌ってInteger(キー)だけの付き合いとなるようにAPIを設計しました。
この節で示した通り、コンテキストを管理するオブジェクトを使う代わりに、readのたびに返されるキーを使ってアクセスすることで、同様な操作を実現できます。もしこのAPIでの操作が面倒と感じるなら、ローカルなプロセスの中でキーを隠すようなコンテキストを準備することを勧めます。間違ってDripサーバ側にコンテキストを用意しないよう注意して下さいね。

readでは、自分の知らない情報を一度に最大n個、少なくともm個を返せ、と指示します。n回のreadで構成すると、RMIの回数が増えてしまいますが、このように一度に転送すればRMIの回数を削減できます。応答時間よりも処理時間が重要なバッチ処理などのケースで有効です。「少なくともm個」を指定することで、イベントの(データの)発生の都度RMIを発生させずにすみます。ほどほどにデータがたまるのを待って一度に転送することができるからです。

Dripはストレージに関する一連の習作の経験から、「作りすぎない」ことに留意しました。「作る」ことは楽しいので、請われるままに機能を増やしてしまうことがしばしば起こります(私はそういう経験があります)。Dripのポリシーを明確にして、機能を増やしてしまう誘惑と戦いました。

タグとその他のread系API

writeの際につけたタグを使って、読み出す情報をフィルタすることができます。writeでは、一つの情報に複数のタグをつけることができ、read_tagはタグを一つ指定してそのタグを持つ要素だけを読み出すことができます。その他の引数はreadと同じです。

read_tag(key, tag, n=1, at_least=1, timeout=nil)

Dripでは全ての要素は一直線のストリームとして管理されます。要素はキーと値、複数のタグで構成されて、キーの順に並んでいます。readやread_tagは小さいキーから大きいキーの順にアクセスします。キーはwriteされた時刻を元に計算され、たいてい連続していません。readやread_tagの際に、存在しないキーを与えても問題ありません。そのキーよりも大きなキーを持つ、直近の要素からアクセスを開始します。readは与えたキーよりも大きなキーを持つ、直近の要素を返します。read_tagも同様に直近の要素を返しますが、タグがマッチしない要素はスキップされ、マッチした要素だけが返されます。タグを使うと一つのDripをタグで分類されたたくさんのDripのように見立てることもできます。
また、readとread_tagのキーは共通ですから二つを組み合わせることもできます。例えば、read_tagで狙ったタグを持つ要素を取得して、それ以降の要素をreadで順に全て集める、といった操作です。

read_tagもreadと同様に要素が取り出せない場合に、ブロックして新しい要素の到着を待つことができます。

read系のAPIは他にも用意されています。

older(key, tag=nil)

head(n=1, tag=nil)

readやread_tagは過去から未来へ走査するAPIですが、olderとheadは過去方向への操作を補助するAPIです。
olderはkeyで指定した要素の一つ旧い要素を返します。tagを使って要素をフィルタすることもできます。keyにnilを与えると最新を指定したことになります。older(nil)は最新の要素を一つ返します。

headはolderを使ったコンビニエンスメソッドです。一番新しい要素までのn個を返します。tagを使って要素をフィルタすることも可能です。nが1の場合、一番新しい要素だけが入ったArrayを返します。nが2の場合には、一番新しい要素の一つ旧い要素と、一番新しい要素の入ったArrayを返します。Arrayの中は旧いから新しいものへと並んでいます。

過去へ向かってのアクセスはブロックすることはありません。なぜなら、過去には情報が追加できないからです。(要素が増えるのは未来方向のみ)

コンビニエンスメソッドとしてはolderの対になるnewerもあります。

newer(key, tag=nil)

これは単なるread/read_tagのラッパーで、read(key, 1, 0)[0]あるいはread_tag(key, tag, 1, 0)[0]を簡単に呼べるようにしたものです。


(つづく)