データ分析基盤を作っていく上で、定期実行したい場面は数多くあります。これまで定期実行といえば cron を使っていました。データのロードも cron で実行することはできますが、ログを収集したり、複雑なタスク管理をcronで実現するのは少し難しいです。
そこで使えるのが Digdag です。
Digdag fits simple replacement of cron, IT operations automation, data analytics batch jobs, machine learning pipelines, and many more by using Directed Acyclic Graphs (DAG) as the infrastructure.
cron だけでは実現できないリッチなタスク管理ができます。この記事では詳しい内容や応用に入る前にとにかく Digdag に触れてみたいと思います。
Digdag入門
公式ページの Getting started にしたがってサンプルを動かすところまでやります。
最新バージョンをダウンロードする
$ curl -o /bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
最新バージョンをダウンロードして保存します。(保存先を /bin/digdag に変更してます。)
chmod +x /bin/digdag
実行権限の変更も行いました。
echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
(必要に応じて)パスを通します。正しく設定できていればdigdagコマンドが使えるようになっています。helpオプションを試してみましょう。次のようになっていれば設定完了です。
# 実行コマンド $ digdag --help # 出力例 2021-10-14 13:11:03 +0900: Digdag v0.10.3 Usage: digdag <command> [options...] Local-mode commands: init <dir> create a new workflow project r[un] <workflow.dig> run a workflow c[heck] show workflow definitions sched[uler] run a scheduler server migrate (run|check) migrate database selfupdate update cli to the latest version ...
サンプル・ワークフロー
サンプルが用意されているので実行して、digdagの動きを見ていきます。
まずはワークフローを新規作成して、テンプレートファイルを作ります。
# 実行コマンド $ digdag init mydag # 出力例 2021-10-14 13:12:00 +0900: Digdag v0.10.3 Creating mydag/mydag.dig Creating mydag/.gitignore Done. Type `cd mydag` and then `digdag run mydag.dig` to run the workflow. Enjoy!
設定ファイルが作成され、次に実行するコマンドの指示が出ました。
設定ファイルの内容を見てみます。
timezone: UTC +setup: echo>: start ${session_time} +disp_current_date: echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')} +repeat: for_each>: order: [first, second, third] animal: [dog, cat] _do: echo>: ${order} ${animal} _parallel: true +teardown: echo>: finish ${session_time}
はじめに実行時のセッション時間を表示して、現在の日時を表示、配列orderと配列animalをループして値を表示して、最後に終了時のセッション時間を表示するようです。
ディレクトリを移って、digdagを実行します。
# 実行コマンド $ cd mydig $ digdag run mydag.dig # 出力例 2021-10-14 13:12:33 +0900: Digdag v0.10.3 2021-10-14 13:12:35 +0900 [WARN] (main): Using a new session time 2021-10-14T00:00:00+00:00. 2021-10-14 13:12:35 +0900 [INFO] (main): Using session mydag/.digdag/status/20211014T000000+0000. 2021-10-14 13:12:35 +0900 [INFO] (main): Starting a new session project id=1 workflow name=mydag session_time=2021-10-14T00:00:00+00:00 # ここからセッション開始 2021-10-14 13:12:37 +0900 [INFO] (0016@[0:default]+mydag+setup): echo>: start 2021-10-14T00:00:00+00:00 start 2021-10-14T00:00:00+00:00 # 現在の日時を表示しています 2021-10-14 13:12:39 +0900 [INFO] (0016@[0:default]+mydag+disp_current_date): echo>: 2021-10-14 00:00:00 +00:00 2021-10-14 00:00:00 +00:00 # repeatの処理。6通りの組み合わせが並列に表示されています 2021-10-14 13:12:40 +0900 [INFO] (0016@[0:default]+mydag+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]} 2021-10-14 13:12:41 +0900 [INFO] (0017@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat 2021-10-14 13:12:41 +0900 [INFO] (0018@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog first cat second dog 2021-10-14 13:12:41 +0900 [INFO] (0021@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat third cat 2021-10-14 13:12:41 +0900 [INFO] (0019@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat second ca 2021-10-14 13:12:41 +0900 [INFO] (0016@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog first dog 2021-10-14 13:12:41 +0900 [INFO] (0020@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog third dog # 終了時のセッション時間 2021-10-14 13:12:42 +0900 [INFO] (0020@[0:default]+mydag+teardown): echo>: finish 2021-10-14T00:00:00+00:00 finish 2021-10-14T00:00:00+00:00 # 実行後のステータスがファイルに設定されています。 Success. Task state is saved at mydag/.digdag/status/20211014T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
設定ファイルにあるワークフローが実行されていることが確認できました。
設定ファイルの書き方
サンプルやドキュメントから次のようなルールで設定を書けることが分かります。
- 処理の単位はタスク。タスクは
+task
形式で書くことができる - オペレーションは
operators>
形式で書くことができる。echo>: ...
とか。 - 変数は
${...}
形式。時間計算のために Moment.js がバンドルされてる。 - 変数は
_export:
で定義する。
digdagで実行スケジュールを設定する
ワークフローを定期定に実行するには schedule:
を設定します。
timezone: Asia/Tokyo schedule: daily>: 07:00:00 +step1: sh>: tasks/shell_sample.sh
いつスケジュールが実行されるかを知るには digdag check
を使います。
# 実行コマンド $ digdag check # 出力例 2021-11-11 16:14:26 +0900: Digdag v0.10.3 System default timezone: Asia/Tokyo Definitions (1 workflows): sample (2 tasks) Parameters: {} Schedules (1 entries): sample: daily>: "07:00:00" first session time: 2021-11-12 00:00:00 +0900 first scheduled to run at: 2021-11-12 07:00:00 +0900 (in 14h 45m 32s)
実際にスケジューラーを動かすには digdag scheduler
または digdag sched
を使います。
# 実行コマンド $ digdag sched # 出力例 2021-11-11 16:17:20 +0900: Digdag v0.10.3 2021-11-11 16:17:22 +0900 [INFO] (main): secret encryption engine: disabled 2021-11-11 16:17:23 +0900 [INFO] (main): Added new revision 1 2021-11-11 16:17:23 +0900 [INFO] (main): XNIO version 3.3.8.Final 2021-11-11 16:17:23 +0900 [INFO] (main): XNIO NIO Implementation Version 3.3.8.Final 2021-11-11 16:17:23 +0900 [INFO] (main): Starting server on 127.0.0.1:65432 2021-11-11 16:17:23 +0900 [INFO] (main): Bound on 127.0.0.1:65432 (api) # 終了時 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Started shutdown process 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down workflow executor loop 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Closing HTTP listening sockets 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Waiting for completion of running HTTP requests... 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down HTTP worker threads 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down system 2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutdown completed
ちなみに二重起動すると次のようなエラーが表示されます。前回起動したプロセスが終了されていないので、プロセスを kill してから再度起動します。
error: java.net.BindException: Address already in use (runtime)
Embulkを動かしてみる
大枠がつかめたのでEmbulkを動かせるようにします。
timezone: 'Asia/Tokyo' +sync: _export: EMBULK_INPUT_DB_HOST: DB_HOST_ADDRESS EMBULK_INPUT_DB_USER: root sh>: embulk run -c diff.yml config.yml.liquid
まずはタイムゾーンを設定。
次に +sync
タスクで Embulk のタスクを作ります。メインになる embulk run
コマンドを sh
オペレーターで記述します。このとき、_export
で変数を設定することで sh 実行時に環境変数として渡すことができます。こうすることで、config.yml.liquid ファイルでは {{ env.EMBULK_INPUT_DB_HOST }}
のようにして変数を展開することができます。
liquidは組み込みテンプレートなので、embulkが使えるようになっていれば自動で使うことができます。