@m_seki の

I like ruby tooから引っ越し

ストリーム処理入門: KafkaをRubyで代替する

AIタイトルアシストなら大袈裟でも恥ずかしくない!

n月刊ラムダノートVol.4, No.1の記事を読むぞ

「手を動かして学ぶストリーム処理入門」でKafkaの気持ちを理解したくなりました。 でもKafkaを使うのはめんどくさいので全部Rubyで書いてみようと思います。

実験用のデータ

github.com

githubに実験用のデータが置いてある。親切だ!

ヘッダつきタブ区切りのテキストファイルに気象情報が書いてある。 本文を読み進めると、タブ区切りのまま使わないでJSON風のマップに変換してるようだ。 何度もデータ形式を変換する処理があるのが興味深い。 結局のところ元の情報の表現(カラムの順序)を知っている人が作るんだからタブ区切り(あるいはArray)のままでもいい気がする。

そこは本質じゃないので1ターン目にオブジェクトにしてHashに入れることにした。

require 'pp'
require 'time'

class TSV
  def self.load_file(fname, parser)
    stream = File.foreach(fname).lazy.map {|x| x.chomp.split("\t")}
    head = stream.next
    self.new(head, stream, parser)
  end

  def initialize(cols, stream, parser)
    @cols = cols
    @stream = stream
    @parser = parser
  end
  
  def next
    [@cols, @parser.zip(@stream.next).map {|m, v| m.call(v)}].transpose.to_h
  end
end

if __FILE__ == $0
  tsv = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  while (it = tsv.next rescue nil)
    pp it
  end
end

こんな感じの出力になる

{"timestamp"=>2022-01-01 00:00:00 +0900,
 "temperature [°C]"=>-8.2,
 "rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 01:00:00 +0900,
 "temperature [°C]"=>-8.6,
 "rainfall [mm]"=>0.0}
{"timestamp"=>2022-01-01 02:00:00 +0900,
 "temperature [°C]"=>-9.3,
 "rainfall [mm]"=>0.0}
...

なお、TSVは外部イテレータ風にした。

replaymanを真似する

実験用のツールにreplaymanというのがあるみたい。 元データの発生時刻(頻度、間隔)を再現しながらデータを出力する係で、再生開始時刻や早送りも設定できる。

似たようなものを作ろう

class Reply
  def initialize(tsv, timed_by, speed=1, start=nil)
    @tsv = tsv
    @start = start
    @speed = speed
    @timed_by = timed_by
  end

  def sleep_until(at)
    sleep([at - Time.now, 0].max)
  end

  def run
    origin = Time.now
    while (rec = @tsv.next rescue nil)
      t = rec[@timed_by]
      @start ||= t
      diff = t - @start
      sleep_until(origin + (diff / @speed))
      yield(rec)
    end
  end
end

if __FILE__ == $0
  src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])

  reply = Reply.new(src, 'timestamp', 3600)
  reply.run {|x| pp x}
end

(実行しないとわからないと思うが)3600倍速でデータが印字されるぞ!

開始時刻を指定するのがめんどうだったので、最初のデータの時刻をデフォルト値にした。

ちなみにReplyは内部イテレータぽくした。うーむ。なんで私はこれがいいと思ったのだろうか。あとでまた考えよう。

Driqでストリームぽくする

DriqはThe dRuby Bookに書いたDripから永続化を省いた消費されないキューだ。Drip - persistent + queue。 Driqについてはn月刊ラムダノートの既刊に掲載されているので金で買って読んで欲しい。

あとRubyKaigi TakeoutでDriqの話をしてました。(忘れてた)

rubykaigi.org

dRubyを使えば複数プロセス/マシンでDriqとクライアントを分割できるが、分散できるのは知っているので今日はモノプロセス/マルチスレッドで書くことにする。

  require 'pp'
  require 'time'
  require 'driq'

  src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  r = Reply.new(src, 'timestamp', 3600)

  s_source = Driq.new

  Thread.new(s_source) do |sin|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      pp value
    end
  end

  r.run {|x| s_source.write(x)}

Replyしたデータをトピックに相当するs_sourceにwriteして、別スレッドでreadして印刷するだけのものです。

気温が0度以上のものだけが流れるs_non_negもやってみる

  require 'pp'
  require 'time'
  require 'driq'

 src = TSV.load_file('dataset-amedas/20220101-20220131-kushiro.tsv',
                      [Time.method(:parse), method(:Float), method(:Float)])
  r = Reply.new(src, 'timestamp', 36000)

  s_source = Driq.new
  s_non_neg = Driq.new

  Thread.new(s_source, s_non_neg) do |sin, sout|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      sout.write(value) if value['temperature [°C]'] >= 0
    end
  end

  Thread.new(s_non_neg) do |sin|
    cursor = 0
    while true
      cursor, value = sin.read(cursor)
      pp value
    end
  end

  r.run {|x| s_source.write(x)}

あわせて読みたい

The dRuby Book: Distributed and Parallel Computing with Ruby

n月刊ラムダノート Vol.2, No.1(2020)(電子書籍のみ)www.lambdanote.com

付き合いで送ろう!m_seki's wishlist

今年はERB/dRubyが25周年なので!

https://www.amazon.co.jp/hz/wishlist/ls/1R43BBPSPUEEE/

手札に「たねポケモン」がなかったときに山札引く話

AIタイトルアシストを使いました!

相手の手札に「たねポケモン」がいなかったら

対戦準備のときの話。相手の手札に「たねポケモン」がいなかったとき、自分は山を引くことができるんだが、その時の話。

対戦のはじめかた | あそびかた | ルール・Q&A | ポケモンカードゲーム公式ホームページ :

相手の手札を確認し、7まで準備をします。相手も7まで準備し終えたあと、山札を1枚引いて手札に加えることができます(引かなくてもかまいません)。 相手が何度も手札を引きなおした場合は、その回数ぶんまで山札をまとめて引くことができます。 このときに引いたのが「たねポケモン」だったら、ベンチに出すことができます(出さなくてもかまいません)。

何度も引きなおしたとき、「その回数ぶんまで山札をまとめて引く」んだね。 一枚一枚確認しながら引くのは間違い。

AR買いました

ポケモンカードゲームSV sv5M 拡張パック サイバージャッジ シキジカ AR (073/071) | ポケカ 草 たねポケモンポケモンカードゲームSV sv5M 拡張パック サイバージャッジ メブキジカ AR (074/071) | ポケカ 草 1進化

Kindle Oasis: キンドルアプリでの本やマンガの読みやすさ

AIタイトルアシスト使った!

Kindle Oasisを買い替えた

Kindleの本やマンガはiPad mini (5th) のKindleアプリか、Kindle Oasisを使っている。

iPad miniはすばらしい

KindleリーダーとしてiPad miniはめっちゃよい。大きさが良い。 操作の反応速度のおかげでページめくりもストレージの管理もやりやすい。

  • 読みやすい!
  • 仰向けで読むにはちょっと重い
  • お風呂では読めない

(amazon, 整備済みiPadとかあるのか)

Kindle Oasisはここがよい

以前のPaperwhiteは反応の遅さとページめくりのもたつきが嫌で使わなくなってしまった。

しばらくして物理めくりボタンと防水機能をもつOasisの存在を知る。ボタンでめくれるなら多少操作が遅くてもいいかも! お風呂で本が読めるぞ!!

とWishlistに入れておいたら、stsuboiさんからお古をもらえることになった。9thなので、バックライトの色以外はほぼほぼ現行と同じやつ。 これがめっちゃよいんだよ。

  • 独特の形状で持ちやすい!(持つ部分だけ厚く、それ以外はとても薄い)
  • 物理めくりボタンはとてもよい
  • お風呂で読める

買い替え

それで2年弱使っていたのだけど、最近になってバックライトなどの不具合(一部が最も明るくなり、一部が点かない。ランダムに明滅する、隅の方のE-inkが反転してる)が発生してしまった。 一番暗くしていれば問題発生しにくいんだけど、なんとなく断線とかショートとかしてそうで不安になってきた。

深夜にチャットサポート(途中で電話になった)でやりとりしたところ、

  • 修理は行っていない
  • 新品を買う際に使える15%offクーポンがあるがamazon.co.jpで買ったものでないので(簡単には)発行できない

とのことで、新しく買い替えることにした。

そろそろOasisの新型が出るんじゃないかとか、手書きできるScribeの方がいいのでは!?と10thのOasisにしました。Scribeはお風呂で使えないしね。

結論としては買い替えてよかった。Oasisとてもよい。

9thと10th

9thと10thはバックライトの色調くらいの差しかないぞ、という噂だったけど本当かなあ。触り心地はだいぶ良くなってる気がする。

  • 反応が速い
  • ダウンロード速い
  • ボタンのクリック感がよい
  • 色調はそれほどでもなかった

最近読んでるやつ

unlimitedでずっと借りてるやつ。定期的に読み返してる。統一した時間がないんだぞ、って話から並行処理の同期とかのことと対比させながら読むのが楽しい。

雑誌の連載見てて単行本を買った。ドール沼という世界があるんだなあ。沼には沼の共通性があるね。