HOW TO TD(User Engagement)Treasure Data User Engagement

よく使うワークフローオペレーター集 part.2

ホーム » よく使うワークフローオペレーター集 part.2

前回(よく使うワークフローオペレーター集 part.1の続きとして、よく使うワークフローオペレーターの中でdigdagオペレーターについてご紹介させていただきます。

オペレーター 概要
ファイルを呼び出し call>: 別のdigファイルを呼び出す。(主に自PJ)
require>: 別のdigファイルを呼び出す。(主に他PJ)
!include: 別のdigファイルを読み込む。(主に変数など)
タスク制御 if>: 条件に対する真偽値で処理を分岐する。
for_each>: 変数のセットに対してタスクを繰り返す。
loop>: 指定した数値の分だけタスクを繰り返す。

call>:

# main.dig
+before_call:
  echo>: before

+run_another_workflow_call:
  call>: workflow/another.dig

+after_call:
  echo>: after


# workflow/another.dig
+another_workflow:
  echo>: another workflow
  • call>
    – このオペレーターは、別のワークフローをサブタスクとして埋め込みます。
  • オプション
    – call>: [ファイル名]  ワークフロー定義ファイルへのパスを指定します。ファイル名は.digで終わる必要があります。

require>:

# main.dig
+before_call:
  echo>: before

+run_another_workflow_call:
  require>: another
  project_name: another_project1

+after_call:
  echo>: after


# another.dig / project_name: another_project1
+another_workflow:
  echo>: another workflow
  • require>
    – このオペレーターは、別のワークフローを実行します。
  • オプション
    – require>: [ファイル名]  ワークフロー名を指定します。
    – session_time: ISO_TIME_WITH_ZONE
     実行する際のセッションタイムを指定します。
     例)session_time: 2017-01-01T00:00:00+00:00
    – project_id:[プロジェクトID] or project_name:[プロジェクト名]  project_idまたはproject_nameを指定することで、別プロジェクトのワークフローを開始できます。プロジェクトが存在しない場合、タスクは失敗します。project_idとproject_nameの両方を設定すると、タスクは失敗します。
    – rerun_on: none, failed, all(default: none)
     noneは試行が既に存在する場合は、ワークフローを開始しません。
     failedは試行が存在しその結果が失敗している場合、ワークフローを開始します。
     allは試行の結果に関係なく、ワークフローを開始します。※おすすめ

callとrequireの違い

call require
サブフォルダのdigファイル実行 できる できない
別プロジェクトのワークフロー呼び出し できない できる
新規セッションの作成 作成されない 作成される
実行されるタスクの扱い 呼び出し元のワークフローにマージされる 呼び出し元のワークフローにはマージされず、独立したセッションとして実行される
呼び出し先ワークフローで失敗後のリトライ時、失敗地点からリトライできるか できる できない
呼び出し方法 digファイルのパスを指定 ワークフロー名を指定
使い分け(参考) 自プロジェクトのワークフローを呼び出す時に使う 他プロジェクトのワークフローを呼び出す時に使う
または、自プロジェクトでもタスク数が1,000を超える場合に使う

!include:

# workflow1.dig
_export:
# include env_variables
!include : common/config/env_variables.dig

# common/config/env_variables.dig
variables:
- foo
- bar
  • !include
    – 別のdigファイルを読み込むことが可能です。
    – 挙動としてはcallオペレーターとほぼ同じですが、!includeでは変数の読み込みも可能です。
    – 変数の読み込みは!include 、タスクの呼び出しはcallを使うことで可読性を確保することができます。

if>:

+run_if_param_is_true:
  if>: ${param}
  _do:
	echo>: ${param} == true
+run_if_param_is_false:
  if>: ${param}
  _do:
	echo>: ${param} == true
  _else_do:
	echo>: ${param} == false
  • if>
    – if>演算子は、条件がtrueの場合に_doのサブタスクを実行します。
    – 条件がfalseの場合、_else_doのサブタスクを実行します。
  • オプション
    – if>: BOOLEAN
     trueまたはfalseとなる条件を指定します。
    – _do: タスク
     条件がtrueの場合に実行するタスクを指定します。
    – _else_do: タスク
     条件がfalseの場合に実行するタスクを指定します。

for_each>:

# main.dig
_export:
  td:
	database: sample_db
  tables:
	- costomer
	- order

+for_each:
  for_each>:
	table: ${tables}
  _do:
	+create_table:
  	td>: query/create_${table}.dig
  	create_table: ${table}

+for_each_parallel:
  for_each>:
	obj: [apple, orange]
  _parallel: true
  _do:
	+task:
  	echo>: ${obj}
  • for_each>
    – for_each>演算子は、変数のセットを使用してサブタスクを複数回実行します。
  • オプション
    – for_each>: 変数
     ループに使用される変数を指定します。オブジェクトまたはJSON文字列にすることができます。
    – _parallel: BOOLEAN | {limit: N}
     繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
    – _do: タスク
     実行するタスクを指定します。

loop>:

# main.dig
+loop:
  loop>: 2
  _do:
	+task1:
  	echo>: task1-${i}

+loop_parallel_boolean:
  loop>: 3
  _parallel: true
  _do:
	+task2:
  	echo>: task2-${i}

+loop_parallel_limit:
  loop>: 4
  _parallel:
	limit: 2
  _do:
	+task3:
  	echo>: task3-${i}
  • loop>
    – loop>演算子は、サブタスクを複数回実行します。
    ${i}は、サブタスクの変数をエクスポートします。0から始まり、指定したサブタスクが3の場合、i=0,i=1,i=2で実行されます。
  • オプション
    – loop>: タスクを実行する回数
     ループを繰り返す回数を指定します。
    – _parallel: BOOLEAN | {limit: N}
     繰り返しタスクを並行して実行します。limit: Nを指定することで、並行実行数を限定することができます。
    – _do: タスク
     実行するタスクを指定します。

最後に

今回は、データマネジメントチームが良く使うオペレーターの中でもdigdagオペレーターについてご紹介させていただきました。
他にもオペレーターはたくさんありますが、まずは紹介したオペレーターを抑えていただければ基本的なワークフローの実装は行っていただけると思いますので、参考にしてみてください。

UserEngagement事務局

ユーザーの皆さまへのお知らせや、Treasure Data UserEngagementのサイト運営を担当する事務局アカウント。
Back to top button