@m_seki の

I like ruby tooから引っ越し

BartenderのAPIで練習するFiberの使い方

Fiberの使い道を考えるために、selectと組み合わせて処理を単純に記述できる(のではないか、ということを確かめるために)部品を作ってみました。

GitHub - seki/bartender: Async I/O using Ruby Fiber

selectのラッパー

まずFiberと関係ない部分です。selectの簡単なラッパーです。今後、Ruby本体にもっと良いものが載り、このレイヤーは後で捨てられるだろう、と考えています。

module Bartender
  class Context
    def []=(event, fd, callback)
      # イベント種(:read, :write)とfdに、コールバックを登録します。
      # 一つの組みに、一つのコールバックだけ登録できます。
      # コールバックされても消えません
    end

    def delete(event, fd)
      # コールバックを削除します
    end

    def alarm(time, callback)
      # 指定時刻を過ぎたら呼ばれるコールバックを登録します
      # コールバックされると消えます
      # 戻り値はdelete_alaram()で使います
    end

    def delete_alarm(entry)
      # alarmを削除します
    end

    def run
      # selectのイベントループを回します
      # コールバックがなくなったら止まります
    end

    def stop
      # selectのイベントループを止めます
    end
  end
end

Xtのころの経験から一つの事象に一つだけコールバックを登録できるようにしました。複数でもいいんだけど、複数入れられるものはこの上に被せれば良いだろうから、コア部分は一つだけにしました。
複数登録できる場合の発火する順序などにまつわるイヤなバグの思い出があったので、アプリケーション層で管理してもらう方がいいかなって判断しました。

サーバーのコールバックは次のように登録します。

  context[:read, soc] = Proc.new do
    client = soc.accept
    # いろいろ
  end

  context.run

Fiberと組み合わせたAPI

それぞれのコールバックにFiber.current.method(:resume)を与えることで、Fiberの実行権をブロックしそうな時に手放すことができるだろう、というのを確認するAPIです。
少しだけ応用層になります。

module Bartender
  class Context
   
    def sleep(sec)
      # sec秒間、止まります。スレッドは止まらず、別のFiberが実行されます。
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

    def wait_io(event, fd)
      # fdがreadable, あるいは writableになるまで止まります。
      # スレッドは止まらず、別のFiberが実行されます。
      self[event, fd] = Fiber.current.method(:resume)
      Fiber.yield
    ensure
      delete(event, fd)
    end

    def wait_io_timeout(event, fd, timeout)
      # timeoutがあるwait_ioです。
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end

    def wait_readable(fd); wait_io(:read, fd); end
    def wait_writable(fd); wait_io(:write, fd); end

    def _read(fd, sz)
      # ブロッキング風、ノンブロッキングなreadです。
      return fd.read_nonblock(sz)
    rescue IO::WaitReadable
      wait_readable(fd)
      retry
    end

    def _write(fd, buf)
      # ブロッキング風、ノンブロッキングなwriteです。
      return fd.write_nonblock(buf)
    rescue IO::WaitWritable
      wait_writable(fd)
      retry
    end
  end
end

しつこいけど、sleepの実装を説明します。sleepはalarmでできています。

    def sleep(sec)
      alarm(Time.now + sec, Fiber.current.method(:resume))
      Fiber.yield
    end

2行でできています。

  1. sec秒後にFiber.current.method(:resume)を呼ぶようにalarmを登録します。
  2. Fiber.yieldで実行権を手放します。

時刻を過ぎたらresumeが呼ばれて、yieldしところから再開します。結果、sec秒間止まることができます。


その他のAPIの実装もほとんど一緒です。
なお、_read, _writeにtimeoutがないのは実装した時期によるもの(alarm, sleepは後から作ったから)なので、そのうちtimeoutが指定できるようにするかもしれません。

最後に、この中で一番長いwait_io_timeoutを説明します。一番自信がないAPIです。

    def wait_io_timeout(event, fd, timeout)
      method = Fiber.current.method(:resume)
      entry = alarm(Time.now + timeout, Proc.new {method.call(:timeout)})
      self[event, fd] = method
      raise(TimeoutError) if Fiber.yield == :timeout
    ensure
      delete(event, fd)
      delete_alarm(entry)
    end
  1. sleepと同様にalarmを設定します。ただし、登録するコールバックはresumeに引数:timeoutを与えるProcです
  2. [event, fd]の組みにresumeを登録
  3. Fiber.yieldします。もしも戻り値が:timeoutだったら例外を発生させます
  4. ensureでそれぞれのコールバックを削除します。

alarmのコールバックは発火した後は削除されるのだけど、IO待ちのコールバックで再開したあとに、またalarmのコールバックが発生すると困るので明示的に削除してます。すでに削除してあったら何も起きないので雑にdelete_alarmしてます。

もしもスレッドだったら、IO待ち起因のコールバックとalarm起因のコールバックが同時に発生したらどうしよう、とか、イベント取り外し中にいやなこと起きないかなあ、とか、心配するところですが、Fiberだから大丈夫。


あわせて読みたい

改訂2版 パーフェクトRubydRubyによる分散・WebプログラミングThe dRuby Book: Distributed and Parallel Computing with Ruby

パーフェクトRubyの改訂版でもFiberの記述は増えていないとのこと。

iPod nano 1 to 7

iPod nanoの第一世代、しばらく使ってなかったんだけど久しぶりにiTunesにつないだら「交換修理対象」だったことがわかって交換しました。

 

iPod nano (第 1 世代) 交換プログラム - Apple サポート

 

Appleに電話して、ヤマト便の回収の予約をして、すぐに現行機が届きました。届いたのはシルバーで、Lightningケーブルだけが付属していました。(色はたぶん選べません)

 

Apple iPod nano 16GB 第7世代 2015年モデル シルバー MKN22J/A

Apple iPod nano 16GB 第7世代 2015年モデル シルバー MKN22J/A

 

 

 

自分が一番音楽を聴くのはクルマの中です。カーステレオはBluetoothがつながるので、ここ数年はiPhone 4sを音楽プレイヤーとして使ってましたが、現行機のnanoもBluetoothが使えるので今はnanoと交代です。

 

第一世代から12年くらい経ってもnanoっぽい。第一世代から継承してる現行機のよいところを自慢します。

  • 軽い
  • 小さすぎない
  • 物理ボタンがある

もちろん、もうクリックホイールはないんだけど、側面にEarPodsについてるボタンと同じものがついています。音量調整の他に、再生、停止、前後の曲への移動ができます。そうそう。シェイクするとシャッフルします。歩きながら手探りだけで操作できます。

 

iPhoneで音楽を聴くのと比べるとこんなところがよいです。

  • 音楽が割り込まれない
  • 通信容量やバッテリー残量が気にならない
  • わざわざ音楽が聴きたくなる

ダメなところはGUI。最近のiPhoneに慣れてしまうと、液晶の解像度の低さに驚きます。あと古臭いGUIパーツもあと10年したらレトロ調になるんだろうなーと自分を納得させるイマイチさ。でも、ネガティブな要素はそれくらいかなあ。

 

現行機といってもここ数年は更新がないんですよね、nano。このままオワコンなのかなあ。128GBくらいのが出てくれると全部入ってうれしいのになー。

 

まとめ 

nano、音楽を聴きたい人におすすめ。

 

 おまけ

METAFIVE買おうと思ってiTunesストア見にいったら800円だったから買っちゃった。

山本潤子 ゴールデン☆ベスト

山本潤子 ゴールデン☆ベスト

 

 

 

Fiberのための同期機構の作り方

Rubyのスレッドは素晴らしいと思うのですが、なぜか人気がありません。

Fiberで書けば人気が出ると思うので、ときどき調べています。多くのサンプルは次のようなジェネレーターっぽいものです。

fib = Fiber.new do
  a, b = 1, 1
  while true
    Fiber.yield(a)
    a, b = b, a + b
  end
end

10.times do |n|
  p [n, fib.resume]
end

Enumeratorなどの実装もこんな感じでできてるんでしょうね。
二つの実行主体(fibのFiberと10.times..してるメイン部分)はメイン部分がFiberを利用しているような構図です(??)。
resumeするべき相手のことを知ってるし、むしろFiberが資源みたいな感じ(???)。

マルチスレッドでのプログラミングのように、複数のFiber間で連携する雰囲気のコードを書くにはどうしたらよいでしょう。
スレッドでいうと次のようなQueueを介して同期するようなものです。

  require 'thread'

  q = SizedQueue.new(1)
  Thread.new do
    10.times do |n|
      sleep(rand)
      q.push(n)
      p [:push, n]
    end
  end
  Thread.new do
    10.times do |n|
      sleep(rand)
      p [:pop, q.pop]
    end
  end.join

スレッド間はQueueだけの付き合いで、相手の実行主体(この場合はThread)については興味がありません。
かっこいい!

こういった同期機構をFiberで書くにはどうしたらいいんでしょう。
プリミティブ自体をFiberで管理したりとかいろいろ試行錯誤したんだけど、うまくいきませんでした。(ヘタクソ)

require 'fiber'すると使用できる、Fiber.currentを見つけてからは簡単に記述できることがわかりました。
同期プリミティブを利用した実行主体をメモしてyield, resumeできるので、事前に関係者(関係するFiber)を知り合っておく必要がありません。*1

次の例はpushをするFiberとpopをするFiberが揃ったら、オブジェクトを一つpush側からpop側へ渡して、両者を再開させる同期機構(ランデブー)です。

require 'fiber'

class Rdv
  def initialize
    @reader = []
    @queue = []
  end

  def push(it)
    if @reader.empty?
      @queue << [it, Fiber.current.method(:resume)]
      return Fiber.yield
    end
    @reader.shift.call(it)
  end

  def pop
    if @queue.empty?
      @reader << Fiber.current.method(:resume)
      return Fiber.yield
    end

    value, fiber = @queue.shift
    fiber.call
    return value
  end
end


if __FILE__ == $0
  rdv = Rdv.new
  Fiber.new do 
    10.times do |n|
      rdv.push(n)
      p [:push, n]
    end
  end.resume
  Fiber.new do
    10.times do |n|
      p [:pop, rdv.pop]
    end
  end.resume
end

一般的な同期機構を作るイディオムは次のようになります。

  1. 呼び出したFIberをメモ
  2. Fiber.yield
  3. 再開されたらメモしておいたFiberにresume

1の前、あるいは、3の前に完了条件をチェックすると思います。

pushの場合はこういう意味です。

  1. 待っている人(pop側)がいないなら、Fiberをメモしてyield。このとき一緒に渡されたオブジェクトもメモします。
  2. 待っている人がいる場合、または再開されたとき、一つ取り出してresume

pop側はこういう気持ちです。

  1. データがないなら(push側がいないなら)、Fiberをメモしてyield
  2. データがある場合、または再開されたとき、一つ取り出してresume。このとき一緒にメモしたオブジェクトを渡します。

Threadのときと違うのは、条件を検査する間をロックする必要がないところです。Threadの場合は、待ってる人を全員を一斉に再開して、ロックを獲得できた実行主体だけが先に進みますが、Fiberでは同時に一つしか動かないのがわかっているので、そういった管理は必要ありません。ドキドキするけど。Fiberで書くときのキモはもしかすると、Thread脳で「これはやばい」と思う部分を「Fiberでは関係なかった」と押し殺すところかもしれません。

Bartender

https://github.com/seki/bartender

Bartenderというselectを抽象化するライブラリを書いています。これは突き詰めるとfdとreadable, writableの組みに一つだけコールバックを設定できるだけのライブラリです。
しかし、Fiberと組み合わせて、Fiber.current.method(:resume)をコールバックとして登録することで面白いもの、ブロッキングのread/write風にアプリケーションを書くと、ブロックしそうなときは他のFiberに実行権を渡しブロックしない、ブロッキング風ノンブロッキングIOを実現できます。

    def wait_io(event, fd)
      self[event, fd] = Fiber.current.method(:resume)
      Fiber.yield
    ensure
      delete(event, fd)
    end

    def wait_readable(fd); wait_io(:read, fd); end

    def _read(fd, sz)
      return fd.read_nonblock(sz)
    rescue IO::WaitReadable
      wait_readable(fd)
      retry
    end

read周りの実装を抜き出すとこうなります。(SSLだとreadでもWaitWritableが来るそうなので対応しないと)

アプリケーションの実際はまたあとで。

*1:この例ではFiber.current.method(:resume)を覚えていますが、単にFiber.currentでも大丈夫。