@m_seki の

I like ruby tooから引っ越し

Fibered TupleSpace

Rindaをシングルスレッドで書けるかやってみたら書けそうな感じ。使い道ないけど!

まず、includeしたMonitorMixinの抜き方がわからなかったので、どうしようかと思ったけど、super()しなければ良いことに気づいた。
それから、synchronizeはただのyieldにした。

module Rinda
  class TupleSpace
    def initialize(period=5)
      @bag = TupleBag.new
      @read_waiter = TupleBag.new
      @take_waiter = TupleBag.new
      @notify_waiter = TupleBag.new
      @period = period
      @keeper = nil
    end

    def synchronize
      yield
    en

あとはMonitoroMixinのnew_condが書ければ良さそう。Rindaの使い方の場合、一つのtake/readを待つのは一つの実行主体だけなので、簡単なイベントを作って逃げることした。

module Rinda
  class Event
    def initialize
      @waiter = []
    end
    
    def wait
      @waiter << Fiber.current.method(:resume)
      Fiber.yield
    end

    def signal
      return if @waiter.empty? # これは起こらないはず。たぶん。
      @waiter.shift.call
    end
  end

  class TupleSpace
    def new_cond
      Event.new
    end
  end

忘れてたけど、notifyって操作があって(なんで追加したんだろう)がQueueを使ってるのでFiber用のQueueに交換。

module Bartender
  class Queue
    def initialize
      @reader = []
      @queue = []
    end

    def push(it)
      if @reader.empty?
        @queue << it
      else
        @reader.shift.call(it)
      end
    end
    
    def pop
      if @queue.empty?
        @reader << Fiber.current.method(:resume)
        Fiber.yield
      else
        @queue.shift
      end
    end
  end
end

module Rinda
  class NotifyTemplateEntry < TemplateEntry
    def initialize(place, event, tuple, expires=nil)
      ary = [event, Rinda::Template.new(tuple)]
      super(ary, expires)
      @queue = Bartender::Queue.new
      @done = false
    end
  end

動いたっぽい。でも持ってるテストはスレッドじゃないと試せないのばっかり。どうしよう。

gist.github.com

あわせて課金したい

今年もよろしくお願いします。

エラスティックリーダーシップに寄稿しました

エラスティックリーダーシップ ―自己組織化チームの育て方に寄稿しました。

 

無理を言って二つエッセイを載せてもらいました。(三つはダメだって。稿料は一つ分です)

 

4月にやったCookpad Tech Kitchen #7 〜 理想の開発現場の「ふつう」のお話 〜 - connpassでも話したんだけど、「すぐできること」を書きました。

 

すごい話はもういいかな、と思って。

(すごくない、リーダーじゃない)自分のとる態度にによってチームが変わる(かもしれない)感じが伝わるといいんだけど。サブタイトルに自己組織化って書いてあるくらいなので、各人がチームを変えていけよって話だぞ、わかりにくいけど!もしみんなが変えられるなら、そのときのリーダーは誰だろうね。

組織って自分のことだよ、きっと。

 

あわせて課金したい

 

 

cookpad.connpass.com

 

 

エラスティックリーダーシップ ―自己組織化チームの育て方

エラスティックリーダーシップ ―自己組織化チームの育て方

 

 

Fiber版Tiny dRubyサーバーの説明

こわいのでタイトルからFiberを抜いてみました。
Bartenderの続きです。

selectぽいソケットの待ち合わせとFiber間のRdvによる同期を持つ例です。dRubyプロトコルを話しますが、特定のメソッドにしか対応できません。dRubyプロトコルにするメリットはクライアントを書かなくてよいところです。irb -rdrbでですぐに実験できるからです。

bartender/tiny_drb.rb at master · seki/bartender · GitHub

通常のdRubyRMIごとに別スレッドで動きますが、このサンプルはFiber / Bartenderのサンプルなので、無理やりシングルスレッドで動かしてます。

require 'bartender/bartender'

class Rdv
  def initialize
    @reader = []
    @queue = []
  end

  def push(it)
    if @reader.empty?
      @queue << [it, Fiber.current.method(:resume)]
      return Fiber.yield
    end

    @reader.shift.call(it)
  end

  def pop
    if @queue.empty?
      @reader << Fiber.current.method(:resume)
      return Fiber.yield
    end

    value, fiber = @queue.shift
    fiber.call
    return value
  end
end

module DRb
  class DRbUnknown
    def initialize(err, buf)
      @data = buf
    end

    def _dump(lv)
      @data
    end
  end
end

class DRbEchoServer
  def initialize(port)
    @rdv = Rdv.new
    Bartender::Server.new(port) do |soc|
      begin
        reader = Bartender::Reader.new(soc)
        writer = Bartender::Writer.new(soc)
        while true
          _, msg, argv = req_drb(reader)
          case msg
          when 'push'
            value = @rdv.push(argv)
          when 'pop'
            value = @rdv.pop
          else
            value = msg
          end
          reply_drb(writer, true, value)
        end
      rescue
        p $!
      end
    end
  end

  def req_drb(reader)
    ref = load(reader, false)
    msg = load(reader)
    argc = load(reader)
    argv = argc.times.collect { load(reader) }
    block = load(reader, false)
    [ref, msg, argv]
  end

  def reply_drb(writer, succ, result)
    writer.write(dump(succ), true)
    writer.write(dump(result), true)
    writer.flush
  end

  def dump(obj)
    str = Marshal.dump(obj) rescue Marshal.dump(nil)
    [str.size].pack('N') + str
  end

  def load(reader, marshal=true)
    sz = reader.read(4)
    sz = sz.unpack('N')[0]
    data = reader.read(sz)
    return data unless marshal
    begin
      Marshal.load(data)
    rescue
      DRb::DRbUnknown.new($!, data)
    end
  end
end

DRbEchoServer.new(12345)
Bartender.run

Rdvは前回説明したのと同じなので省略。このdRubyサーバーは druby://localhost:12345 のURIでサービスを提供します。

ro = DRbObject.new_with_uri('druby://localhost:12345')
ro.hello

こんな感じで利用できます。
このオブジェクトはメソッドを呼ぶと、その名前を返す、Echoのように働きます。
ただし、pushとpopだけはRdvへ委譲されます。

  • push(obj) -- オブジェクトをRdvにpushします。popされていなければ、pushはpopが起こるまでブロックします。
  • pop -- Rdvにpushされているオブジェクトを返します。pushされていなければ、popはpushされるまでブロックします。

たくさんの端末を用意して、それぞれirbなどでpush, popなどのメソッドを呼び出してください。pushとpopが揃うまでブロックする様子や、その間でもEchoのように動く様子が確認できます。

Bartenderのおまけライブラリのおかげで肝心な部分が見えませんので、そこを少し解説します。

    Bartender::Server.new(port) do |soc|
      ...
    end

Bartender::ServerはWEBrickIPv6, v4対応のサーバーポートの用意のコードをパクって作った、TCPServerのちょっと大げさなやつです。
引数はポート番号とブロックです。acceptすると、ブロックを実行するFiberを一つ起こします。acceptして得られたソケットをブロックに渡します。
Serverの内部ではBartenderのselectを使って、ブロックせず、ブロック風にクライアントからの接続を待ちます。

        reader = Bartender::Reader.new(soc)
        writer = Bartender::Writer.new(soc)

ReaderとWriterはバッファ付きのIOを提供します。readでブロックしそうな時は別のFiberに実行権を移します。Readerはnバイト読み込みの他に、一行読み込みなどもできます。
Writerはある程度出力バッファに蓄えてからwriteすることもできます。writeでブロックしそうな時は別のFiberに実行権を移します。

Fiberのスイッチが発生するのは、Serverのaccept待ち、クライアントごとに生成されるFiberでのread/write、呼び出したメソッドがpush/popだった際のRdvでの待ちです。
selectのIO待ちとRdvの制御の待ちが管理されてるのがわかるでしょうか。わかんないか。

私が解きたかったのは、Fiber側のコードが単純な処理の並びになっていること(流れを単純にかけるようになること)でした。この例では、dRubyのリクエストを読み、処理して、レスポンスを返す、
という処理が流れが、分断されずに記述できています(と心の中で思う)。

  def req_drb(reader)
    ref = load(reader, false)
    msg = load(reader)
    argc = load(reader)
    argv = argc.times.collect { load(reader) }
    block = load(reader, false)
    [ref, msg, argv]
  end

こういうところ(loadでブロックするかもしれない)とか、

        while true
          _, msg, argv = req_drb(reader)
          case msg
          when 'push'
            value = @rdv.push(argv)
          when 'pop'
            value = @rdv.pop
          else
            value = msg
          end
          reply_drb(writer, true, value)
        end

こういうところ(雑なwhileループ内でもブロックするし、Rdv待ちもあるし)とかが、まるでスレッドみたいな大雑把さで記述でき(てるでしょ?)る。

あわせて課金したい

今年もよろしくお願いします。