@m_seki の

I like ruby tooから引っ越し

よくあるreduce

名前と値の組の巨大な集合が、名前順にソートされているときに、名前ごとにグルーピングして処理したいんだけど、値の集まりを本物の配列じゃなくてイテレータにしたいときがあるでしょ?

んで、Generatorのようにnextで要素を一つずつたどれるようなstreamがあるとしたら、制御構造はどれもいっしょになるだろう、ということでmix-inもできそうなmodule書いてみた。

require 'enumerator'

module Reduce
  module_function
  def reduce(stream)
    curr = stream.next rescue []
    while curr[0]
      key = curr[0]
      enum = enum_for(:each_value, stream, key, curr)
      yield(curr[0], enum)
      enum.each {}
    end
  end

  module_function
  def each_value(stream, key, curr)
    while curr[0] == key
      yield(curr[1])
      fwd = stream.next rescue []
      curr.replace(fwd)
    end
  end
end

streamをArrayとGeneratorで与えたらこんな風に。(nextしか必要ないから、ThreadとSizedQueueを使ってcallccのないGeneratorを作ってもよいかも)

  require 'generator'

  ary = []
  while s = gets
    s.scan(/\w+/) do |w|
      ary << [w, ARGF.path, ARGF.file.lineno]
    end
  end

  stream = ary.sort_by {|pair| pair[0]}

  Reduce.reduce(Generator.new(stream)) do |k, v|
    p [k, v.collect]
  end

popenとsortだったらこうか。Fooクラスがひどいけど、今は気にしない。

  require 'tempfile'

  class Foo
    def initialize(fp)
      @fp = fp
    end

    def next
      s = @fp.gets
      return [] unless s
      word, file, lineno = s.chomp.split("\t")
      [word, [file, lineno.to_i]]
    end
  end

  Tempfile.open("reduce") do |f|
    while s = gets
      s.scan(/\w+/) do |w|
        f.puts([w, ARGF.path, ARGF.file.lineno].join("\t"))
      end
    end
    f.flush
    
    IO.popen("sort #{f.path}", "r") do |f|
      Reduce.reduce(Foo.new(f)) do |k, v|
        p [k, v.collect]
      end
    end
  end

まあreduceの制御自体は大丈夫そう。むしろ、問題は外部イテレータを持つstreamな気がする。先日のTokyoCabinet用のstreamならこんな感じ。はっ。上のpopen("sort")もこういうクラスに閉じ込めた方がいいかもしれん。

class TokyoBayReduce
  include Reduce
  def initialize(bdb)
    @cursor = bdb.cursor
    @cursor.first
  end

  def next
    @cursor.next
    [@cursor.key, @cursor.val]
  end

  def reduce
    super(self)
  end
end
  require 'tempfile'

  class PipeSortReducer
    include Reduce
    def initialize(fname)
      @fname = fname
    end

    def reduce
      IO.popen("sort #{@fname}", "r") do |fp|
        @fp = fp
        super(self)
      end
    end

    def next
      s = @fp.gets
      return [] unless s
      word, file, lineno = s.chomp.split("\t")
      [word, [file, lineno.to_i]]
    end
  end

  Tempfile.open("reduce") do |f|
    while s = gets
      s.scan(/\w+/) do |w|
        f.puts([w, ARGF.path, ARGF.file.lineno].join("\t"))
      end
    end
    f.flush

    PipeSortReducer.new(f.path).reduce do |k, v|
      p [k, v.collect]
    end
  end