HOW TO TD(User Engagement)Treasure Data User Engagement

Workflowでの日次ループ処理の作り方

ホーム » Workflowでの日次ループ処理の作り方

データマネジメントチームの藤井 温子です。
この記事では、日次処理のワークフローを使って、過去分のデータをまとめて取り込み・処理する際の「日次ループ処理」の考え方・記述方法について解説しています。

日次の処理をワークフローで行っている場合に、過去分やメンテナンス等でワークフローを止めていた期間分だけ、後日まとめて行いたいということがあると思います。このような場合には、もともと使っている日次ワークフローを活用して、ループ処理を行うことでまとめて処理できます。

この記事では、以下の2点について説明しています。

  • 日次ループ処理を含むプロジェクトのファイル構成
  • 日次のループ処理の作り方

日次ループ処理を含むプロジェクトのファイル構成

ワークフローは通常は日次など定期実行するもののことが多いため、同じプロジェクトに、過去分やワークフロー実行を止めていた期間のデータをまとめて処理する日次ループ処理の.digファイルをプロジェクトの中に入れておくと便利です。

例えば、以下のようなファイル構成が代表的です。

上記のような構成にしておくと、通常はcron_daily.digをcronで毎日実行されるようにしておき、日次ループ処理が必要なときはbulk.digを実行することで、一括で同じ処理が可能になります。

日次のループ処理の作り方

ループ処理のオペレーター

次に、具体的にどのように日次ループ処理を作成するかをご説明します。
ループ処理のためのオペレーターには以下のようなものがあります。

  • loop>
    指定した回数、下位のタスクを繰り返す
  • for_each>
    変数のリストを使って、下位のタスクを繰り返す
  • for_range>
    from, to, stepを指定することで、ある範囲で下位のタスクを繰り返す
  • td_for_each>
    クエリ結果を変数として使って、下位のタスクを繰り返す

その中でも、日次ループ処理を行うときはfor_range>オペレーターが便利です。

for_range>を使った日次ループ処理

for_range>は変数を使って下位のタスクを複数回行います。from, to, indexの3つの変数をエクスポートするので、それらの変数に応じて下位のタスクが実行されます。
for_range>オペレーターにはfrom, to, stepの3つの変数を指定します。

+repeat:
  for_range>:
    from: 10
    to: 50
    step: 10
  _do:
    echo>: processing from ${range.from} to ${range.to}.

上記の例では、10から50まで、10刻みで下位のタスクが実行されます。つまり、4つのタスクが実行されます。各タスクの変数は以下のようになります。
<タスク1> range.from: 10, range.to: 20, range.index: 0
<タスク2> range.from: 20, range.to: 30, range.index: 1
<タスク3> range.from: 30, range.to: 40, range.index: 2
<タスク4> range.from: 40, range.to: 50, range.index: 3
※タスク5はrange.from: 50となり、to: 50を超えてしまうので実行されない

これを利用して、日次の処理を1日刻みで一定期間行います。

_export:
  period:
    first: "2019-08-01"
    last: "2019-12-01"
  
+for_range_from_to:
  for_range>:
    from: ${moment(period.first).unix()}
    to: ${moment(period.last).unix()}
    step: 86400
  _do:
    # 実行したい下位タスクを記述

上記の例では、変数 ${range.from} に2019年8月1日から2019年11月30日の日付 (unixtime) が1日刻みで入ります。(step:の86400は、60秒×60分×24時間=86400秒で一日分の秒数を意味します。)
過去分のデータのファイル名や値に日付(例:filename_YYYYMMDD)を付けておくことで、${range.from}の値をキーにしてデータの取り込み・処理を行うことができます。

実行されるのは指定したto:未満の範囲までという点はご注意ください。つまり、上記の例ではlast:で指定した日の「前日まで」となります。もし、last:で指定した日までの範囲で実行したい場合は、to:${moment(period.last).unix())} に1日足しておく必要があります。

処理したデータのセッションタイムを変更する

日次用のワークフローを利用して、ループ処理でデータの取り込み・処理などを行った際には、本来いつ処理されるべきデータなのかについて情報がなければ、その後の処理に不都合をきたします。

session_timeを実際のワークフロー実行日ではなく${range.from}の日付に指定し、処理したデータのtimeカラムにsession_timeが入るようにしておくことで、処理されたデータがいつのデータなのか(本来はいつ処理されるべきデータか)を残しておくことができます。

実装例としては以下のようになります。

+for_range_from_to:
 for_range>:
   from: ${moment(period.first).unix()}
   to: ${moment(period.last).unix()}
   step: 86400

 _do:
   +task:
     require>: workflow_010
     session_time: ${moment.unix(range.from).format()} # セッションタイムを変更
     rerun_on: all

以上となります。この記事が少しでもご参考になりましたら幸いです。

(参考資料)
for_range>オペレーターの詳細については、dig dagの公式ドキュメントをご参考ください。
http://docs.digdag.io/operators/for_range.html

藤井 温子

Data Managementチーム

新卒でデジタルマーケティング支援企業に入社し、UXデザインコンサルタントとして大手保険会社、食品メーカー、機器メーカー等に向けたユーザーリサーチ・WebサイトのUX・UI改善等のプロジェクトを担当。その後、同社のデータ分析系SaaSのカスタマーサクセスの立ち上げメンバーとして、トレーニングプログラムの立ち上げやお客様の活用支援に従事。よりテクニカル領域に関わりたいと思い、2020年にトレジャーデータに参画。データマネジメントチームにて、Treasure Data CDPの構築や施策のデータ集計等、データ基盤の設計・開発・運用支援に従事。

得意領域 : データ活用・分析支援、ウェブ解析、ユーザーリサーチ(定性調査)、コンサルティング

Back to top button