AIタイトルアシストなら大袈裟でも恥ずかしくない!
n月刊ラムダノートVol.4, No.1の記事を読むぞ
「手を動かして学ぶストリーム処理入門」でKafkaの気持ちを理解したくなりました。 でもKafkaを使うのはめんどくさいので全部Rubyで書いてみようと思います。
実験用のデータ
githubに実験用のデータが置いてある。親切だ!
ヘッダつきタブ区切りのテキストファイルに気象情報が書いてある。 本文を読み進めると、タブ区切りのまま使わないでJSON風のマップに変換してるようだ。 何度もデータ形式を変換する処理があるのが興味深い。 結局のところ元の情報の表現(カラムの順序)を知っている人が作るんだからタブ区切り(あるいはArray)のままでもいい気がする。
そこは本質じゃないので1ターン目にオブジェクトにしてHashに入れることにした。
require 'pp' require 'time' class TSV def self.load_file(fname, parser) stream = File.foreach(fname).lazy.map {|x| x.chomp.split("\t")} head = stream.next self.new(head, stream, parser) end def initialize(cols, stream, parser) @cols = cols @stream = stream @parser = parser end def next [@cols, @parser.zip(@stream.next).map {|m, v| m.call(v)}].transpose.to_h end end if __FILE__ == $0 tsv = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv', [Time.method(:parse), method(:Float), method(:Float)]) while (it = tsv.next rescue nil) pp it end end
こんな感じの出力になる
{"timestamp"=>2022-01-01 00:00:00 +0900, "temperature [°C]"=>-8.2, "rainfall [mm]"=>0.0} {"timestamp"=>2022-01-01 01:00:00 +0900, "temperature [°C]"=>-8.6, "rainfall [mm]"=>0.0} {"timestamp"=>2022-01-01 02:00:00 +0900, "temperature [°C]"=>-9.3, "rainfall [mm]"=>0.0} ...
なお、TSVは外部イテレータ風にした。
replaymanを真似する
実験用のツールにreplaymanというのがあるみたい。 元データの発生時刻(頻度、間隔)を再現しながらデータを出力する係で、再生開始時刻や早送りも設定できる。
似たようなものを作ろう
class Replay def initialize(tsv, timed_by, speed=1, start=nil) @tsv = tsv @start = start @speed = speed @timed_by = timed_by end def sleep_until(at) sleep([at - Time.now, 0].max) end def run origin = Time.now while (rec = @tsv.next rescue nil) t = rec[@timed_by] @start ||= t diff = t - @start sleep_until(origin + (diff / @speed)) yield(rec) end end end if __FILE__ == $0 src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv', [Time.method(:parse), method(:Float), method(:Float)]) replay = Replay.new(src, 'timestamp', 3600) replay.run {|x| pp x} end
(実行しないとわからないと思うが)3600倍速でデータが印字されるぞ!
開始時刻を指定するのがめんどうだったので、最初のデータの時刻をデフォルト値にした。
ちなみにReplayは内部イテレータぽくした。うーむ。なんで私はこれがいいと思ったのだろうか。あとでまた考えよう。
Driqでストリームぽくする
DriqはThe dRuby Bookに書いたDripから永続化を省いた消費されないキューだ。Drip - persistent + queue。 Driqについてはn月刊ラムダノートの既刊に掲載されているので金で買って読んで欲しい。
あとRubyKaigi TakeoutでDriqの話をしてました。(忘れてた)
dRubyを使えば複数プロセス/マシンでDriqとクライアントを分割できるが、分散できるのは知っているので今日はモノプロセス/マルチスレッドで書くことにする。
require 'pp' require 'time' require 'driq' src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv', [Time.method(:parse), method(:Float), method(:Float)]) r = Replay.new(src, 'timestamp', 3600) s_source = Driq.new Thread.new(s_source) do |sin| cursor = 0 while true cursor, value = sin.read(cursor) pp value end end r.run {|x| s_source.write(x)}
Replayしたデータをトピックに相当するs_sourceにwriteして、別スレッドでreadして印刷するだけのものです。
気温が0度以上のものだけが流れるs_non_negもやってみる
require 'pp' require 'time' require 'driq' src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv', [Time.method(:parse), method(:Float), method(:Float)]) r = Replay.new(src, 'timestamp', 36000) s_source = Driq.new s_non_neg = Driq.new Thread.new(s_source, s_non_neg) do |sin, sout| cursor = 0 while true cursor, value = sin.read(cursor) sout.write(value) if value['temperature [°C]'] >= 0 end end Thread.new(s_non_neg) do |sin| cursor = 0 while true cursor, value = sin.read(cursor) pp value end end r.run {|x| s_source.write(x)}
あわせて読みたい
n月刊ラムダノート Vol.2, No.1(2020)(電子書籍のみ)www.lambdanote.com
付き合いで送ろう!m_seki's wishlist
今年はERB/dRubyが25周年なので!
https://www.amazon.co.jp/hz/wishlist/ls/1R43BBPSPUEEE/
typoなおした
reply → replay