Digdagを使ってスケジューリングをしてみよう – データ分析入門シリーズ

データ分析基盤を作っていく上で、定期実行したい場面は数多くあります。これまで定期実行といえば cron を使っていました。データのロードも cron で実行することはできますが、ログを収集したり、複雑なタスク管理をcronで実現するのは少し難しいです。

そこで使えるのが Digdag です。

Digdag fits simple replacement of cron, IT operations automation, data analytics batch jobs, machine learning pipelines, and many more by using Directed Acyclic Graphs (DAG) as the infrastructure.

cron だけでは実現できないリッチなタスク管理ができます。この記事では詳しい内容や応用に入る前にとにかく Digdag に触れてみたいと思います。

Digdag入門

公式ページの Getting started にしたがってサンプルを動かすところまでやります。

Getting started — Digdag 0.10.5 documentation

最新バージョンをダウンロードする

$ curl -o /bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"

最新バージョンをダウンロードして保存します。(保存先を /bin/digdag に変更してます。)

chmod +x /bin/digdag

実行権限の変更も行いました。

echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

(必要に応じて)パスを通します。正しく設定できていればdigdagコマンドが使えるようになっています。helpオプションを試してみましょう。次のようになっていれば設定完了です。

# 実行コマンド
$ digdag --help

# 出力例
2021-10-14 13:11:03 +0900: Digdag v0.10.3
Usage: digdag <command> [options...]
  Local-mode commands:
    init <dir>                         create a new workflow project
    r[un] <workflow.dig>               run a workflow
    c[heck]                            show workflow definitions
    sched[uler]                        run a scheduler server
    migrate (run|check)                migrate database
    selfupdate                         update cli to the latest version
...

サンプル・ワークフロー

サンプルが用意されているので実行して、digdagの動きを見ていきます。

まずはワークフローを新規作成して、テンプレートファイルを作ります。

# 実行コマンド
$ digdag init mydag

# 出力例
2021-10-14 13:12:00 +0900: Digdag v0.10.3
  Creating mydag/mydag.dig
  Creating mydag/.gitignore
Done. Type `cd mydag` and then `digdag run mydag.dig` to run the workflow. Enjoy!

設定ファイルが作成され、次に実行するコマンドの指示が出ました。

設定ファイルの内容を見てみます。

timezone: UTC

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')}

+repeat:
  for_each>:
    order: [first, second, third]
    animal: [dog, cat]
  _do:
    echo>: ${order} ${animal}
  _parallel: true

+teardown:
  echo>: finish ${session_time}

はじめに実行時のセッション時間を表示して、現在の日時を表示、配列orderと配列animalをループして値を表示して、最後に終了時のセッション時間を表示するようです。

ディレクトリを移って、digdagを実行します。

# 実行コマンド
$ cd mydig
$ digdag run mydag.dig

# 出力例
2021-10-14 13:12:33 +0900: Digdag v0.10.3
2021-10-14 13:12:35 +0900 [WARN] (main): Using a new session time 2021-10-14T00:00:00+00:00.
2021-10-14 13:12:35 +0900 [INFO] (main): Using session mydag/.digdag/status/20211014T000000+0000.
2021-10-14 13:12:35 +0900 [INFO] (main): Starting a new session project id=1 workflow name=mydag session_time=2021-10-14T00:00:00+00:00

# ここからセッション開始
2021-10-14 13:12:37 +0900 [INFO] (0016@[0:default]+mydag+setup): echo>: start 2021-10-14T00:00:00+00:00
start 2021-10-14T00:00:00+00:00

# 現在の日時を表示しています
2021-10-14 13:12:39 +0900 [INFO] (0016@[0:default]+mydag+disp_current_date): echo>: 2021-10-14 00:00:00 +00:00
2021-10-14 00:00:00 +00:00

# repeatの処理。6通りの組み合わせが並列に表示されています
2021-10-14 13:12:40 +0900 [INFO] (0016@[0:default]+mydag+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]}
2021-10-14 13:12:41 +0900 [INFO] (0017@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat
2021-10-14 13:12:41 +0900 [INFO] (0018@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog
first cat
second dog
2021-10-14 13:12:41 +0900 [INFO] (0021@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat
third cat
2021-10-14 13:12:41 +0900 [INFO] (0019@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat
second ca
2021-10-14 13:12:41 +0900 [INFO] (0016@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog
first dog
2021-10-14 13:12:41 +0900 [INFO] (0020@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog
third dog

# 終了時のセッション時間
2021-10-14 13:12:42 +0900 [INFO] (0020@[0:default]+mydag+teardown): echo>: finish 2021-10-14T00:00:00+00:00
finish 2021-10-14T00:00:00+00:00

# 実行後のステータスがファイルに設定されています。
Success. Task state is saved at mydag/.digdag/status/20211014T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

設定ファイルにあるワークフローが実行されていることが確認できました。

設定ファイルの書き方

サンプルやドキュメントから次のようなルールで設定を書けることが分かります。

  • 処理の単位はタスク。タスクは +task 形式で書くことができる
  • オペレーションは operators> 形式で書くことができる。echo>: ... とか。
  • 変数は ${...} 形式。時間計算のために Moment.js がバンドルされてる。
  • 変数は _export: で定義する。

digdagで実行スケジュールを設定する

ワークフローを定期定に実行するには schedule: を設定します。

timezone: Asia/Tokyo

schedule:
  daily>: 07:00:00

+step1:
  sh>: tasks/shell_sample.sh

いつスケジュールが実行されるかを知るには digdag check を使います。

# 実行コマンド
$ digdag check

# 出力例
2021-11-11 16:14:26 +0900: Digdag v0.10.3
  System default timezone: Asia/Tokyo

  Definitions (1 workflows):
    sample (2 tasks)

  Parameters:
    {}

  Schedules (1 entries):
    sample:
      daily>: "07:00:00"
      first session time: 2021-11-12 00:00:00 +0900
      first scheduled to run at: 2021-11-12 07:00:00 +0900 (in 14h 45m 32s)

実際にスケジューラーを動かすには digdag scheduler または digdag sched を使います。

# 実行コマンド
$ digdag sched

# 出力例
2021-11-11 16:17:20 +0900: Digdag v0.10.3
2021-11-11 16:17:22 +0900 [INFO] (main): secret encryption engine: disabled
2021-11-11 16:17:23 +0900 [INFO] (main): Added new revision 1
2021-11-11 16:17:23 +0900 [INFO] (main): XNIO version 3.3.8.Final
2021-11-11 16:17:23 +0900 [INFO] (main): XNIO NIO Implementation Version 3.3.8.Final
2021-11-11 16:17:23 +0900 [INFO] (main): Starting server on 127.0.0.1:65432
2021-11-11 16:17:23 +0900 [INFO] (main): Bound on 127.0.0.1:65432 (api)

# 終了時
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Started shutdown process
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down workflow executor loop
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Closing HTTP listening sockets
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Waiting for completion of running HTTP requests...
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down HTTP worker threads
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutting down system
2021-11-11 16:17:59 +0900 [INFO] (shutdown): Shutdown completed

ちなみに二重起動すると次のようなエラーが表示されます。前回起動したプロセスが終了されていないので、プロセスを kill してから再度起動します。

error: java.net.BindException: Address already in use (runtime)

Embulkを動かしてみる

大枠がつかめたのでEmbulkを動かせるようにします。

timezone: 'Asia/Tokyo'

+sync:
  _export:
      EMBULK_INPUT_DB_HOST: DB_HOST_ADDRESS
        EMBULK_INPUT_DB_USER: root
  sh>: embulk run -c diff.yml config.yml.liquid

まずはタイムゾーンを設定。

次に +sync タスクで Embulk のタスクを作ります。メインになる embulk run コマンドを sh オペレーターで記述します。このとき、_export で変数を設定することで sh 実行時に環境変数として渡すことができます。こうすることで、config.yml.liquid ファイルでは {{ env.EMBULK_INPUT_DB_HOST }} のようにして変数を展開することができます。

liquidは組み込みテンプレートなので、embulkが使えるようになっていれば自動で使うことができます。

タイトルとURLをコピーしました