@m_seki の

I like ruby tooから引っ越し

Pico

PicoはRBTreeとTokyo CabineのBDBを使って実装した、ストレージの素振りで、すでに動いてるアプリケーションをライブラリ化しようとしてるもの。keyもvalueも文字列。重複を許す(KeyValueストレージじゃないよ)のでMapReduceごっこも可能。なお、Nanoはkeyもvalueもオブジェクトの方向に修正中。


それをrinda_evalを使って、問題の領域ごとに複数のプロセスに分割する実験。領域ごとに調べた結果をあとで一つにまとめてeachする。eachをそれぞれまじめにRMIするとパフォーマンスが落ちるので、each_sliceでまとめて送ると良いみたい。マルチコアな環境ではバッチ処理を並列に処理できるかもしれない。

rinda_evalはforkを使って実現しているので、同じマシンでないと都合が悪い。複数のマシンでこれをやるには、コードの配布方法とか環境に依存するめんどくさいことを準備しなきゃならないから、気が乗らない。

require 'pico'
require 'rinda/tuplespace'
require 'rinda_eval'

def create_index(root)
  pico = Pico::InMemoryWOSynchronize.new
  Dir.glob(File.join(root, '**/*.{c,h,cpp,rb}')) do |path|
    File.read(path).scan(/\w\w+/).uniq.each do |word|
      next if /^\d/ =~ word
      pico.write(word, path)
    end
  end
  pico
end

def setup_tasks(ary, place)
  key = Time.now.to_f
  pid = Process.pid
  ary.each do |root|
    Rinda::rinda_eval(place) do |ts|
      _,_,_, dir = ts.take([:root, key, pid, nil])
      pico = create_index(dir)
      pico.extend(DRbUndumped)
      ts.write([:index, dir, pico])
      ts.read([:shutdown]) rescue exit
      [:done]
    end
    place.write([:root, key, pid, root])
  end
  ary.collect do |root|
    _,_, pico = place.take([:index, root, nil])
    pico
  end
end

class PicoStream
  def initialize(pico, buf=256)
    @queue = SizedQueue.new(buf * 2)
    Thread.new do
      pico.each_slice(buf) do |ary|
        ary.each do |k, v|
          @queue.push([k, v])
        end
      end
      @queue.push([:end, nil])
    end
    @key, @value = @queue.pop
  end
  attr_reader :key
  
  def pop
    return @key, @value
  ensure
    @key, @value = @queue.pop
  end
end

def pico_each(picos)
  ary = picos.collect {|pico| PicoStream.new(pico)}
  ary.delete_if {|pico| pico.key == :end}
  while ary.size > 0
    cur = ary.sort_by {|pico| pico.key}.first
    last = cur.key
    while last == cur.key
      yield(cur.pop)
    end
    ary.delete_if {|pico| pico.key == :end}
  end
  nil
end

DRb.start_service
ts = Rinda::TupleSpace.new
ary = setup_tasks(ARGV, ts)

last = nil
c = 0
pico_each(ary) do |k, v|
  if last != k
    p [last, c] if last
    last = k
    c = 1
  else
    c += 1
  end
end
p [last, c]