@m_seki の

I like ruby tooから引っ越し

ストリーム指向のストレージDrip

この章では筆者が最近夢中になっているストリーム指向のストレージ、Dripについて紹介します。Dripはストレージであると同時に、プロセス間を協調のメカニズムでもあります。このくだりを聴いてもRindaとの共通部分が多くあると感じるでしょう。実際にRindaのアプリケーションを書いた経験を元にして書かれました。DripはRindaを置き換えるものではありません。どちらかというとオブジェクトの貯蔵庫であって、オブジェクト指向データベース、Key Value Storeやマルチディメンジョンのリストなど一連のストレージの習作を出発点としました。

Dripとはなにか

Dripは追記型のストレージの一種で、Rubyオブジェクトを時系列にログします。Dripにはオブジェクトの追記のみ可能で、削除や更新はできません。dRubyRMIを考慮した、局所的で安価なブラウズ用APIを用意してあります。オブジェクトのまとめ転送や、簡単なパターンによるフィルタ、シークをなど、です。
また、Dripはプロセス間の同期メカニズムでもあります。新しいオブジェクトの到着を待合せることができます。Dripでは一度保存されたオブジェクトは変化することはありません。複数のプロセスがばらばらの時刻に読み出した情報はどれも同じものですし、誰かが読んだオブジェクトを別の誰かが変更することはありません。この特性は分散ファイルシステムでよく見られる特性で、情報を排他的にアクセスしなくてはならない状況を減らすことができます。

Dripはちょっとしたオブジェクトの保存先であり、プロセス間通信、バッチ処理エーテルであり、ライフログです。単純な仕組みであるため、さまざまな用途への応用が考えられますし、それ故に使い途を想像するのが難しいとも言えます。私のDripを次のようなアプリケーションに使いました。

ちょっと雲をつかむような感じですね。次の節から、身近な同期メカニズムであるQueue、身近なオブジェクトの貯蔵庫であるHashとの違いをそれぞれ見ながら、Dripを紹介します。

Queueとの比較

まずQueueと比較しながらDripにおけるプロセス間の協調の違いを見てましょう。

ここでのQueueとはRubyに付属のQueueクラスです。QueueはFIFOのバッファで、要素は任意のオブジェクトです。Queueにオブジェクトを追加するのはpush、オブジェクトを取り出すのはpopです。popはオブジェクトを返すと同時に、Queueの中からそのオブジェクトを削除します。
同時に複数のスレッドからpopすることも可能ですが、一つの要素は一つのスレッドにだけ届きます。同じ要素が複数のpopに届くことはありません。
空のQueueに対してpopを行うとpopはブロックします。新しいオブジェクトが追加され、そしてそのオブジェクトを獲得したただ一人のスレッドに対してオブジェクトを届けます。

Dripにおいてpopに相当する操作はreadです。readは指定したカーソルより新しい要素を返します。ただし、Dripの中から要素を削除することはありません。複数のスレッドが同じカーソルでreadした場合には、それぞれのスレッドに同じ要素を返します。
カーソルよりも新しい要素がない場合、readはブロックします。新しいオブジェクトがwriteされるとreadのブロックはとけて、新しい要素を返します。この場合も、複数のスレッドに同じ要素が届きます。

DripがQueueやRindaとよく似ているポイントは、要素の到着を待つことができるところです。

また異なるポイントは要素を消費するかどうかです。Queueのpopは要素を消費しますが、Dripのreadでは要素は減りません。これは何度でも/何人でも読めるということです。Rindaではアプリケーションのバグやクラッシュによるタプルの紛失はシステム全体のダウンを意味することがありますが、Dripでは要素の紛失を気にする必要はありません。

具体的なコードでDripのreadの様子を見ていきましょう。

ここで使用するメソッド

ここで使用するメソッドは主に二つです。

Drip#write(obj, *tags)

writeメソッドはDripの状態を変化させる唯一の操作で、要素を追加します。要素objをDripに格納し、格納されたキーを返します。objへのアクセスを容易にするために、複数のタグをしていできます。タグの使い方はあとで説明します。

もう一つのメソッドはreadです。

Drip#read(key, n=1, at_least=1, timeout=nil)

Dripをブラウズする基本となるメソッドがreadです。keyは注目点(カーソル)で、keyよりも後に追加された要素のキーと値の組をn個の配列で返します。要素がat_least個そろうまで、readはブロックします。timeoutを指定することができます。
説明が長いですね。要するに「新しい要素をn返せ。at_least個揃うまでは待機せよ。」です。

Dripのインストールと起動

おっと。Dripのインストールを忘れていました。DripはRBTreeという赤黒木の外部ライブラリを使用します。gemを用意していただいたので次のようにインストールして下さい。

% gem ?????

次にDripサーバを起動します。

Dripはデフォルトでは二次記憶としてプレーンなファイルを使います。Dripを生成するにはファイルを置くディレクトリを指定します。次のスクリプト(drip_s.rb)はDripを生成しdRubyでサービスするものです。

require 'drip'
require 'drb'

class Drip
  def quit
    Thread.new do
      synchronize do |key|
        exit(0)
      end
    end
  end
end

drip = Drip.new('drip_dir')
DRb.start_service('druby://localhost:54321', drip)
DRb.thread.join

Dripにquitメソッドを追加しています。これはRMI経由でこのプロセスを終了させるもので、Dripが二次記憶への操作をしていないとき(synchronize中)を待ってから終わらせます。

次のように起動できます。

% ruby drip_s.rb
MyDrip

MacOSXなどPOSIXなOS専用ですが、MyDripという1人用の起動が簡単なDripサーバも用意されています。これは、ホームディレクトリの直下に.dripというディレクトリを作成し、この中をストレージとするDripで、UNIXドメインソケットを使ってサービスします。UNIXドメインソケットですから、ファイルの権限、可視性によって利用者を制限できます。また、UNIXドメインソケットのファイル名はホームディレクトリ以下のいつも決まったパスで接続できます。
TCPの場合、固定にするにはそのマシンの中である番号のポートをあるサービスに割り当てる、とみんなで約束を守る必要があり、dRubyURIを固定にするのに面倒なところがあります。それに対して、各ユーザのホームディレクトリの下のファイルを使う場合にはみんなで約束しあう必要がありませんから、URI機械的に決めるのが簡単です。

MyDripを利用するにはmy_dripをrequireします。
起動してみましょう。

ターミナル1
% irb -r my_drip --simple-prompt
>> MyDrip.invoke
=> 51252
>> MyDrip.class
=> DRb::DRbObject

MyDripはこの固定のポートを指すDRbObjectですが、特別にinvokeメソッドが定義されています。MyDrip.invokeは新しいプロセスをforkし、必要であればDripデーモン起動します。すでに自分用のMyDripが動いている場合にはなにもせずに終了します。なお、MyDripを終了させるにはMyDrip.quitメソッドを使います。

MyDripはirb実行中にちょっとしたオブジェクトのメモをとるのにも使える便利なデーモンです。筆者の環境ではいつもMyDripを起動してあり、Twitterのタイムラインを常にアーカイブしたり、メモをしたりbotミドルウェアになったりしています。

私の.irbrcは次のようにmy_dripをrequireしています。irbを使っているときはいつでもMyDripにメモできます。

require 'my_drip'


以降の実験では、主にMyDripを利用します。MyDripが利用できない環境の方は、次のように定義した"my_drip.rb"を用意することでdrip_s.rbのサービスを代用して使えます。

MyDrip = DRbObject.new_with_uri('druby://localhost:54321')
再びQueueとの比較

MyDripデーモン(あるいは代用となるdrip_s.rb)が起動している状態で実験です。

writeメソッドを使ってオブジェクトを二つ追加します。writeはDripを変化させる唯一のメソッドです。writeメソッドの戻り値は追加された要素と関連付けられたキーです。キーは時刻(usec)から作られた正の整数で、64bitマシンではしばらくの間はFixnumとなります。

ターミナル2
% irb -r my_drip --simple-prompt
>> MyDrip.write('Hello')
=> 1312541947966187
>> MyDrip.write('world')
=> 1312541977245158

つぎにDripからデータを読んでみます。

ターミナル3
% irb -r my_drip --simple-prompt
>> MyDrip.read(0, 1)
=> [[1312541947966187, "Hello"]]

readはカーソルからn個の要素を読むメソッドで、キーと値のペアの配列を返します。
順に読むには次のようにカーソルを動かしながらreadすると良いでしょう。

>> k = 0
=> 0
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541947966187, "Hello"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541977245158, "World"]

二つ読めました。さらに読むとどうなるでしょう。

>> k, v = MyDrip.read(k, 1)[0]

kよりも新しい要素がないのでブロックします。ターミナル2から新しい要素を追加するとブロックがとけ、そのオブジェクトが読めるはずです。

ターミナル2
>> MyDrip.write('Hello, Again')
=> 1312542657718320
>> k, v = MyDrip.read(k, 1)[0]
=> [1312542657718320, "Hello, Again"]

どうですか?待合せできていますか?

読み手を増やしてまた0から読んでみましょう。

ターミナル4
% irb -r my_drip --simple-prompt
>> k = 0
=> 0
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541947966187, "Hello"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541977245158, "World"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312542657718320, "Hello, Again"]

同じ要素が読めました。DripではQueueとちがって要素を消費しませんから、同じ情報をなんども読めます。その代わりにどの辺りの要素を読むのか、readのたびに指定しなくてはなりません。

ここでMyDripを再起動させましょう。quitメソッドを呼ぶとだれもwriteしていないときを見計らってプロセスを終了させます。再起動するにはinvokeを呼びます。MyDrip.invokeはログが大きいと時間がかかるときがあります。

ターミナル1
>> MyDrip.quit
=> #<Thread:...>
>> MyDrip.invoke
=> 61470

readメソッドで先ほどの状態になっているか確認してみましょう。

ターミナル1
>> MyDrip.read(0, 3)
=> [[1312541947966187, "Hello"], [1312541977245158, "World"], [1312542657718320, "Hello, Again"]]
実験のまとめ

Queueと似ている点は、時系列に並んだデータを順に取り出せるところ、データの到着を待合せできるところです。Queueと異なる点はデータが減らないところです。同じ要素を複数のプロセスから読めますし、同じプロセスが何度もよむこともできます。経験上、バッチ処理は開発中も運用中も何度も停まりますよね。Dripでは工夫すれば先ほどの状態から処理を再開できます。途中からでも最初からでもやり直すチャンスがあります。
またQueueとの比較を通じて基本となる二つの操作、write、readを紹介しました。

Hashとの比較

ここではKVS、あるいはHashとDripを比較し、それを通じてDripの操作を学びます。
RubyのHashはキーと値が組になった連想配列で、連想配列の実装にハッシュ表を使うことからHashと呼ばれています。あるキーと関連するのは一つの値です。Dripではwrite時に指定できるタグを使ってHashを模倣することができます。

タグ

Drip#writeには格納したいオブジェクトのほかにタグを指定することができます。タグはStringです。一つのオブジェクトに複数のタグをつけることができます。あるタグを指定してreadすることができるため、オブジェクトをあとで取り出すのが容易になります。このタグを利用するとHashを模倣することができます。

タグをHashのキーと考えてみましょう。Dripにおいて「タグをつけてwriteする」のはHashにおいては「キーに関連する値を設定する」ことになります。「タグをもつ最新の値をreadする」のはHashではキーに関連す値を取り出すことと同じです。「最新の値」を取り出せばHashと同様ですが、それ以前の値を取り出すことができますから、この方法で模倣したHashは、変更履歴を持つHashと言えます。

ここで使用するAPI

ここで新たに使用するAPIはheadとread_tagです。

Drip#head(n=1, tag=nil)

headは先頭からn個の要素の配列を返します。tagを指定すると、そのtagを持つ要素だけを選んでn個返します。Drip中の要素数がnより小さくてもHeadはブロックしません。先頭のn個を覗くだけです。

Drip#read_tag(key, tag, n=1, at_least=1, timeout=nil)

read_tagの基本的な動作はreadと同じですが、tagを指定するところが違います。tagをもつ要素だけをreadします。readと同じですから、keyより新しい要素の数がat_least個に満たない場合は、新しいデータが追加されるまでブロックします。あるタグを持つ要素の追加を待ち合わせることができるわけです。

実験

タグとhead、read_tagを組み合わせてHashを模倣してみましょう。先ほどのMyDripをそのまま使います。

まず値の設定です。

hash['seki.age'] = 29

上記のhashへの操作に相当するのは次の通りです。'seki.age'というタグをつけて29をwriteします。

ターミナル2
>> MyDrip.write(29, 'seki.age')
=> 1313358208178481

値の取り出しにはheadが良いでしょう。'seki.age'タグを持つ要素を先頭から一つ要求します。

ターミナル2
>> MyDrip.head(1, 'seki.age')
=> [[1313358208178481, 29, "seki.age"]]

一つの要素は[キー, 値, 任意個のタグ]で、これらの配列が返ります。値だけを見たいのであれば次のようにしても良いでしょう。

ターミナル2
>> k, v = MyDrip.head(1, 'seki.age')
=> [[1313358208178481, 29, "seki.age"]]
>> v
=> 29

今度は値を再設定してみます。

hash['seki.age'] = 49

Hashでいうと上記のような操作です。'seki.age'に関連する値を49と変更するには、先ほどと同様に'seki.age'というタグをつけて49をwriteすればよいです。writeして、headで確認してみましょう。

ターミナル2
>> MyDrip.write(49, 'seki.age')
=> 1313358584380683
>> MyDrip.head(1, 'seki.age')
=> [[1313358584380683, 49, "seki.age"]]

変更履歴は過去のデータを取り出せばわかります。headを使って最新10バージョンの履歴を調べます。

ターミナル2
>> MyDrip.head(10, 'seki.age')
=> [[1313358208178481, 29, "seki.age"], [1313358584380683, 49, "seki.age"]]

先頭から10個の要素を要求しましたが、いまDripの中にある'seki.age'を持つ要素は二つだけなので、2要素のArrayが返りました。結果が複数返る場合、配列は旧い方から新しい方へ向けて並んでいます。

では存在しないキー(Hashでいうところのキー)を問い合わせるとどうなるでしょう。

ターミナル2
>> MyDrip.head(1, 'sora_h.age')
=> []

空の配列が返りました。ブロックもしません。headはブロックしない操作なので、要素が見つからないときは空の配列を返します。
狙った要素が追加を待ち合わせするにはread_tagを使います。

ターミナル2
>> MyDrip.read_tag(0, 'sora_h.age')

ブロックしますね。別の端末から値を設定してみます。

ターミナル3
>> MyDrip.write(12, 'sora_h.age')
=> 1313359385886937

read_tagのブロックは解けて、いま追加したオブジェクトが返ります。

ターミナル2
>> MyDrip.read_tag(0, 'sora_h.age')
=> [[1313359385886937, 12, "sora_h.age"]]
実験のまとめ

タグをうまく使うとHashの基本操作である値の設定と取り出しが模倣できることがわかりました。Hashと違うところは次の点です。

  • 要素は消せない
  • 履歴がある
  • keys/eachがない

Hashと違い要素を削除することはできませんが、nil、あるいは削除状態を表わす特別なオブジェクトを設定するなどによって代用できると思います。また、要素を削除できない副産物として変更の履歴を全て見ることができます。
keysとeachが用意されないのは意図してのことです。簡単に作れるので一度作成しましたが削除しました。現在、DripにそのAPIは残っていません。keysを実装するには全ての要素を一度集める必要がありますが、要素数が大きくなったときに破綻する可能性があるからです。多くの分散ハッシュテーブルでもkeysは用意されていないのではないかと思います。

TupleSpaceと似ている点があります。read_tagを使うと要素の追加や更新を待ち合わせることができます。これはRindaのTupleSpaceにおけるreadのパターンマッチングを非常に限定したものと考えられます。ある特定のタグをもつ要素が追加されるまでプロセスを待たせることができます。このパターンマッチはRindaと比較すると非常に貧弱なものですが、実際のアプリケーションの多くには充分ではないか予想しています。
DripではRindaで広げすぎた仕様を狭くして、最適化しやすい単純な仕組みに挑戦しています。Rindaはインメモリを中心にRuby的な豪華な世界を表現しました。これに対しDripでは永続化を前提として協調機構を考え直しより単純なもの目指しました。
この予想を検証するにはもっと多くのアプリケーションが必要ですね。

この二つの節ではQueue、Hashとの比較を通じてDripを説明してきました。単純な追記しかないストリームでもちょっと凝ったデータ構造が表現できそうです。多くのデータ構造においてeachが定義できるわけですから、世界のほとんどは一直線にならべることができるかもしれませんしね。

QueueやHashと比較してDripを説明しました。

キーとブラウズ

ここではDripに格納されたデータをブラウズする方法を学びます。Dripでは全ての要素はwriteされた順に並んでいますから、Dripにおけるデータのブラウズは時間軸に沿って旅をするようなものです。
ブラウズに使うAPIのほとんどは注目点(カーソル)のキーを引数にとります。まずキーの規則を説明し、次にブラウズの実際を見ていきます。

キー

Drip#writeすると、その要素に対応するキーが返ります。キーは単調増加の整数で、あたらしいキーはこれまでに格納されたどのキーよりも大きくなります。現在の実装ではキーは次のように計算されます。

  def time_to_key(time)
    time.tv_sec * 1000000 + time.tv_usec
  end

キーは時刻から計算された整数です。64bitマシンにおいて(当面の間は)Fixnumです。usecの分解能しかありませんから、1 usecのうちに複数の要素を書き込める際に衝突が発生します。この場合、最も大きなキーより一つ大きな値が選択されます。

      # lastは最後の(最大の)キー
      key = [time_to_key(at), last + 1].max

0が最古のキーとなります。一番旧い要素を指定するときに思い出して下さい。

ブラウズ

これまでの実験でread、read_tag、headを試しました。他にも次のようなAPIがあります。

  • 未来方向へのブラウズ / read, read_tag, newer
  • 過去方向へのブラウズ / head, older

DripのアプリケーションではこれらのAPIを使って時間軸を前後に移動します。タグをうまく使ってスキップすることもあります。

この節では、タグを使って任意の要素へシークしそこから順に読む操作を紹介します。

次の疑似コードは全ての要素を4つずつ読み出していく例です。kが注目点です。kをreadした要素の最後のキーとしながら繰り返すことで、要素を順に読んでいくことができます。

while true
  ary = drip.read(k, 4, 1)
  ...
  k = ary[-1][0]
end

この疑似コードをirbで分解しながら実行していきます。MyDripは動いていますか?この実験もMyDripを使います。まずirbからMyDripにテスト用のデータを書き込みます。

ターミナル1
% irb -r my_drip --simple-prompt
>> MyDrip.write('sentinel', 'test1')
=> 1313573767321912
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573806023712
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573808504784
>> MyDrip.write(:blue, 'test1=blue')
=> 1313573823137557
>> MyDrip.write(:green, 'test1=green')
=> 1313573835145049
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573840760815
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573842988144
>> MyDrip.write(:green, 'test1=green')
=> 1313573844392779

はじめに書いたのは実験を始めた時点を記録するための錨のような要素です。それ以降、オレンジ、オレンジ、青、緑、オレンジ、オレンジ、緑とオブジェクトをwriteしました。色の名前に対応したタグをつけてあります。

ターミナル2
% irb -r my_drip --simple-prompt
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]
>> k
=> 1313573767321912

まず"test1"というタグをつけた錨の要素をキーを手に入れます。実験の出発点となります。headを使うのが良いでしょう。

次に錨の以降の要素を4つreadします。

ターミナル2
>> ary = MyDrip.read(k, 4)
=> [[1313573806023712, :orange, "test1=orange"], [1313573808504784, :orange, "test1=orange"], [1313573823137557, :blue, "test1=blue"], [1313573835145049, :green, "test1=green"]]

読めましたか?次に注目点を更新して、もう一度4つreadしてみましょう。

ターミナル2
>> k = ary[-1][0]
=> 1313573835145049
>> ary = MyDrip.read(k, 4)
=> [[1313573840760815, :orange, "test1=orange"], [1313573842988144, :orange, "test1=orange"], [1313573844392779, :green, "test1=green"]]

続きの3つの要素が返りました。これはk以降の要素が3つしかないからです。さらに読むとどうなるでしょう。みなさんの予想通り、readはブロックするはずです。

ターミナル2
>> k = ary[-1][0]
=> 1313573844392779
>> ary = MyDrip.read(k, 4)

別の端末からなにかwriteしてreadが動き出すか、確認します。

ターミナル1
>> MyDrip.write('hello')
=> 1313574622814421

解除されましたか?

次はread_tagを使ってフィルタする例を示します。注目点を巻き戻してもう一度実験です。

ターミナル2
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]

注目点より新しいデータで、タグが'test1=orange'のものを4つ(最低でも2つ)readせよ、としてみましょう。

>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)
=> [[1313573806023712, :orange, "test1=orange"], [1313573808504784, :orange, "test1=orange"], [1313573840760815, :orange, "test1=orange"], [1313573842988144, :orange, "test1=orange"]]

オレンジばかり、4つ手に入りました。

注目点を更新して、もう一度同じ操作をしてみます。

>> k = ary[-1][0]
=> 1313573842988144
>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)

新しい注目点よりも後には、オレンジの要素が一つもありませんからブロックします。別の端末からオレンジを2つwriteすれば、このread_tagは動き出すでしょう。

ターミナル1
>> MyDrip.write('more orange', 'test1=orange')
=> 1313575076451864
>> MyDrip.write('more orange', 'test1=orange')
=> 1313575077963911
ターミナル2
>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)
=> [[1313575076451864, "more orange", "test1=orange"], [1313575077963911, "more orange", "test1=orange"]]

ここではタグを使ったシークと、そこからのread、フィルタを使ったread_tagの例を示しました。Dripのデータの中をブラウズする際の基本となるイディオムです。

ほかにもいくつかユーティリティメソッドがあります。

Drip#newer(key, tag=nil)

keyより新しいものを一つ返します。tagを指定することもできます。newerはread/read_tagのラッパーです。新しい要素が無い場合ブロックせず、nilを返します。

Drip#older(key, tag=nil)

keyよりも旧いものを一つ返します。tagを指定することもできます。新しい要素が無い場合ブロックせず、nilを返します。

そうそう。普通過ぎてわすれていましたが、キーがわかっているときに対応する値を取り出すAPIもあります。

Drip#[](key)
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]
>> MyDrip[k]
=> ["sentinel", "test1"]

値とタグからなる配列を返します。キーは返りません。

これで主要なAPIの説明は終わりです。そういえばRubyでよく見かけるeachはありませんでしたね。それについてはつぎのクドい話を読んで下さい。

APIの設計指針

DripはdRubyと組み合わせて使うのを前提としてAPIを設計しました。dRubyの弱点はいくつかありますが、特に苦手なのはサーバ側のオブジェクトの寿命と排他制御の管理、そしてRMIの遅さです。サーバ側に状態をもつオブジェクトを作らないこと、RMIの回数を減らすことはAPIの選択の指針となります。

さきほどのreadメソッドに与えるキーについて、もう一度よく見てみましょう。readのキーは、データベース中の視点、カーソル、ページといった概念に近いものです。よくあるデータベースのAPIでは「カーソル」はコンテキストの中に隠されています。例えばRubyのFileオブジェクトは現在の位置を覚えていて、ファイル中のその位置から読んだり、その位置へ書いたりします。これに対し、DripではFileオブジェクトのような状態/コンテキストをもつオブジェクトを用いません。Dripへの質問は状態の変化を伴わない、関数のようになっています。位置などのコンテキストを管理するオブジェクトの代わりに、注目点となるキーを使うのです。このAPIを選択した理由は、コンテキストを管理するオブジェクトをDripサーバの中で生成しないためです。DripはdRubyを経由したRMIで利用されることを前提としています。生成と消滅(beginとend、openとclose)があるようなコンテキストを導入すると、その寿命をサーバが気にする必要が生まれます。分散環境でのGCといった難しい問題に向かい合わなくてはなりません。このため、Dripではそのような面倒を嫌ってInteger(キー)だけの付き合いとなるようにAPIを設計しました。
この節で示した通り、コンテキストを管理するオブジェクトを使う代わりに、readのたびに返されるキーを使ってアクセスすることで、同様な操作を実現できます。もしこのAPIでの操作が面倒と感じるなら、ローカルなプロセスの中でキーを隠すようなコンテキストを準備することを勧めます。間違ってDripサーバ側にコンテキストを用意しないよう注意して下さいね。

readでは、自分の知らない情報を一度に最大n個、少なくともm個を返せ、と指示します。n回のreadで構成すると、RMIの回数が増えてしまいますが、このように一度に転送すればRMIの回数を削減できます。応答時間よりも処理時間が重要なバッチ処理などのケースで有効です。「少なくともm個」を指定することで、イベントの(データの)発生の都度RMIを発生させずにすみます。ほどほどにデータがたまるのを待って一度に転送することができるからです。

Dripはストレージに関する一連の習作の経験から、「作りすぎない」ことに留意しました。「作る」ことは楽しいので、請われるままに機能を増やしてしまうことがしばしば起こります(私はそういう経験があります)。Dripのポリシーを明確にして、機能を増やしてしまう誘惑と戦いました。

アプリケーション

簡易検索システム

ここでは非常に小さな検索システムを作ります。検索システムのミニチュアの作成を通じてDripの応用のヒントとして下さい。
このシステムには主に三つのプロセスが登場します。自分のマシンにあるRubyスクリプトを探してはDripに登録するクロウラ、Dripへ登録されたファイルを検索のために索引をつけるインデクサ、そして中心となるMyDripサーバです。

動かし方

この実験でもMyDripを使用しますので、事前にMyDrip.invokeするか、Windows環境では代替となるサーバを起動しておいて下さいね。

$ irb -r drip -r my_drip
>> MyDrip.invoke
=> 45616 


今回のサンプルはDripのソースコードの中にも含まれています。まずはダウンロードしてみましょう。

$ cd ~
$ git clone git://github.com/seki/Drip.git
$ cd Drip/sample/demo4book

実際にcrawlerを動かす前にcrawler.rbの10行目に、検索したいディレクトリして下さい。
ファイル数が多いと実験に時間が非常にかかるので、少ないディレクトリを選んでください。500ファイル程度が実験しやすいのではないかと思います。今回はソースコードディレクトリを指定しました。

@root = File.expand_path('~/Drip/')

以下のようにcrawl.rbを実行するとcrawlするごとにファイルの一覧が表示されます。

$ ruby crawl.rb 
["install.rb",
 "lib/drip/version.rb",
 "lib/drip.rb",
 "lib/my_drip.rb",
 "sample/copocopo.rb",
 "sample/demo4book/crawl.rb",
 "sample/demo4book/index.rb",
 "sample/drip_s.rb",
 "sample/drip_tw.rb",
 "sample/gca.rb",
 "sample/hello_tw.rb",
 "sample/my_status.rb",
 "sample/simple-oauth.rb",
 "sample/tw_markov.rb",
 "test/basic.rb"]

次に別のターミナルでインデクサを起動し、探したい単語を入力すると、その単語が存在するファイル名を一覧として表示します。
ここでは「def」という単語を検索しています。起動してすぐはまだ索引が完全でないので、急いでなんども検索すると索引対象が増えていく様子を見られるかもしれません。

$ ruby index.rb
def
["sample/demo4book/index.rb", "sample/demo4book/crawl.rb"]
2
def     
["sample/drip_s.rb",
 "lib/drip.rb",
 "lib/my_drip.rb",
 "sample/copocopo.rb",
 "sample/demo4book/index.rb",
 "sample/demo4book/crawl.rb"]
6

クロウラは60秒置きに更新を調べるようになっています。標準入力からなにか入力すると、更新の合間を待ってから終了します。これは、一般的な検索システムのクロウラを模倣して、適度に休むようにしてあります。とくにWebページなど検索対象が広い場合などは頻繁な更新情報の収集にはムリがあります。
なお、クローラを休ませる時間を短くすればファイルを更新してすぐに索引に反映されるようになります。このクローラを改造していくことで、自分だけのちょっとしたリアルタイム検索ツールになるかもしれません。また最近のOSでしたらファイルの更新自体をイベントとして知ることができると思うので、そういった機構をトリガーとするのも面白いと思います。

ここからはソースコードを解説していきます。

投入する要素

このシステムでDripに投入するオブジェクトとタグについて説明します。主に使用するのは「ファイル更新通知」です。

  • ファイル更新通知 - ファイル名、内容、更新日の配列です。'rbcrawl'と'rbcrawl-fname=ファイル名'の二つのタグを持ちます。

クロウラは更新されたファイルを見つけるたびにこの情報をwriteします。これはファイル内容のアーカイブであると同時に、更新を通知するイベントになります。インデクサは更新通知を見つけるたびに索引を更新します。

補助的に利用するものもあります。

  • クロウラの足跡 - ひとまとまりの処理のなかで更新したファイル名の一覧と、その時刻をメモします。'rbcrawl-footprint'というタグを持ちます。
  • 実験開始を示すアンカー - 'rbcrawl-begin'というタグを持ちます。何度か実験を繰り返しているうちにはじめからやり直したくなったらこのタグでなにかwriteしてください。

ではこれらのオブジェクトやタグがどのように使われているか見てみましょう

クロウラ

簡易クロウラの動作を説明します。

class Crawler
  include MonitorMixin

  def initialize
    super()
    @root = File.expand_path('~/develop/git-repo/')
    @drip = MyDrip
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
  end

  def last_mtime(fname)
    k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
    (v && k > @fence) ? v[1] : Time.at(1)
  end

  def do_crawl
    synchronize do
      ary = []
      Dir.chdir(@root)
      Dir.glob('**/*.rb').each do |fname|
        mtime = File.mtime(fname)
        next if last_mtime(fname) >= mtime
        @drip.write([fname, mtime, File.read(fname)],
                    'rbcrawl', 'rbcrawl-fname=' + fname)
        ary << fname
      end
      @drip.write(ary, 'rbcrawl-footprint')
      ary
    end
  end
  
  def quit
    synchronize do
      exit(0)
    end
  end
end

まず、指定したディレクトリ(@root)以下にある*.rbのファイルを探します。そしてその更新時刻を調べ、新しいファイルを見つけたらその内容や時刻をwriteします。
これは実際には以下のようなデータを書き込んでいます。

@drip.write(
  ["sample/demo4book/index.rb", 2011-08-23 23:50:44 +0100, "ファイルの中身"], 
  "rbcrawl", "rbcrawl-fname=sample/demo4book/index.rb"
)

値はファイル名、時刻、ファイルの中身からなる配列で、それに対して二つのタグがついています。

クロウラは60秒置きに更新を調べるようになっています。標準入力からなにか入力すると、更新の合間を待ってから終了します。一回の処理で見つけたファイル名の配列を'rbcrawl-footprint'というタグをつけて覚えておきます。たとえば、以下のようなデータを書き込みます。

@drip.write(["sample/demo4book/index.rb"], 'rbcrawl-footprint')

このバージョンのクロウラはファイルの削除を追いかけませんが、この足跡情報を使えば削除を知ることができるかもしれません。

更新されたか否かは、headメソッドで一つ前のバージョンを探し比較して検査します。
'rbcrawl-fname=ファイル名'というタグでheadすることで、直前のバージョン(つまりDripに書かれている最新のバージョン)を調べることができます。

k, v = @drip.head(1, "rbcrawl-fname=sample/demo4book/index.rb")[0]

以下に完全なクロウラを載せます。

require 'pp'
require 'my_drip'
require 'monitor'

class Crawler
  include MonitorMixin

  def initialize
    super()
    @root = File.expand_path('~/develop/git-repo/')
    @drip = MyDrip
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
  end

  def last_mtime(fname)
    k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
    (v && k > @fence) ? v[1] : Time.at(1)
  end

  def do_crawl
    synchronize do
      ary = []
      Dir.chdir(@root)
      Dir.glob('**/*.rb').each do |fname|
        mtime = File.mtime(fname)
        next if last_mtime(fname) >= mtime
        @drip.write([fname, mtime, File.read(fname)],
                    'rbcrawl', 'rbcrawl-fname=' + fname)
        ary << fname
      end
      @drip.write(ary, 'rbcrawl-footprint')
      ary
    end
  end
  
  def quit
    synchronize do
      exit(0)
    end
  end
end

if __FILE__ == $0
  crawler = Crawler.new
  Thread.new do
    while true
      pp crawler.do_crawl
      sleep 60
    end
  end

  gets
  crawler.quit
end
インデクサ

このインデクサは索引の作成、更新と、検索そのものも提供します。指定した単語を含んでいるファイルの名前を返します。このサンプルは実験用のミニチュアなので、インメモリに索引を作ることにしました。rbtreeが必要ですが、Dripが動いているならrbtreeはインストールされていると思います。

class Indexer
  def initialize(cursor=0)
    @drip = MyDrip
    @dict = Dict.new
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
    @cursor = [cursor, @fence].max
  end
  attr_reader :dict

  def update_dict
    each_document do |cur, prev|
      @dict.delete(*prev) if prev
      @dict.push(*cur)
    end
  end

  def each_document
    while true
      ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
      ary.each do |k, v|
        prev = prev_version(k, v[0])
        yield(v, prev)
        @cursor = k
      end
    end
  end

  def prev_version(cursor, fname)
    k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
    (v && k > @fence) ? v : nil
  end
end

インデクサはDripから'rbcrawl'タグのついたオブジェクトを取り出し、その都度、索引を更新します。

@drip.read_tag(@cursor, 'rbcrawl', 10, 1)

第4引数の「1」に注目して下さい。先ほど「keyより新しい要素の数がat_least個に満たない場合は、新しいデータが追加されるまでブロックします」と説明したのを覚えていますか?一度に10個ずつ、最低でも1個ずつ返せ、という指示ですから返せる要素が一つもないときにはブロックします。
これによりクロウラが'rbcrawl'タグのデータを挿入するのをブロックしながら待ち合わせている事になります。

インデクサにとってrbcrawlタグのオブジェクトは更新イベントであると同時に文書でもあります。更新されたファイル名、更新時刻、内容がまとめて手に入ります。
また、DripはQueueとちがい、すでに読んだ要素を再び読むことが可能です。注目点の直前の要素を調べるolderなどで調べることが可能です。

def prev_version(cursor, fname)
  k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
  (v && k > @fence) ? v : nil
end

通知されたファイルに旧いバージョンの文書があった場合、インデクサは旧い内容を使って索引を削除してから、新しい内容で索引を追加します。

def update_dict
  each_document do |cur, prev|
    @dict.delete(*prev) if prev
    @dict.push(*cur)
  end
end

インデクサは起動されるとスレッドを生成してサブスレッドでDripからのread_tagと索引づけを行います。

indexer ||= Indexer.new(0)
Thread.new do
  indexer.update_dict
end

メインスレッドではユーザーからの入力を待ち、入力されるとその単語を探して検索結果を印字します。

while line = gets
  ary = indexer.dict.query(line.chomp)
  pp ary
  pp ary.size
end

以下に完全なインデクサを載せます。

require 'nkf'
require 'rbtree'
require 'my_drip'
require 'monitor'
require 'pp'


class Indexer
  def initialize(cursor=0)
    @drip = MyDrip
    @dict = Dict.new
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
    @cursor = [cursor, @fence].max
  end
  attr_reader :dict

  def update_dict
    each_document do |cur, prev|
      @dict.delete(*prev) if prev
      @dict.push(*cur)
    end
  end

  def each_document
    while true
      ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
      ary.each do |k, v|
        prev = prev_version(k, v[0])
        yield(v, prev)
        @cursor = k
      end
    end
  end

  def prev_version(cursor, fname)
    k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
    (v && k > @fence) ? v : nil
  end
end

class Dict
  include MonitorMixin
  def initialize
    super()
    @tree = RBTree.new
  end

  def query(word)
    synchronize do
      @tree.bound([word, 0, ''], [word + "\0", 0, '']).collect {|k, v| k[2]}
    end
  end

  def delete(fname, mtime, src)
    synchronize do
      each_tree_key(fname, mtime, src) do |key|
        @tree.delete(key)
      end
    end
  end

  def push(fname, mtime, src)
    synchronize do
      each_tree_key(fname, mtime, src) do |key|
        @tree[key] = true
      end
    end
  end

  def intern(word)
    k, v = @tree.lower_bound([word, 0, ''])
    return k[0] if k && k[0] == word
    word
  end

  def each_tree_key(fname, mtime, src)
    NKF.nkf('-w', src).scan(/\w+/m).uniq.each do |word|
      yield([intern(word), mtime.to_i, fname])
    end
  end
end

if __FILE__ == $0
  indexer ||= Indexer.new(0)
  Thread.new do
    indexer.update_dict
  end

  while line = gets
    ary = indexer.dict.query(line.chomp)
    pp ary
    pp ary.size
  end
end
クロウラの動作間隔とインデクサの同期

このサンプルで示したかったものの一つに、複数の処理が自分の都合のよいタイミングで動作するというものがあります。

クロウラは定期的に動作を開始します。クロウラはインデクサの状態など気にせずに処理を行い、更新を見つけてはwriteします。
インデクサも同様です。インデクサはクロウラの動作状況を気にせず、これまでDripに格納されていた文書をまとめて取り出しては索引の更新を行います。文書を処理し終わったら、新しい文書がwriteされるまで休眠状態になります。

データの流れとしては、クロウラが発生源で、Dripに蓄えられて、インデクサがそれを取り出し索引を作ります。しかし、クロウラが発生させた処理の中でインデクサが動作するわけではありません。たとえば、オブザーバーパターンでクロウラ→インデクサとコールバック等のメソッド呼び出しの連鎖のなかで索引更新が行われると想像してみてください。クロウラ側の更新を調べる処理は、索引の更新と直列に動作し律速してしまいます。
Dripにおけるイベントの通知は、受動的ではありません。リスナ側が自分の都合のよいときに能動的に行われます。このスタイルはアクターモデルともよく似ています。インデクサは自分の担当する仕事が一通り終わって、自分の状態が安定してから次の文書を取り出します。dRubyRMIがサブスレッドにより気付かないうちに実行されるのと対照的ですね。

ややこしい喩え話はともかく、クロウラはインデクサの処理を待つことなく動きますし、インデクサはクロウラの処理の頻度と関係なく自分のペースで動きます。Dripはメッセージングのミドルウェアとして彼らの間をゆるく仲介します。

フェンスと足跡

実験を繰り返していると、最初の状態からやり直したくなることがあるでしょう。Dripのデータベースを作り直せばやりなおせますが、でもMyDripはこのアプリケーション以外からも雑多な情報をwriteされているでそれは抵抗がありますよね。
そこでこのアプリケーションの始まりの点を閉めすオブジェクトを導入することに。'rbcrawl-begin'というタグを持つオブジェクトがあるときは、それよりも旧い情報を無視することで、それ以前のオブジェクトに影響されずに実験できます。@fenceはクロウラ、インデクサのどちらでも使っているので読んでみて下さい。
具体的にはolderやheadの際にそのキーをチェックして、@fenceよりも旧かったら無視することにします。

=> MyDrip.write('fence', 'rbcrawl-begin')
>> 1313573767321913

インデクサが索引を二次記憶に書くようになると、プロセスの寿命と索引の寿命が異なるようになります。このような状況にはしばしば出会うと思います。このとき、インデクサが処理を進めたポイントに足跡となるオブジェクトを残すことで、次回の起動に備えることができます。先のフェンスは無効となるポイントを示しましたが、この場合の足跡はまだ処理していないポイントを示すことになります。

RBTree

ここまではcrawlerとindexerがDripのタグや待ち合わせ機能をつかって、どのように新しい文章をインデックスに更新させるかについて説明してきました。
でも実際の検索用インデックスがどのように作られているかにも興味ありませんか?

先に示したインデクサはRBTreeという拡張ライブラリを利用しています。RBTreeは赤黒木という検索に適した二分木のデータ構造とアルゴリズムを提供します。RubyのTreeではなく、red-black treeの略と思われます。Hashはハッシュ関数という魔法の関数を用意して、キーとなるオブジェクトからハッシュ値へ変換し、値を探します。RBTreeでは常にソート済みの列(実装は木だけど、木としてアクセスするAPIは用意されない)を準備しておき、二分探索を使って値を探します。「並んでいる」という性質を利用するといろいろおもしろいことができます。

本の索引を見て下さい。単語ごとにそれが出現する場所(本ならページ番号)が複数並んでいますよね。Hashで実装すると、ほぼこのままに表現できます。

class Dict
  def initialize
    @hash = Hash.new {|h, k| h[k] = Array.new}
  end

  def push(fname, words)
    words.each {|w| @hash[w] << fname}
  end

  def query(word, &blk)
    @hash[word].each(&blk)
  end
end

dict = Dict.new
dict.push('lib/drip.rb', ['def', 'Drip'])
dict.push('lib/foo.rb', ['def'])
dict.push('lib/bar.rb', ['def', 'bar', 'Drip'])
dict.query('def') {|x| puts x}

ファイルが更新されたあとに行われる、二巡目の索引処理ではどうでしょう。
旧くなった索引の削除や新しい索引の登録にはHashの中のArrayを全て読まなくてはなりません。これに対応するには、内側のArrayをHashにすれば効率よくなります。

class Dict2
  def initialize
    @hash = Hash.new {|h, k| h[k] = Hash.new}
  end

  def push(fname, words)
    words.each {|w| @hash[w][fname] = true}
  end

  def query(word)
    @hash[word].each {|k, v| yield(k)}
  end
end

入れ子のHashのキーを使って索引を表現することができました。値は使い途がなくなってしまったところが興味深いです。入れ子のHashはなんだかツリー構造みたいですね。

RBTreeもHashと同様のAPIを提供していますから、上記のHashをRBTreeに置き換えて索引を表現することも可能ですが、もっとRBTreeらしい作戦を紹介します。
二つ目のHashの例では入れ子のHashのキーを使いましたが、これをもう少し発展させましょう。単語と出現場所(ファイル名)をキーとします。入れ子のHashが組み立てていたツリー構造をフラットにしたようなもの、と言えます。

require 'rbtree'

class Dict3
  def initialize
    @tree = RBTree.new
  end

  def push(fname, words)
    words.each {|w| @tree[[w, fname]] = true}
  end

  def query(word)
    @tree.bound([word, ''], [word + "\0", '']) {|k, v| yield(k[1])}
  end
end

queryメソッドで使用しているboundは、二つのキーの内側にある要素を調べるメソッドです。lowerとupperを指定します。
ある単語を含むキーの最小値と、ある単語を含むキーの最大値を指定すれば、その単語の索引が手に入りますね。最小値は、一つ目の要素が対象の単語で、二つ目の要素が最も小さな文字列、つまり''で構成された配列です。では最も大きな文字列(何と比較しても大きい文字列)はなんでしょう。ちょっと思いつきませんね。代わりに「目的の単語の直後の単語を含むキーの最小値」を使います。RubyのStringには"\0"を含めることができますから、ある文字列よりも大きい最小の文字列は "\0" を連結したものと言えます。ちょっとトリッキーですね。そういう汚いものはメソッドに隠してしまいましょう。

  def query(word)
    @tree.bound([word, ''], [word + "\0", '']) {|k, v| yield(k[1])}
  end

この例では単語の出現場所の識別子はファイル名です。先ほどのインデクサではドキュメントのIDとしてファイルの更新時刻とファイル名を用いました。さらに出現した行の番号を覚えたらどうなるか、などいろいろなバリエーションを想像してキーを考えるのも楽しいでしょう。

boundの仲間にはlower_bound、upper_boundというバリエーションもあります。狙ったキーの直前、直後(そのキーを含みます。以上、以下みたいな感じ。)などを調べられます。並んでいるキーとlower_boundを使ってand検索やor検索も効率よく行えます。次のコード片はand検索を行うものです。二つのカーソルを使い、カーソルが一致したときがand成功、カーソルが異なる場合には後側の単語のカーソルを先行する単語のカーソルの点からlower_boundさせます。これを繰り返すと、スキップしながらand検索が可能です。

次のスクリプトは、lower_boundを使ったand検索のアルゴリズムを実験するものです。起動引数に与えたファイルの中から'def'と'initialize'が同時に出現する行を探します。文書の「位置」はこのケースでは「ファイル名」「行番号」を選びました。

require 'rbtree'
require 'nkf'

class Query2
  def initialize
    @tree = RBTree.new
  end
  
  def push(word, fname, lineno)
    @tree[[word, fname, lineno]] = true
  end

  def fwd(w1, fname, lineno)
    k, v = @tree.lower_bound([w1, fname, lineno])
    return nil unless k
    return nil unless k[0] == w1
    k[1..2]
  end

  def query2(w1, w2)
    f1 = fwd(w1, '', 0)
    f2 = fwd(w2, '', 0)
    while f1 && f2
      cmp = f1 <=> f2
      if cmp > 0
        f2 = fwd(w2, *f1)
      elsif cmp < 0
        f1 = fwd(w1, *f2)
      else
        yield(f1)
        f1 = fwd(w1, f1[0], f1[1] + 1)
        f2 = fwd(w2, f2[0], f2[1] + 1)
      end
    end
  end
end

if __FILE__ == $0
  q2 = Query2.new

  while line = ARGF.gets
    NKF.nkf('-w', line).scan(/\w+/) do |word|
      q2.push(word, ARGF.filename, ARGF.lineno)
    end
  end

  q2.query2('def', 'initialize') {|x| p x}
end

boundでなくlower_bound、upper_boundを使うメリットはもう一つあります。
boundの場合、その範囲に入っている要素の数が大きいとき、それだけのArrayをメモリに作ってしまいますが、lower_boundによって少しずつスコープを動かしていけば検索の回数は増えますが、一度に使用するメモリ、RMIであればそのためのバッファも減らすことができます。

順序のあるデータ構造、RBTreeは、実はDripの内部でも使われています。基本となるのはDripのキー(整数のキー)をそのまま使う集合です。もう一つ、タグのための集合にもRBTreeを使っています。この集合は[タグ(String), キー(Integer)]という配列をキーにします。

['rbcrawl-begin', 100030]
['rbcrawl-begin', 103030]
['rbcrawl-fname=a.rb', 1000000]
['rbcrawl-fname=a.rb', 1000020]
['rbcrawl-fname=a.rb', 1000028]
['rbcrawl-fname=a.rb', 1000100]
['rbcrawl-fname=b.rb', 1000005]
['rbcrawl-fname=b.rb', 1000019]
['rbcrawl-fname=b.rb', 1000111]

これなら、'rbcrawl-begin'をもつ最新のキーや、注目点直前の'rbcrawl-fname=a.rb'のキーなどが二分探索のコストで探せます。

Rindaの場合は強力なパターンマッチと引き換えに、Arrayを基本としたデータ構造を内部で使っていたため、データ量に比例して検索時間が増加する(O(N))という問題がありました。Dripの場合はRBTreeを使う事でtagやkeyの開始点まで比較的素早くブラウズが可能になっています。(O(log n))

このデータ構造のおかげで「消えないキュー」「いらなくなったら'rbcrawl-begin'でリセット」といった、一見富豪的なデータストレージが可能になっています。

まとめにかえて

この章の最後に、この小さな検索システムにERBの章で見せたようなWeb UIを追加してみましょう。この検索システムはクロウラとインデクサ、そしてミドルウェアのDripで構成されていました。ここにWEBrick::HTTPServerとサーブレットによるWeb UIを追加してみましょう。ERBの章ではWEBrick::CGIサーバを使って実験しました(覚えてますか?)。今回はHTTPServerを載せてみます。

こんなにたくさんのプロセスを起動するのは面倒ですよね。そこで、クロウラ、インデクサ、HTTPServer、Web UIを一つのプロセスに配置することにしましょう。dRubyを使って作ったシステムは、もともとプロセスの境界はRubyそっくりにできています。このためプロセス構成、オブジェクトの配置を変更するのは意外と簡単です。全部を一つに入れた完成版のスクリプトを以下に示します。

require 'index'
require 'crawl'
require 'webrick'
require 'erb'

class DemoListView
  include ERB::Util
  extend ERB::DefMethod
  def_erb_method('to_html(word, list)', ERB.new(<<EOS))
<html><head><title>Demo UI</title></head><body>
<form method="post"><input type="text" name="w" value="<%=h word %>" /></form>
<% if word %>
<p>search: <%=h word %></p>
<ul>
<%   list.each do |fname| %>
<li><%=h fname%></li>
<%   end %>
</ul>
<% end %>
</body></html>
EOS
end

class DemoUIServlet < WEBrick::HTTPServlet::AbstractServlet
  def initialize(server, crawler, indexer, list_view)
    super(server)
    @crawler = crawler
    @indexer = indexer
    @list_view = list_view
  end

  def req_query(req, key)
    value ,= req.query[key]
    return nil unless value
    value.force_encoding('utf-8')
    value
  end

  def do_GET(req, res)
    word = req_query(req, 'w') || ''
    list = word.empty? ? [] : @indexer.dict.query(word)
    res['content-type'] = 'text/html; charset=utf-8'
    res.body = @list_view.to_html(word, list)
  end
  
  alias do_POST do_GET
end

if __FILE__ == $0
  crawler = Crawler.new
  Thread.new do
    while true
      pp crawler.do_crawl
      sleep 60
    end
  end

  indexer = Indexer.new
  Thread.new do
    indexer.update_dict
  end

  server = WEBrick::HTTPServer.new({:Port => 10080,
                                    :BindAddress => '127.0.0.1'})
  server.mount('/', DemoUIServlet, crawler, indexer, DemoListView.new)
  trap('INT') { server.shutdown }
  server.start
  crawler.quit
end

あたらしいクラスは二つです。一つはDemoUIServletで、Web UIを司ります。もう一つはDemoListViewクラス、CGIの見た目を生成するViewオブジェクトです。
「if __FILE__ == $0」で囲まれたメイン部を見てみます。ここではcrawl.rbやindex.rbのメイン部で行っていたサブスレッドの生成のあと、HTTPサーバを起動しています。Ctrl-Cなどでシグナルを使ってサーバを終了させると、クロウラの仕事の合間に終了します。

クロウラとインデクサが一つプロセスでは、Dripの意味がないのではないか?という気がしなくもないですが、デスクトップのアプリケーションのように起動は簡単になりました。関連するプロセスが少ないのでデーモン化するのも楽です。ところで、プロセス間でオブジェクトの配置を変えるのは簡単でしたよね。このプロセス構成が気に入らなければ、クロウラとインデクサを分けるようなプロセス構成にすることも簡単です。

Dripの章の最後に、久しぶりにdRuby(プログラムリストには現れないけど、MyDripへのアクセスで使ってました)をERBを使って小さなシステムを組み立てました。dRuby、ERB、Rinda、Dripなどの私のライブラリは、あなたの手の中にある問題をあなた自身が解くのを支援できるように意図して作りました。どれも仕組みは単純でおもちゃみたいなライブラリですが、とても手軽にはじめることができます。
本当に大きな問題、たとえばメインメモリにも一つのマシンのディスクにも入りきらないようなデータを扱ったり、無数のクライアントを本当に同時にハンドリングしたり、そういうのには向かないかもしれませんが、自分のPCや家庭のネットワークにあるようなあなたのデータをつかってミニチュアを書くのにはぴったりなツール群です。この本で紹介したライブラリやその考え方があなたのデザインのバリエーションを増やすことになれば、本当にうれしいことです。

おしまい