Embulkで独自のログを解析するためにparserを開発する – データ分析入門シリーズ

データ分析基盤

独自に出力しているログを Embulk 経由で BigQuery に流すまでの作業記録です。

目標

下記の記事では MySQL の値を Embulk にロードできるようになりました。

Embulkを使ってMySQLのデータをBigQueryに流す手順 - データ分析入門シリーズ
Embulkを使ってMySQLのテーブルをBigQueryに流すまでの手順を書いています。環境を準備するMySQLが動いているサーバ(あるいはMySQLにアクセス可能なサーバ)にembulkをインストールしてコマンドを使えるようにし...

MySQL の値をロードするために embulk-input-mysql というプラグインを使用しました。

他のデータソースからロードするためには対応したプラグインをインストールする必要があります。こちらのページにプラグインのリストがあります。

では、リストにない独自に出力しているログをロードするにはどうすればいいでしょうか?プラグインを作ればいいのです。

ということで、今回は Embulk の parser プラグインを作ってみようと思います。

解析するログの形式

[2021-10-25T12:00:01.000] [INFO] app - title:    /path/to/page  info1    info2   info3      info4
[2021-10-25T12:00:02.000] [INFO] app - title:    /path/to/page  info1    info2   info3      info4
[2021-10-25T12:00:03.000] [INFO] app - title:    /path/to/page  info1    info2   info3      info4
...

今回パースしたいのは上記のようなフォーマットのログファイルです。

一行区切りで、タイムスタンプ、ログレベル、ログの種類とログの詳細がタブで区切られたログです。ログレベルやログの種類は全て同じになる想定なのであまり気にしないことにします。タイムスタンプとログ詳細をパースして BigQuery にロードできるようにするのが目的です。

Embulk プラグインの作り方

Ruby でパーサーを作ります。

まずは下記のコマンドを実行してテンプレートを作成します。plugin用のディレクトリで作業するのがいいと思います。

// 実行コマンド
$ embulk new ruby-parser my_parser

// 出力例
2021-10-25 18:00.00.000 +0900: Embulk v0.9.23
Creating embulk-parser-my_parser/

Plugin template is successfully generated.
Next steps:

  $ cd embulk-parser-my_parser
  $ bundle install                      # install one using rbenv & rbenv-build
  $ bundle exec rake                    # build gem to be released
  $ bundle exec embulk run config.yml   # you can run plugin using this command

emblulk-parser-my_parser ディレクトリが作成されます。さらに lib/embulk/parser/ と辿ると test_parser.rb があります。これが今回実装したいプラグインの本体になります。

中身は次のようになっています。このファイルを変更していきます。

module Embulk
  module Parser

    class MyParser < ParserPlugin
      Plugin.register_parser("my_parser", self)

      def self.transaction(config, &control)
        # configuration code:
        task = {
          "option1" => config.param("option1", :integer),                     # integer, required
          "option2" => config.param("option2", :string, default: "myvalue"),  # string, optional
          "option3" => config.param("option3", :string, default: nil),        # string, optional
        }

        columns = [
          Column.new(0, "example", :string),
          Column.new(1, "column", :long),
          Column.new(2, "name", :double),
        ]
				
        yield(task, columns)
      end

      def init
        # initialization code:
        @option1 = task["option1"]
        @option2 = task["option2"]
        @option3 = task["option3"]
      end

      def run(file_input)
        while file = file_input.next_file
          file.each do |buffer|
            # parsering code
            record = ["col1", 2, 3.0]
            page_builder.add(record)
          end
        end
        page_builder.finish
      end
    end

  end
end

Embulkプラグインの実装詳細

自動生成されたクラスを見るとrunメソッドでファイルを読み込んでいることがわかります。以降の処理を行単位で行ってくれると思ったのですが、どうやらそうではないらしい ので、リンク先の情報を参考にして  LineDecoder というユーティリティを使うことにしました。

def self.transaction(config, &control)
	# configuration code:
	parser_task = config.load_config(Java::LineDecoder::DecoderTask)

	task = {
		"decoder_task" => DataSource.from_java(parser_task.dump)
 }

transactionメソッドを変更しました。まず、parser_taskとしてLineDecoderをロードしています。taskにはdecoder_taskとして設定しました。

自動生成されたメソッドには、yamlファイルのオプションを読み込む処理がありましたが、今回は使わないので削除しています。

def init
	# initialization code:
	@decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask)
end

initメソッドは上記のように書き換えました。オプションに関する記述は同様に削除しています。

def run(file_input)

	decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)

	while decoder.nextFile
		while line = decoder.poll
			
			// 各行に対する処理
			
			// パース後の値を渡す
			page_builder.add([timestamp, info1, info2, info3])
		end
	end
	page_builder.finish
end

そしてrunメソッドです。LineDecoderを使って行ごとに読み込むようになりました。2つ目のwhile文の中で、lineに対して各行に対する処理を書けばOKです。今回はタブで区切って必要な情報を取得すればいいので、この中でsliceしたりします。

page_builder.add([]) の箇所でパース後の値を渡します。

ここで渡した値が、transactionメソッドの中の columns に設定されたカラムとして embulk 側に渡されることになります。型指定は columns で行います。

これで parser プラグインの実装が完了しました。

プラグインを使って Embulk を実行する

プラグインをbundleに含めることもできますが、実行時にプラグインのパスを渡す方法で実行してみたいと思います。

パスを指定するにはIオプションが使えます。プラグインがあるlibディレクトリを指定します。

$ embulk run -I plugin/embulk-parser-my_parser/lib config/my-config.yml

embulkの設定ファイル側では、typeにプラグイン名を指定します。

in:
  type: file
  path_prefix: /path/to/log_file.log
  parser:
    charset: UTF-8
    newline: LF
    type: my_parser

これでプラグインを使ってオリジナルのログをロードできるようになりました。

参考文献

タイトルとURLをコピーしました