EM入門の日本語訳を読んで、初めてEMのコードを見ました。読みながらBartenderを説明するといいんじゃないだろうか。
飽きたので途中で公開します。
Introduction
BartenderはFiberを使ったシングルスレッド向けIOの多重化のライブラリです。
ユーザーのコードで様々なプロトコルに対応するためにBartender自体はIOの待ち合わせだけを担当します。
ポート10000で受信したデータを読んで、送り返すサーバーの例。
Bartender::ServerはTCPのacceptのループの準備をするコンビニエンスクラスです。*1
require 'bartender/bartender' Bartender::Server.new(10000) do |client| while true begin data = Bartender._read(client, 255) Bartender._write(client, data) rescue client.close rescue nil break end end end Bartender.run
acceptのたびに与えられたブロックを実行するFiberを作ります。
複数のターミナルからtelnetして実験しましょう。
$ telnet localhost 10000 Trying ::1... Connected to localhost. Escape character is '^]'. 31 231 31 231 32 3^] telnet> quit Connection closed.
以下は空行が入力されるまで待ってからechoするバージョンです。Bartender::Reader, Writerというコンビニエンスクラスを使います。
複数の端末がechoサーバーに接続している状態で、それぞれ空行を読むまで正しく動いてる部分を確かめてください。
require 'bartender/bartender' Bartender::Server.new(10000) do |client| reader = Bartender::Reader.new(client) writer = Bartender::Writer.new(client) while true begin data = reader.read_until("\r\n\r\n") writer.write(data) rescue client.close rescue nil break end end end Bartender.run
EMとちがうのは、プロトコルの実装などはユーザーのコード側で書くことを想定してるところです。任意のプロトコルはBartenderの上に乗せてくださいね。
Getting Started
require 'bartender/bartender' Bartender.run
なにもしないBartenderです。Bartenderを使うには'bartender/bartender'をrequireしてください。
Bartender.runはIO待ちのループを行う無限ループです。1度目の繰り返し以降で待つものがなくなったとき、終了します。*2
Timers
Bartenderにはalarmが用意されてます。指定した時刻にコールバックを呼びます。
require 'bartender/bartender' Bartender.context.alarm(Time.now + 1, Bartender.context.method(:stop)) Bartender.run
1秒後にBartender.context.stopを呼びます。stopが呼ばれると、次のターンにrunは終了します。
contextはスレッドごとに作られる、Baretender::Contextを返すメソッドです。Bartenderの待ち合わせのコアとなる部分はContextの中にあります。
.contextを毎度書くのはめんどくさいので、よく使うメソッドはBaretnderモジュールのmodule_functionとして定義されてます。stopはたいてい使わないだろうと思って、module_functionになっていません。
require 'bartender/bartender' Fiber.new do n = 0 while true puts n Bartender.sleep(1) n += 1 end end.resume Fiber.new do while true Bartender.sleep(3) puts 'Fizz' end end.resume Fiber.new do while true Bartender.sleep(5) puts 'Buzz' end end.resume Fiber.new do Bartender.sleep(15) Bartender.context.stop end.resume Bartender.run
単にFiberをsleepさせるメソッドも用意されています。周期的なタイマーはありませんが、上のように何度もsleepすれば実現できると思います。
sleepはBartenderのmodule_functionにもあります。alarmはコールバックを登録するけど、sleepは単に休みます。でもalarmはmodule_functionに入っていません。
なにが言いたいかというと、sleep推奨ってことです。BartenderはFiberの習作ですから、できるだけアプリケーションのコードにはコールバックを書かせたくないのです。
一直線の単純なコードを書いても、なんとなく複数のFiberが譲り合って動くようなものが私のテーマです。
sleep(0)を使うと、やることがなければただちに、もし別のIOがあればそのあとに、sleepが解除されます。これを利用すると、ひまな間はがんがん計算するぞ!といった繰り返しが書けます。
XtでいうWorkProc、EMならnext_tickに相当します。
require 'bartender/bartender' Fiber.new do Bartender.sleep(2) Bartender.context.stop end.resume $n = 0 Fiber.new do while true Bartender.sleep(0) $n += 1 end end.resume Bartender.run p $n
ThreadTask
Fiberでやりくりしていても、やっぱり遅い処理は存在するので、Threadに逃がしたいものが出てきます。おまけのコンビニエンスクラスにThreadTaskを用意しました。
ThreadTaskはブロックを与えて生成します。直ちにサブスレッドでブロックを実行します。タスクのvalueを呼ぶと、Thread#valueのように処理を待ってから値を返します。
ThreadTask#valueがブロックしそうなときは、別のFiberに実行権を譲ります。
そうそう。Bartender::ThreadTask.newのスペルが長いので、Bartender.task(&blk)で生成できるようにしました。
require 'bartender/bartender' queue = Queue.new Thread.new do 5.times do |n| sleep 0.3 queue.push([:que, n]) end end Fiber.new do tt = Bartender.task {sleep 2; 'hello 2'} p tt.value end.resume Fiber.new do tt = Bartender.task {sleep 3; 'hello 3'} p tt.value end.resume Fiber.new do tt = Bartender.task {raise('hello 0')} (tt.value rescue $!).tap {|it| p it} end.resume Fiber.new do tt = Bartender.task {sleep 1; 'hello 1'} p tt.value end.resume Fiber.new do ary = 5.times.collect { Bartender.task {queue.pop} } ary.each { |x| p x.value } end.resume Bartender.run
Clients
飽きてきた!
require 'bartender/bartender' Fiber.new do while true p :tick Bartender.sleep(0.05) end end.resume Fiber.new do c = Bartender.tcp_socket('www.druby.org', 80) writer = Bartender::Writer.new(c) reader = Bartender::Reader.new(c) writer.write("GET / HTTP/1.1\r\nHost: www.druby.org\r\n\r\n") while true puts reader.readln.size end rescue c.close Bartender.context.stop end.resume Bartender.run