独自に出力しているログを Embulk 経由で BigQuery に流すまでの作業記録です。
目標
下記の記事では MySQL の値を Embulk にロードできるようになりました。
MySQL の値をロードするために embulk-input-mysql というプラグインを使用しました。
他のデータソースからロードするためには対応したプラグインをインストールする必要があります。こちらのページにプラグインのリストがあります。
では、リストにない独自に出力しているログをロードするにはどうすればいいでしょうか?プラグインを作ればいいのです。
ということで、今回は Embulk の parser プラグインを作ってみようと思います。
解析するログの形式
[2021-10-25T12:00:01.000] [INFO] app - title: /path/to/page info1 info2 info3 info4 [2021-10-25T12:00:02.000] [INFO] app - title: /path/to/page info1 info2 info3 info4 [2021-10-25T12:00:03.000] [INFO] app - title: /path/to/page info1 info2 info3 info4 ...
今回パースしたいのは上記のようなフォーマットのログファイルです。
一行区切りで、タイムスタンプ、ログレベル、ログの種類とログの詳細がタブで区切られたログです。ログレベルやログの種類は全て同じになる想定なのであまり気にしないことにします。タイムスタンプとログ詳細をパースして BigQuery にロードできるようにするのが目的です。
Embulk プラグインの作り方
Ruby でパーサーを作ります。
まずは下記のコマンドを実行してテンプレートを作成します。plugin用のディレクトリで作業するのがいいと思います。
// 実行コマンド $ embulk new ruby-parser my_parser // 出力例 2021-10-25 18:00.00.000 +0900: Embulk v0.9.23 Creating embulk-parser-my_parser/ Plugin template is successfully generated. Next steps: $ cd embulk-parser-my_parser $ bundle install # install one using rbenv & rbenv-build $ bundle exec rake # build gem to be released $ bundle exec embulk run config.yml # you can run plugin using this command
emblulk-parser-my_parser ディレクトリが作成されます。さらに lib/embulk/parser/ と辿ると test_parser.rb があります。これが今回実装したいプラグインの本体になります。
中身は次のようになっています。このファイルを変更していきます。
module Embulk module Parser class MyParser < ParserPlugin Plugin.register_parser("my_parser", self) def self.transaction(config, &control) # configuration code: task = { "option1" => config.param("option1", :integer), # integer, required "option2" => config.param("option2", :string, default: "myvalue"), # string, optional "option3" => config.param("option3", :string, default: nil), # string, optional } columns = [ Column.new(0, "example", :string), Column.new(1, "column", :long), Column.new(2, "name", :double), ] yield(task, columns) end def init # initialization code: @option1 = task["option1"] @option2 = task["option2"] @option3 = task["option3"] end def run(file_input) while file = file_input.next_file file.each do |buffer| # parsering code record = ["col1", 2, 3.0] page_builder.add(record) end end page_builder.finish end end end end
Embulkプラグインの実装詳細
自動生成されたクラスを見るとrunメソッドでファイルを読み込んでいることがわかります。以降の処理を行単位で行ってくれると思ったのですが、どうやらそうではないらしい ので、リンク先の情報を参考にして LineDecoder というユーティリティを使うことにしました。
def self.transaction(config, &control) # configuration code: parser_task = config.load_config(Java::LineDecoder::DecoderTask) task = { "decoder_task" => DataSource.from_java(parser_task.dump) }
transactionメソッドを変更しました。まず、parser_taskとしてLineDecoderをロードしています。taskにはdecoder_taskとして設定しました。
自動生成されたメソッドには、yamlファイルのオプションを読み込む処理がありましたが、今回は使わないので削除しています。
def init # initialization code: @decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask) end
initメソッドは上記のように書き換えました。オプションに関する記述は同様に削除しています。
def run(file_input) decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task) while decoder.nextFile while line = decoder.poll // 各行に対する処理 // パース後の値を渡す page_builder.add([timestamp, info1, info2, info3]) end end page_builder.finish end
そしてrunメソッドです。LineDecoderを使って行ごとに読み込むようになりました。2つ目のwhile文の中で、lineに対して各行に対する処理を書けばOKです。今回はタブで区切って必要な情報を取得すればいいので、この中でsliceしたりします。
page_builder.add([]) の箇所でパース後の値を渡します。
ここで渡した値が、transactionメソッドの中の columns に設定されたカラムとして embulk 側に渡されることになります。型指定は columns で行います。
これで parser プラグインの実装が完了しました。
プラグインを使って Embulk を実行する
プラグインをbundleに含めることもできますが、実行時にプラグインのパスを渡す方法で実行してみたいと思います。
パスを指定するにはIオプションが使えます。プラグインがあるlibディレクトリを指定します。
$ embulk run -I plugin/embulk-parser-my_parser/lib config/my-config.yml
embulkの設定ファイル側では、typeにプラグイン名を指定します。
in: type: file path_prefix: /path/to/log_file.log parser: charset: UTF-8 newline: LF type: my_parser
これでプラグインを使ってオリジナルのログをロードできるようになりました。
参考文献
- 複数行からなるログを解析するために、EmbulkのparserプラグインをRubyで開発する話(準備編)
- Rubyでのプラグイン開発の話。この記事では複数行のログに対応しているが、自分の環境では一行解析できればよかったのでリンク先のサンプルに少し変更を加える形で参考にした。
- hiroyuki-sato/apache_log_ruby.rb
- Rubyで書かれたApacheのログのパーサー。
- Nginx のアクセスログを Embulk で PostgreSQL に入れて分析する|つよくなるブログ
- Nginxが出力するログをワンライナーでTSV形式に変換してビルトインのパーサーを使う方法。