@m_seki の

I like ruby tooから引っ越し

EventMachine入門を読みながら書くBartender入門 

keijinsonyaban.blogspot.jp

EM入門の日本語訳を読んで、初めてEMのコードを見ました。読みながらBartenderを説明するといいんじゃないだろうか。

飽きたので途中で公開します。

なによりも先に

今年もよろしくお願いします。

Introduction

BartenderはFiberを使ったシングルスレッド向けIOの多重化のライブラリです。
ユーザーのコードで様々なプロトコルに対応するためにBartender自体はIOの待ち合わせだけを担当します。

ポート10000で受信したデータを読んで、送り返すサーバーの例。

Bartender::ServerはTCPのacceptのループの準備をするコンビニエンスクラスです。*1

require 'bartender/bartender'

Bartender::Server.new(10000) do |client|
  while true
    begin
      data = Bartender._read(client, 255)
      Bartender._write(client, data)
    rescue
      client.close rescue nil
      break
    end
  end
end

Bartender.run

acceptのたびに与えられたブロックを実行するFiberを作ります。
複数のターミナルからtelnetして実験しましょう。

$ telnet localhost 10000
Trying ::1...
Connected to localhost.
Escape character is '^]'.
31
231
31
231
32
3^]
telnet> quit
Connection closed.

以下は空行が入力されるまで待ってからechoするバージョンです。Bartender::Reader, Writerというコンビニエンスクラスを使います。
複数の端末がechoサーバーに接続している状態で、それぞれ空行を読むまで正しく動いてる部分を確かめてください。

require 'bartender/bartender'

Bartender::Server.new(10000) do |client|
  reader = Bartender::Reader.new(client)
  writer = Bartender::Writer.new(client)
  
  while true
    begin
      data = reader.read_until("\r\n\r\n")
      writer.write(data)
    rescue
      client.close rescue nil
      break
    end
  end
end

Bartender.run

EMとちがうのは、プロトコルの実装などはユーザーのコード側で書くことを想定してるところです。任意のプロトコルはBartenderの上に乗せてくださいね。

Getting Started

require 'bartender/bartender'

Bartender.run

なにもしないBartenderです。Bartenderを使うには'bartender/bartender'をrequireしてください。
Bartender.runはIO待ちのループを行う無限ループです。1度目の繰り返し以降で待つものがなくなったとき、終了します。*2

Timers

Bartenderにはalarmが用意されてます。指定した時刻にコールバックを呼びます。

require 'bartender/bartender'

Bartender.context.alarm(Time.now + 1,
                        Bartender.context.method(:stop))

Bartender.run

1秒後にBartender.context.stopを呼びます。stopが呼ばれると、次のターンにrunは終了します。

contextはスレッドごとに作られる、Baretender::Contextを返すメソッドです。Bartenderの待ち合わせのコアとなる部分はContextの中にあります。
.contextを毎度書くのはめんどくさいので、よく使うメソッドはBaretnderモジュールのmodule_functionとして定義されてます。stopはたいてい使わないだろうと思って、module_functionになっていません。

require 'bartender/bartender'

Fiber.new do
  n = 0
  while true
    puts n
    Bartender.sleep(1)
    n += 1
  end
end.resume

Fiber.new do
  while true
    Bartender.sleep(3)
    puts 'Fizz'
  end
end.resume

Fiber.new do
  while true
    Bartender.sleep(5)
    puts 'Buzz'
  end
end.resume

Fiber.new do
  Bartender.sleep(15)
  Bartender.context.stop
end.resume

Bartender.run

単にFiberをsleepさせるメソッドも用意されています。周期的なタイマーはありませんが、上のように何度もsleepすれば実現できると思います。
sleepはBartenderのmodule_functionにもあります。alarmはコールバックを登録するけど、sleepは単に休みます。でもalarmはmodule_functionに入っていません。
なにが言いたいかというと、sleep推奨ってことです。BartenderはFiberの習作ですから、できるだけアプリケーションのコードにはコールバックを書かせたくないのです。
一直線の単純なコードを書いても、なんとなく複数のFiberが譲り合って動くようなものが私のテーマです。

sleep(0)を使うと、やることがなければただちに、もし別のIOがあればそのあとに、sleepが解除されます。これを利用すると、ひまな間はがんがん計算するぞ!といった繰り返しが書けます。
XtでいうWorkProc、EMならnext_tickに相当します。

require 'bartender/bartender'

Fiber.new do
  Bartender.sleep(2)
  Bartender.context.stop
end.resume

$n = 0
Fiber.new do
  while true
    Bartender.sleep(0)
    $n += 1
  end
end.resume

Bartender.run

p $n

ThreadTask

Fiberでやりくりしていても、やっぱり遅い処理は存在するので、Threadに逃がしたいものが出てきます。おまけのコンビニエンスクラスにThreadTaskを用意しました。
ThreadTaskはブロックを与えて生成します。直ちにサブスレッドでブロックを実行します。タスクのvalueを呼ぶと、Thread#valueのように処理を待ってから値を返します。
ThreadTask#valueがブロックしそうなときは、別のFiberに実行権を譲ります。

そうそう。Bartender::ThreadTask.newのスペルが長いので、Bartender.task(&blk)で生成できるようにしました。

require 'bartender/bartender'

queue = Queue.new
Thread.new do
  5.times do |n|
    sleep 0.3
    queue.push([:que, n])
  end
end

Fiber.new do
  tt = Bartender.task {sleep 2; 'hello 2'}
  p tt.value
end.resume

Fiber.new do
  tt = Bartender.task {sleep 3; 'hello 3'}
  p tt.value
end.resume

Fiber.new do
  tt = Bartender.task {raise('hello 0')}
  (tt.value rescue $!).tap {|it| p it}
end.resume

Fiber.new do
  tt = Bartender.task {sleep 1; 'hello 1'}
  p tt.value
end.resume

Fiber.new do
  ary = 5.times.collect {
    Bartender.task {queue.pop}
  }
  ary.each { |x|
    p x.value
  }
end.resume

Bartender.run

Clients

飽きてきた!

require 'bartender/bartender'

Fiber.new do
  while true
    p :tick
    Bartender.sleep(0.05)
  end
end.resume

Fiber.new do
  c = Bartender.tcp_socket('www.druby.org', 80)
  writer = Bartender::Writer.new(c)
  reader = Bartender::Reader.new(c)

  writer.write("GET / HTTP/1.1\r\nHost: www.druby.org\r\n\r\n")
  while true
    puts reader.readln.size
  end
rescue
  c.close
  Bartender.context.stop
end.resume

Bartender.run

*1:WEBrickを読みながら書きました。

*2:この仕様は正しいのかなあ。一回目も終了するべきかも。