@m_seki の

I like ruby tooから引っ越し

エンタープライズアジャイル勉強会の資料

エンタープライズアジャイル勉強会という勉強会に呼んでいただけたので資料公開しますね。広島の旅費が苦しいのでこういうのうれしい!

 

Gumroadからの課金勢のみなさんへ

 

お話したこと

 

反復開発は全工程やって1ターンだよっていうみんなが知ってる(けど、ある方面のScrumではやられていないことが多いらしい)ことを45分間ずっと説明しました。

 

その結果チームの境界(サブシステムとか工程別チームとか)がどうでもよくなると思う(実感してる)んだけど、Scurm Incの説明だと階層がたくさんある例を示していたし、永田さんはQA部門にこだわりがあるのがおもしろいなー。自分も含めて、みんな自分の状況が好きなんだな。いいことだ。

 

再演バイト待ってます。 (再演したいんじゃなくて、RubyKaigiの旅費かせぎたいので!)

 

speakerdeck.com

合わせて読みたい

 

 

EventMachine入門を読みながら書くBartender入門 

keijinsonyaban.blogspot.jp

EM入門の日本語訳を読んで、初めてEMのコードを見ました。読みながらBartenderを説明するといいんじゃないだろうか。

飽きたので途中で公開します。

なによりも先に

今年もよろしくお願いします。

Introduction

BartenderはFiberを使ったシングルスレッド向けIOの多重化のライブラリです。
ユーザーのコードで様々なプロトコルに対応するためにBartender自体はIOの待ち合わせだけを担当します。

ポート10000で受信したデータを読んで、送り返すサーバーの例。

Bartender::ServerはTCPのacceptのループの準備をするコンビニエンスクラスです。*1

require 'bartender/bartender'

Bartender::Server.new(10000) do |client|
  while true
    begin
      data = Bartender._read(client, 255)
      Bartender._write(client, data)
    rescue
      client.close rescue nil
      break
    end
  end
end

Bartender.run

acceptのたびに与えられたブロックを実行するFiberを作ります。
複数のターミナルからtelnetして実験しましょう。

$ telnet localhost 10000
Trying ::1...
Connected to localhost.
Escape character is '^]'.
31
231
31
231
32
3^]
telnet> quit
Connection closed.

以下は空行が入力されるまで待ってからechoするバージョンです。Bartender::Reader, Writerというコンビニエンスクラスを使います。
複数の端末がechoサーバーに接続している状態で、それぞれ空行を読むまで正しく動いてる部分を確かめてください。

require 'bartender/bartender'

Bartender::Server.new(10000) do |client|
  reader = Bartender::Reader.new(client)
  writer = Bartender::Writer.new(client)
  
  while true
    begin
      data = reader.read_until("\r\n\r\n")
      writer.write(data)
    rescue
      client.close rescue nil
      break
    end
  end
end

Bartender.run

EMとちがうのは、プロトコルの実装などはユーザーのコード側で書くことを想定してるところです。任意のプロトコルはBartenderの上に乗せてくださいね。

Getting Started

require 'bartender/bartender'

Bartender.run

なにもしないBartenderです。Bartenderを使うには'bartender/bartender'をrequireしてください。
Bartender.runはIO待ちのループを行う無限ループです。1度目の繰り返し以降で待つものがなくなったとき、終了します。*2

Timers

Bartenderにはalarmが用意されてます。指定した時刻にコールバックを呼びます。

require 'bartender/bartender'

Bartender.context.alarm(Time.now + 1,
                        Bartender.context.method(:stop))

Bartender.run

1秒後にBartender.context.stopを呼びます。stopが呼ばれると、次のターンにrunは終了します。

contextはスレッドごとに作られる、Baretender::Contextを返すメソッドです。Bartenderの待ち合わせのコアとなる部分はContextの中にあります。
.contextを毎度書くのはめんどくさいので、よく使うメソッドはBaretnderモジュールのmodule_functionとして定義されてます。stopはたいてい使わないだろうと思って、module_functionになっていません。

require 'bartender/bartender'

Fiber.new do
  n = 0
  while true
    puts n
    Bartender.sleep(1)
    n += 1
  end
end.resume

Fiber.new do
  while true
    Bartender.sleep(3)
    puts 'Fizz'
  end
end.resume

Fiber.new do
  while true
    Bartender.sleep(5)
    puts 'Buzz'
  end
end.resume

Fiber.new do
  Bartender.sleep(15)
  Bartender.context.stop
end.resume

Bartender.run

単にFiberをsleepさせるメソッドも用意されています。周期的なタイマーはありませんが、上のように何度もsleepすれば実現できると思います。
sleepはBartenderのmodule_functionにもあります。alarmはコールバックを登録するけど、sleepは単に休みます。でもalarmはmodule_functionに入っていません。
なにが言いたいかというと、sleep推奨ってことです。BartenderはFiberの習作ですから、できるだけアプリケーションのコードにはコールバックを書かせたくないのです。
一直線の単純なコードを書いても、なんとなく複数のFiberが譲り合って動くようなものが私のテーマです。

sleep(0)を使うと、やることがなければただちに、もし別のIOがあればそのあとに、sleepが解除されます。これを利用すると、ひまな間はがんがん計算するぞ!といった繰り返しが書けます。
XtでいうWorkProc、EMならnext_tickに相当します。

require 'bartender/bartender'

Fiber.new do
  Bartender.sleep(2)
  Bartender.context.stop
end.resume

$n = 0
Fiber.new do
  while true
    Bartender.sleep(0)
    $n += 1
  end
end.resume

Bartender.run

p $n

ThreadTask

Fiberでやりくりしていても、やっぱり遅い処理は存在するので、Threadに逃がしたいものが出てきます。おまけのコンビニエンスクラスにThreadTaskを用意しました。
ThreadTaskはブロックを与えて生成します。直ちにサブスレッドでブロックを実行します。タスクのvalueを呼ぶと、Thread#valueのように処理を待ってから値を返します。
ThreadTask#valueがブロックしそうなときは、別のFiberに実行権を譲ります。

そうそう。Bartender::ThreadTask.newのスペルが長いので、Bartender.task(&blk)で生成できるようにしました。

require 'bartender/bartender'

queue = Queue.new
Thread.new do
  5.times do |n|
    sleep 0.3
    queue.push([:que, n])
  end
end

Fiber.new do
  tt = Bartender.task {sleep 2; 'hello 2'}
  p tt.value
end.resume

Fiber.new do
  tt = Bartender.task {sleep 3; 'hello 3'}
  p tt.value
end.resume

Fiber.new do
  tt = Bartender.task {raise('hello 0')}
  (tt.value rescue $!).tap {|it| p it}
end.resume

Fiber.new do
  tt = Bartender.task {sleep 1; 'hello 1'}
  p tt.value
end.resume

Fiber.new do
  ary = 5.times.collect {
    Bartender.task {queue.pop}
  }
  ary.each { |x|
    p x.value
  }
end.resume

Bartender.run

Clients

飽きてきた!

require 'bartender/bartender'

Fiber.new do
  while true
    p :tick
    Bartender.sleep(0.05)
  end
end.resume

Fiber.new do
  c = Bartender.tcp_socket('www.druby.org', 80)
  writer = Bartender::Writer.new(c)
  reader = Bartender::Reader.new(c)

  writer.write("GET / HTTP/1.1\r\nHost: www.druby.org\r\n\r\n")
  while true
    puts reader.readln.size
  end
rescue
  c.close
  Bartender.context.stop
end.resume

Bartender.run

*1:WEBrickを読みながら書きました。

*2:この仕様は正しいのかなあ。一回目も終了するべきかも。

Fibered TupleSpace

Rindaをシングルスレッドで書けるかやってみたら書けそうな感じ。使い道ないけど!

まず、includeしたMonitorMixinの抜き方がわからなかったので、どうしようかと思ったけど、super()しなければ良いことに気づいた。
それから、synchronizeはただのyieldにした。

module Rinda
  class TupleSpace
    def initialize(period=5)
      @bag = TupleBag.new
      @read_waiter = TupleBag.new
      @take_waiter = TupleBag.new
      @notify_waiter = TupleBag.new
      @period = period
      @keeper = nil
    end

    def synchronize
      yield
    en

あとはMonitoroMixinのnew_condが書ければ良さそう。Rindaの使い方の場合、一つのtake/readを待つのは一つの実行主体だけなので、簡単なイベントを作って逃げることした。

module Rinda
  class Event
    def initialize
      @waiter = []
    end
    
    def wait
      @waiter << Fiber.current.method(:resume)
      Fiber.yield
    end

    def signal
      return if @waiter.empty? # これは起こらないはず。たぶん。
      @waiter.shift.call
    end
  end

  class TupleSpace
    def new_cond
      Event.new
    end
  end

忘れてたけど、notifyって操作があって(なんで追加したんだろう)がQueueを使ってるのでFiber用のQueueに交換。

module Bartender
  class Queue
    def initialize
      @reader = []
      @queue = []
    end

    def push(it)
      if @reader.empty?
        @queue << it
      else
        @reader.shift.call(it)
      end
    end
    
    def pop
      if @queue.empty?
        @reader << Fiber.current.method(:resume)
        Fiber.yield
      else
        @queue.shift
      end
    end
  end
end

module Rinda
  class NotifyTemplateEntry < TemplateEntry
    def initialize(place, event, tuple, expires=nil)
      ary = [event, Rinda::Template.new(tuple)]
      super(ary, expires)
      @queue = Bartender::Queue.new
      @done = false
    end
  end

動いたっぽい。でも持ってるテストはスレッドじゃないと試せないのばっかり。どうしよう。

gist.github.com

あわせて課金したい

今年もよろしくお願いします。

エラスティックリーダーシップに寄稿しました

エラスティックリーダーシップ ―自己組織化チームの育て方に寄稿しました。

 

無理を言って二つエッセイを載せてもらいました。(三つはダメだって。稿料は一つ分です)

 

4月にやったCookpad Tech Kitchen #7 〜 理想の開発現場の「ふつう」のお話 〜 - connpassでも話したんだけど、「すぐできること」を書きました。

 

すごい話はもういいかな、と思って。

(すごくない、リーダーじゃない)自分のとる態度にによってチームが変わる(かもしれない)感じが伝わるといいんだけど。サブタイトルに自己組織化って書いてあるくらいなので、各人がチームを変えていけよって話だぞ、わかりにくいけど!もしみんなが変えられるなら、そのときのリーダーは誰だろうね。

組織って自分のことだよ、きっと。

 

あわせて課金したい

 

 

cookpad.connpass.com

 

 

エラスティックリーダーシップ ―自己組織化チームの育て方

エラスティックリーダーシップ ―自己組織化チームの育て方

 

 

Fiber版Tiny dRubyサーバーの説明

こわいのでタイトルからFiberを抜いてみました。
Bartenderの続きです。

selectぽいソケットの待ち合わせとFiber間のRdvによる同期を持つ例です。dRubyプロトコルを話しますが、特定のメソッドにしか対応できません。dRubyプロトコルにするメリットはクライアントを書かなくてよいところです。irb -rdrbでですぐに実験できるからです。

bartender/tiny_drb.rb at master · seki/bartender · GitHub

通常のdRubyRMIごとに別スレッドで動きますが、このサンプルはFiber / Bartenderのサンプルなので、無理やりシングルスレッドで動かしてます。

require 'bartender/bartender'

class Rdv
  def initialize
    @reader = []
    @queue = []
  end

  def push(it)
    if @reader.empty?
      @queue << [it, Fiber.current.method(:resume)]
      return Fiber.yield
    end

    @reader.shift.call(it)
  end

  def pop
    if @queue.empty?
      @reader << Fiber.current.method(:resume)
      return Fiber.yield
    end

    value, fiber = @queue.shift
    fiber.call
    return value
  end
end

module DRb
  class DRbUnknown
    def initialize(err, buf)
      @data = buf
    end

    def _dump(lv)
      @data
    end
  end
end

class DRbEchoServer
  def initialize(port)
    @rdv = Rdv.new
    Bartender::Server.new(port) do |soc|
      begin
        reader = Bartender::Reader.new(soc)
        writer = Bartender::Writer.new(soc)
        while true
          _, msg, argv = req_drb(reader)
          case msg
          when 'push'
            value = @rdv.push(argv)
          when 'pop'
            value = @rdv.pop
          else
            value = msg
          end
          reply_drb(writer, true, value)
        end
      rescue
        p $!
      end
    end
  end

  def req_drb(reader)
    ref = load(reader, false)
    msg = load(reader)
    argc = load(reader)
    argv = argc.times.collect { load(reader) }
    block = load(reader, false)
    [ref, msg, argv]
  end

  def reply_drb(writer, succ, result)
    writer.write(dump(succ), true)
    writer.write(dump(result), true)
    writer.flush
  end

  def dump(obj)
    str = Marshal.dump(obj) rescue Marshal.dump(nil)
    [str.size].pack('N') + str
  end

  def load(reader, marshal=true)
    sz = reader.read(4)
    sz = sz.unpack('N')[0]
    data = reader.read(sz)
    return data unless marshal
    begin
      Marshal.load(data)
    rescue
      DRb::DRbUnknown.new($!, data)
    end
  end
end

DRbEchoServer.new(12345)
Bartender.run

Rdvは前回説明したのと同じなので省略。このdRubyサーバーは druby://localhost:12345 のURIでサービスを提供します。

ro = DRbObject.new_with_uri('druby://localhost:12345')
ro.hello

こんな感じで利用できます。
このオブジェクトはメソッドを呼ぶと、その名前を返す、Echoのように働きます。
ただし、pushとpopだけはRdvへ委譲されます。

  • push(obj) -- オブジェクトをRdvにpushします。popされていなければ、pushはpopが起こるまでブロックします。
  • pop -- Rdvにpushされているオブジェクトを返します。pushされていなければ、popはpushされるまでブロックします。

たくさんの端末を用意して、それぞれirbなどでpush, popなどのメソッドを呼び出してください。pushとpopが揃うまでブロックする様子や、その間でもEchoのように動く様子が確認できます。

Bartenderのおまけライブラリのおかげで肝心な部分が見えませんので、そこを少し解説します。

    Bartender::Server.new(port) do |soc|
      ...
    end

Bartender::ServerはWEBrickIPv6, v4対応のサーバーポートの用意のコードをパクって作った、TCPServerのちょっと大げさなやつです。
引数はポート番号とブロックです。acceptすると、ブロックを実行するFiberを一つ起こします。acceptして得られたソケットをブロックに渡します。
Serverの内部ではBartenderのselectを使って、ブロックせず、ブロック風にクライアントからの接続を待ちます。

        reader = Bartender::Reader.new(soc)
        writer = Bartender::Writer.new(soc)

ReaderとWriterはバッファ付きのIOを提供します。readでブロックしそうな時は別のFiberに実行権を移します。Readerはnバイト読み込みの他に、一行読み込みなどもできます。
Writerはある程度出力バッファに蓄えてからwriteすることもできます。writeでブロックしそうな時は別のFiberに実行権を移します。

Fiberのスイッチが発生するのは、Serverのaccept待ち、クライアントごとに生成されるFiberでのread/write、呼び出したメソッドがpush/popだった際のRdvでの待ちです。
selectのIO待ちとRdvの制御の待ちが管理されてるのがわかるでしょうか。わかんないか。

私が解きたかったのは、Fiber側のコードが単純な処理の並びになっていること(流れを単純にかけるようになること)でした。この例では、dRubyのリクエストを読み、処理して、レスポンスを返す、
という処理が流れが、分断されずに記述できています(と心の中で思う)。

  def req_drb(reader)
    ref = load(reader, false)
    msg = load(reader)
    argc = load(reader)
    argv = argc.times.collect { load(reader) }
    block = load(reader, false)
    [ref, msg, argv]
  end

こういうところ(loadでブロックするかもしれない)とか、

        while true
          _, msg, argv = req_drb(reader)
          case msg
          when 'push'
            value = @rdv.push(argv)
          when 'pop'
            value = @rdv.pop
          else
            value = msg
          end
          reply_drb(writer, true, value)
        end

こういうところ(雑なwhileループ内でもブロックするし、Rdv待ちもあるし)とかが、まるでスレッドみたいな大雑把さで記述でき(てるでしょ?)る。

あわせて課金したい

今年もよろしくお願いします。

BartenderのAPIで練習するFiberの使い方

Fiberの使い道を考えるために、selectと組み合わせて処理を単純に記述できる(のではないか、ということを確かめるために)部品を作ってみました。

GitHub - seki/bartender: Async I/O using Ruby Fiber

selectのラッパー

まずFiberと関係ない部分です。selectの簡単なラッパーです。今後、Ruby本体にもっと良いものが載り、このレイヤーは後で捨てられるだろう、と考えています。

module Bartender
  class Context
    def []=(event, fd, callback)
      # イベント種(:read, :write)とfdに、コールバックを登録します。
      # 一つの組みに、一つのコールバックだけ登録できます。
      # コールバックされても消えません
    end

    def delete(event, fd)
      # コールバックを削除します
    end

    def alarm(time, callback)
      # 指定時刻を過ぎたら呼ばれるコールバックを登録します
      # コールバックされると消えます
      # 戻り値はdelete_alaram()で使います
    end

    def delete_alarm(entry)
      # alarmを削除します
    end

    def run
      # selectのイベントループを回します
      # コールバックがなくなったら止まります
    end

    def stop
      # selectのイベントループを止めます
    end
  end
end

Xtのころの経験から一つの事象に一つだけコールバックを登録できるようにしました。複数でもいいんだけど、複数入れられるものはこの上に被せれば良いだろうから、コア部分は一つだけにしました。
複数登録できる場合の発火する順序などにまつわるイヤなバグの思い出があったので、アプリケーション層で管理してもらう方がいいかなって判断しました。

サーバーのコールバックは次のように登録します。

  context[:read, soc] = Proc.new do
    client = soc.accept
    # いろいろ
  end

  context.run

Fiberと組み合わせたAPI

それぞれのコールバックにFiber.current.method(:resume)を与えることで、Fiberの実行権をブロックしそうな時に手放すことができるだろう、というのを確認するAPIです。
少しだけ応用層になります。

module Bartender
  class Context
   
    def sleep(sec)
      # sec秒間、止まります。スレッドは止まらず、別のFiberが実行されます。
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

    def wait_io(event, fd)
      # fdがreadable, あるいは writableになるまで止まります。
      # スレッドは止まらず、別のFiberが実行されます。
      self[event, fd] = Fiber.current.method(:resume)
      Fiber.yield
    ensure
      delete(event, fd)
    end

    def wait_io_timeout(event, fd, timeout)
      # timeoutがあるwait_ioです。
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end

    def wait_readable(fd); wait_io(:read, fd); end
    def wait_writable(fd); wait_io(:write, fd); end

    def _read(fd, sz)
      # ブロッキング風、ノンブロッキングなreadです。
      return fd.read_nonblock(sz)
    rescue IO::WaitReadable
      wait_readable(fd)
      retry
    end

    def _write(fd, buf)
      # ブロッキング風、ノンブロッキングなwriteです。
      return fd.write_nonblock(buf)
    rescue IO::WaitWritable
      wait_writable(fd)
      retry
    end
  end
end

しつこいけど、sleepの実装を説明します。sleepはalarmでできています。

    def sleep(sec)
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

2行でできています。

  1. sec秒後にFiber.current.method(:resume)を呼ぶようにalarmを登録します。
  2. Fiber.yieldで実行権を手放します。

時刻を過ぎたらresumeが呼ばれて、yieldしところから再開します。結果、sec秒間止まることができます。


その他のAPIの実装もほとんど一緒です。
なお、_read, _writeにtimeoutがないのは実装した時期によるもの(alarm, sleepは後から作ったから)なので、そのうちtimeoutが指定できるようにするかもしれません。

最後に、この中で一番長いwait_io_timeoutを説明します。一番自信がないAPIです。

    def wait_io_timeout(event, fd, timeout)
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end
  1. sleepと同様にalarmを設定します。ただし、登録するコールバックはresumeに引数:timeoutを与えるProcです
  2. [event, fd]の組みにresumeを登録
  3. Fiber.yieldします。もしも戻り値が:timeoutだったら例外を発生させます
  4. ensureでそれぞれのコールバックを削除します。

alarmのコールバックは発火した後は削除されるのだけど、IO待ちのコールバックで再開したあとに、またalarmのコールバックが発生すると困るので明示的に削除してます。すでに削除してあったら何も起きないので雑にdelete_alarmしてます。

もしもスレッドだったら、IO待ち起因のコールバックとalarm起因のコールバックが同時に発生したらどうしよう、とか、イベント取り外し中にいやなこと起きないかなあ、とか、心配するところですが、Fiberだから大丈夫。


あわせて読みたい

改訂2版 パーフェクトRubydRubyによる分散・WebプログラミングThe dRuby Book: Distributed and Parallel Computing with Ruby

パーフェクトRubyの改訂版でもFiberの記述は増えていないとのこと。

iPod nano 1 to 7

iPod nanoの第一世代、しばらく使ってなかったんだけど久しぶりにiTunesにつないだら「交換修理対象」だったことがわかって交換しました。

 

iPod nano (第 1 世代) 交換プログラム - Apple サポート

 

Appleに電話して、ヤマト便の回収の予約をして、すぐに現行機が届きました。届いたのはシルバーで、Lightningケーブルだけが付属していました。(色はたぶん選べません)

 

Apple iPod nano 16GB 第7世代 2015年モデル シルバー MKN22J/A

Apple iPod nano 16GB 第7世代 2015年モデル シルバー MKN22J/A

 

 

 

自分が一番音楽を聴くのはクルマの中です。カーステレオはBluetoothがつながるので、ここ数年はiPhone 4sを音楽プレイヤーとして使ってましたが、現行機のnanoもBluetoothが使えるので今はnanoと交代です。

 

第一世代から12年くらい経ってもnanoっぽい。第一世代から継承してる現行機のよいところを自慢します。

  • 軽い
  • 小さすぎない
  • 物理ボタンがある

もちろん、もうクリックホイールはないんだけど、側面にEarPodsについてるボタンと同じものがついています。音量調整の他に、再生、停止、前後の曲への移動ができます。そうそう。シェイクするとシャッフルします。歩きながら手探りだけで操作できます。

 

iPhoneで音楽を聴くのと比べるとこんなところがよいです。

  • 音楽が割り込まれない
  • 通信容量やバッテリー残量が気にならない
  • わざわざ音楽が聴きたくなる

ダメなところはGUI。最近のiPhoneに慣れてしまうと、液晶の解像度の低さに驚きます。あと古臭いGUIパーツもあと10年したらレトロ調になるんだろうなーと自分を納得させるイマイチさ。でも、ネガティブな要素はそれくらいかなあ。

 

現行機といってもここ数年は更新がないんですよね、nano。このままオワコンなのかなあ。128GBくらいのが出てくれると全部入ってうれしいのになー。

 

まとめ 

nano、音楽を聴きたい人におすすめ。

 

 おまけ

METAFIVE買おうと思ってiTunesストア見にいったら800円だったから買っちゃった。

山本潤子 ゴールデン☆ベスト

山本潤子 ゴールデン☆ベスト