HOW TO TD(User Engagement)Treasure Data User Engagement

よく使うワークフローオペレーター集 part.1

ホーム » よく使うワークフローオペレーター集 part.1

こんにちは、データマネジメントチームの佐々木 貴雅です。

Treasure Workflowでは処理のワークフローを設定するために、.digという拡張子をつけたファイルを作成します。このdigファイルには大きく分けて、タスクとオペレーターの2種類とパラメーターをつかって記述します。

オペレーターとは、

  • 実際の処理内容を定義したもの
  • オペレーターはTreasure Workflowで定義されている中から選択する

今回は、よく使うオペレーターの中でTreasuredataオペレーターについてご紹介させていただきます。

オペレーター 概要
クエリ実行 td>: クエリを実行する
クエリ実行 td_run>: 保存クエリを実行する
クエリ実行 td_run>: クエリを実行し、結果行ごとにサブタスクをループする
データコネクター(取得) td_load>: データの一括読み込み
Database/Table操作 td_ddl>: 操作をする

td>:

test.dig
_export:
td:
database: test_db
 
+task1:
td>: queries/mst_customer.sql
create_table: mst_customer
engine: hive
 
+task2:
td>: queries/trs_sales.sql
insert_into: trs_sales
database: test_db2
engine: presto
 
+task3:
td>:
query: |
SELECT ARRAY_JOIN(ARRAY_AGG(column_name), ‘, ‘) AS cols
FROM information_schema.columns
WHERE table_schema = ‘schema_name’
AND table_name = ‘table_name’
store_last_results: true

queries/mst_customer.sql
SELECT * FROM customer

queries/trs_sales.sql
SELECT * FROM sales WHERE time = ${moment(session_date).unix()}


td> オペレーターは、Treasure Data CDPに対してHiveまたはPrestoクエリを実行します。
実行する方法としては、以下2パターンあります。

  1. クエリファイルを呼び出す。
  2. 「query: |」を使ってdigファイルに直接クエリを記述する。

また、以下のオプションを使ってクエリやクエリ結果の制御を行います。

  • create_table: [テーブル名]
  • 結果から作成するテーブルの名前。テーブルが既に存在する場合は削除します。

  • insert_into: [テーブル名]
  • 結果を追加するテーブルの名前。テーブルが存在しない場合は作成されます。

  • database: [データベース名]
  • _exportで宣言されていないデータベースを指定することができます。

  • engine: [presto or hive]
  • クエリエンジンにprestoまたはhiveを指定できます。
    engineを使わない場合は、デフォルトでprestoになります。

  • store_last_results: BOOLEAN
  • クエリ結果の最初の1行を${td.last_results}変数に格納します。
    store_last_resultsを使わない場合は、デフォルトでfalseになります。

td_run>:

test.dig
_export:
td:
database: test_db
 
+task1:
td_run>:2344127
download_file: dl_file.csv
 
+task2:
td>: test_query
store_last_results: true
 

クエリID:2344127 クエリ名:test_query
SELECT * FROM customer


td_run> オペレーターは、Treasure Data CDPに保存されているクエリを実行します。
実行する方法としては2パターンあります。

  1. クエリIDを指定して実行する。※番号が指定された場合
  2. クエリ名を指定して実行する。※番号以外が指定された場合

また、以下のオプションを使ってクエリやクエリ結果の制御を行います。

  • download_file: [ファイル名]
  • クエリ結果をローカルCSVファイルとして保存します。

  • store_last_results: BOOLEAN
  • クエリ結果の最初の1行を${td.last_results}変数に格納します。
    store_last_resultsを使わない場合は、デフォルトでfalseになります。

td_for_each>:

test.dig
_export:
td:
database: test_db
 
+task1:
td_for_each>: queries/mst_customer.sql
_do:
   +show:
       echo>: customer_name ${td.each.name} email ${td_each.email}


td_for_each> オペレーターは、Treasure Data CDPに対するHiveまたはPrestoクエリの結果行ごとにサブタスクをループします。

  • td_for_each>: file.sql
  •  クエリファイルのパスを指定します。クエリ結果を${td.each.xxxx}変数に
     格納しサブタスク内で利用することができます。

また、以下のオプションを使ってクエリやクエリ結果の制御を行います。

  • _do: タスク
  • 実行するタスクを記述します。

  • database: [データベース名]
  • _exportで宣言されていないデータベースを指定することができます。

  • engine: [presto or hive]
  • クエリエンジンにprestoまたはhiveを指定できます。
    engineを使わない場合は、デフォルトでprestoになります。

td_load>:

test.dig
_export:
td:
database: test_db
 
+task1:
td_load>: config/load_data.yml
database: test_db2
table: table_name


td_load> オペレーターは、ストレージ、データベース、またはサービスからデータをロードします。
実行できるのは自分に属するソースのみのため、他のユーザーが所有するソースを実行するには、管理者権限が必要になります。

  • td_load>: FILE.yml
  • YAMLファイルへのパスを指定します。
    以下のオプションを使ってロードしたデータの格納先を指定します。

  • database: [データベース名]
  • _exportで宣言されていないデータベースを指定することができます。

  • table: [テーブル名]
  •  データを格納するターゲットテーブルを指定します。

td_dll>:

_export:
td:
database: test_db
 
+task1:
td_ddl>:
create_tables: table_name
 
+task2:
td_ddl>:
drop_tables: table_name
 
+task3:
td_ddl>:
rename_tables: [{from: “table_name1, to: table_name2”}]
database: test_db2


td_ddl> オペレーターは、Treasure Data CDPに対して操作タスクを実行します。
以下のオプションを使ってテーブル操作を実行します。

  • create_tables: [テーブル名]
  •  テーブルが存在しない場合、新しいテーブルを作成します。
     テーブルがすでに存在する場合、処理を行いません。
      例)WFの初回実行時に、selectやdelete等を行う場合、テーブルが存在しな
        いとエラーとなるため、事前にcreate_teblesで利用するテーブル名を
        指定しておくことで手動でテーブルを事前に作成する必要がなくなります。

  • drop_tables: [テーブル名]
  •  テーブルが存在する場合はテーブルを削除します。
      例)テンポラリーテーブルを作成した際に、WFの終了前にdrop_tablesを
        行うことで不要なテーブルが削除されDB内の整理がやりやすくなります。

  • rename_tables: [{from: 変更前テーブル名, to: 変更後テーブル名}]
  •  テーブルの名前を別の名前に変更します。

    変更後テーブル名がすでに存在する場合は上書きします。
      例)前日差分と比較する場合、前日結果をrename_tablesを行い退避させて
        おくことで当日結果と前日結果を比較して差分を出すこともできます。

  • database: [データベース名]
  • _exportで宣言されていないデータベースを指定することができます。

最後に

今回は、データマネジメントチームが良く使うオペレーターの中でもTreasuredataオペレーターについてご紹介させていただきました。
次回は、digdagオペレーターをご紹介させていただきます。

佐々木 貴雅

Data Managementチーム

SIerのシステムエンジニアとしてキャリアをスタート。化学、建設業の基幹システムの企画、要件定義などの上流工程からPGなどの下流工程まで経験。前職では、社内SEとしてシステムの導入・運用を行いながら、ETLを用いて各種DBやSaaSからデータを収集・統合し業務自動化やBIを用いた可視化など幅広いDXへの取り組みに従事。2019年よりトレジャーデータに参画。データマネジメントチームの一員として運輸・鉄道サービス企業や大手自動車メーカー、建設業、サービス業へ技術的側面からTreasure Data CDPの導入・運用支援を行う。

得意領域 : データアーキテクト、Workflow、SQL、ETL、Javascript、ダッシュボード構築(Tableau)

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*

Back to top button