@m_seki の

I like ruby tooから引っ越し

forkしたら回収されるオブジェクトが欲しい

forkしたあと、DRbのコネクションプールが残ってる(つながったまま)のは嫌そう

execしてくれれば閉じられるけど...

いただいたPRは次のdRubyのメソッド呼び出しをトリガーにしてpidを見てcloseするものでした。dRubyRMIしないとずっと残ったままなので、それは多分ダメ。

fork後のThreadの仕様

たしかサブスレッド(forkしたスレッド以外?)はすべて終了する。これを使えば、fork時に回収されるようになりそう。

アクターがマイブームのころにスレッドをオブジェクト風にする素振りをしていたけど、まさか実用品として復活することになるとは!
今回は即座に(同期して)メソッドの結果が欲しいので、そのような同期メカニズムを書いて、その中にスレッドを作るようにした。
(つまり今回は並行性ではなく、forkのタイミングで消えてもらう情報を管理するためにスレッドを使う)

class ThreadObject
    include MonitorMixin

    def initialize(&blk)
      super()
      @wait_ev = new_cond
      @req_ev = new_cond
      @res_ev = new_cond
      @status = :wait
      @req = nil
      @res = nil
      @thread = Thread.new(self, &blk)
    end

    def alive?
      @thread.alive?
    end

    def method_missing(msg, *arg, &blk)
      synchronize do
        @wait_ev.wait_until { @status == :wait }
        @req = [msg] + arg
        @status = :req
        @req_ev.broadcast
        @res_ev.wait_until { @status == :res }
        value = @res
        @req = @res = nil
        @status = :wait
        @wait_ev.broadcast
        return value
      end
    end

    def _execute()
      synchronize do
        @req_ev.wait_until { @status == :req }
        @res = yield(@req)
        @status = :res
        @res_ev.signal
      end   
    end
  end

サーバーに相当する処理をブロックで渡す。method_missingの情報が渡るので、それっぽっく処理する。
スレッドにHashを管理してもらって、そこにFileオブジェクトを預ける。forkするとGCのタイミングでcloseされるようになる。

  proxy = ThreadObject.new do |queue|
    dict = {}
    while true
      queue._execute do |message|
        case(message[0])
        when :[]= then
          dict[message[1]] = message[2]
        when :[] then
          dict[message[1]]
        when :size
          dict.size
        else
          nil
        end
      end
    end
  end

  20.times do |n|
    Thread.new(n) do |x|
      proxy[x] = File.open("rdv.rb")
    end
  end

  sleep 0.01
  p proxy.size

  system("lsof -p #{$$} | wc -l")

  fork {
    system("lsof -p #{$$} | wc -l")
    GC.start # ここでcloseされて減る
    system("lsof -p #{$$} | wc -l")
  }

  sleep 10

drb.rb中のプールの管理はこんな感じ。take(uri)とstore(conn)の二つのメソッドを提供する。fork後にはGCされる。

    def self.make_pool
      ThreadObject.new do |queue|
        pool = []
        while true
          queue._execute do |message|
            case(message[0])
            when :take then
              remote_uri = message[1]
              conn = nil
              new_pool = []
              pool.each do |c|
                if conn.nil? and c.uri == remote_uri
                  conn = c if c.alive?
                else
                  new_pool.push c
                end
              end
              pool = new_pool
              conn
            when :store then
              conn = message[1]
              pool.unshift(conn)
              pool.pop.close while pool.size > POOL_SIZE
              conn
            else
              nil
            end
          end
        end
      end
    end
    @pool_proxy = make_pool

fork後にFDがどれくらい残るか確かめるgist

ThreadObjectを入れる前後でFDの増減の様子を調べたときのスクリプト。今日もらったPRはdRubyRMIするタイミングでcloseできるようになった。ThreadObject版はRMIしない場合でもGCが起こればcloseされるようになったので、ちょっとまし。

gist.github.com