@m_seki の

I like ruby tooから引っ越し

ストリーム処理入門: KafkaをRubyで代替する

AIタイトルアシストなら大袈裟でも恥ずかしくない!

n月刊ラムダノートVol.4, No.1の記事を読むぞ

「手を動かして学ぶストリーム処理入門」でKafkaの気持ちを理解したくなりました。 でもKafkaを使うのはめんどくさいので全部Rubyで書いてみようと思います。

実験用のデータ

github.com

githubに実験用のデータが置いてある。親切だ!

ヘッダつきタブ区切りのテキストファイルに気象情報が書いてある。 本文を読み進めると、タブ区切りのまま使わないでJSON風のマップに変換してるようだ。 何度もデータ形式を変換する処理があるのが興味深い。 結局のところ元の情報の表現(カラムの順序)を知っている人が作るんだからタブ区切り(あるいはArray)のままでもいい気がする。

そこは本質じゃないので1ターン目にオブジェクトにしてHashに入れることにした。

require 'pp'
require 'time'

class TSV
  def self.load_file(fname, parser)
    stream = File.foreach(fname).lazy.map {|x| x.chomp.split("\t")}
    head = stream.next
    self.new(head, stream, parser)
  end

  def initialize(cols, stream, parser)
    @cols = cols
    @stream = stream
    @parser = parser
  end
  
  def next
    [@cols, @parser.zip(@stream.next).map {|m, v| m.call(v)}].transpose.to_h
  end
end

if __FILE__ == $0
  tsv = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  while (it = tsv.next rescue nil)
    pp it
  end
end

こんな感じの出力になる

{"timestamp"=>2022-01-01 00:00:00 +0900,
 "temperature [°C]"=>-8.2,
 "rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 01:00:00 +0900,
 "temperature [°C]"=>-8.6,
 "rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 02:00:00 +0900,
 "temperature [°C]"=>-9.3,
 "rainfall [mm]"=>0.0}
...

なお、TSVは外部イテレータ風にした。

replaymanを真似する

実験用のツールにreplaymanというのがあるみたい。 元データの発生時刻(頻度、間隔)を再現しながらデータを出力する係で、再生開始時刻や早送りも設定できる。

似たようなものを作ろう

class Replay
  def initialize(tsv, timed_by, speed=1, start=nil)
    @tsv = tsv
    @start = start
    @speed = speed
    @timed_by = timed_by
  end

  def sleep_until(at)
    sleep([at - Time.now, 0].max)
  end

  def run
    origin = Time.now
    while (rec = @tsv.next rescue nil)
      t = rec[@timed_by]
      @start ||= t
      diff = t - @start
      sleep_until(origin + (diff / @speed))
      yield(rec)
    end
  end
end

if __FILE__ == $0
  src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])

  replay = Replay.new(src, 'timestamp', 3600)
  replay.run {|x| pp x}
end

(実行しないとわからないと思うが)3600倍速でデータが印字されるぞ!

開始時刻を指定するのがめんどうだったので、最初のデータの時刻をデフォルト値にした。

ちなみにReplayは内部イテレータぽくした。うーむ。なんで私はこれがいいと思ったのだろうか。あとでまた考えよう。

Driqでストリームぽくする

DriqはThe dRuby Bookに書いたDripから永続化を省いた消費されないキューだ。Drip - persistent + queue。 Driqについてはn月刊ラムダノートの既刊に掲載されているので金で買って読んで欲しい。

あとRubyKaigi TakeoutでDriqの話をしてました。(忘れてた)

rubykaigi.org

dRubyを使えば複数プロセス/マシンでDriqとクライアントを分割できるが、分散できるのは知っているので今日はモノプロセス/マルチスレッドで書くことにする。

  require 'pp'
  require 'time'
  require 'driq'

  src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  r = Replay.new(src, 'timestamp', 3600)

  s_source = Driq.new

  Thread.new(s_source) do |sin|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      pp value
    end
  end

  r.run {|x| s_source.write(x)}

Replayしたデータをトピックに相当するs_sourceにwriteして、別スレッドでreadして印刷するだけのものです。

気温が0度以上のものだけが流れるs_non_negもやってみる

  require 'pp'
  require 'time'
  require 'driq'

 src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  r = Replay.new(src, 'timestamp', 36000)

  s_source = Driq.new
  s_non_neg = Driq.new

  Thread.new(s_source, s_non_neg) do |sin, sout|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      sout.write(value) if value['temperature [°C]'] >= 0
    end
  end

  Thread.new(s_non_neg) do |sin|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      pp value
    end
  end

  r.run {|x| s_source.write(x)}

あわせて読みたい

The dRuby Book: Distributed and Parallel Computing with Ruby

n月刊ラムダノート Vol.2, No.1(2020)(電子書籍のみ)www.lambdanote.com

付き合いで送ろう!m_seki's wishlist

今年はERB/dRubyが25周年なので!

https://www.amazon.co.jp/hz/wishlist/ls/1R43BBPSPUEEE/

typoなおした

reply → replay