HOW TO TD(User Engagement)Treasure Data User Engagement

DigdagのHTTPオペレータからSalesforceのオブジェクトを取得

ホーム » DigdagのHTTPオペレータからSalesforceのオブジェクトを取得

はじめに

Treasure Data CDPには様々なマーケティングオートメーションやアナリティクスツールなどにデータを集約することができます。
本稿ではCRMとして有名である「Salesforce」のデータを投入する例です。

Treasure Data CDPでは「Salesforce」のデータコネクターは存在しており、GUIベースで設定は可能ですが本稿ではあえてDigdagのHTTPオペレータを利用いたします。
また、今回はSalesforceのrecordtypeオブジェクトが対象となります。

サンプルソース

timezone: Asia/Tokyo

_export:
  api_region: "treasuredata.co.jp"
  client_id: "SFDC_CLIENT_ID" # SFDCのCLIENT_IDを設定 
  client_secret: "SFDC_CLIENT_SECRET" # SFDCのCLIENT_SECRETを設定
  username: "SFDC_USER_NAME" # SFDCのUSER_NAMEを設定
  password: "SFDC_PASSWORD" # SFDCのPASSWORDを設定
  security_token: "SFDC_SECURITY_TOKEN" # SFDCのSECURITY_TOKENを設定
  td:
    database: sandbox

+get_token_process:
     # HTTPオペレータにてauthのREST APIを叩く。尚、POSTとして取得
      http>: https://test.salesforce.com/services/oauth2/token?grant_type=password&client_id=${client_id}&client_secret=${client_secret}&username=${username}&password=${password}${security_token}
      headers:
        - content-type: "application/x-www-form-urlencoded"
      method: POST
      store_content: true  

# SFDCのレコードタイプオブジェクトをSOQLを用いてREST APIを利用して取得  
+get_process:
      http>: ${JSON.parse(http.last_content).instance_url}/services/data/v48.0/queryAll?q=SELECT+Id,Name,DeveloperName,NamespacePrefix,Description,BusinessProcessId,SobjectType,IsActive,CreatedById,CreatedDate,LastModifiedById,LastModifiedDate,SystemModstamp+FROM+${table}
      headers:
        - Content-Type: 'application/json'

  # 「access_token」は「JSON」をパースさせる
        - Authorization: ${'Bearer ' + JSON.parse(http.last_content).access_token}
        - X-PrettyPrint: 1
      method: GET
      store_content: true
      
+emp_d_table:
      td_ddl>: 
      empty_tables: [tmp2_recordtype]

# 「SOQL」の結果を一時テーブルにJSONデータとして格納
+insert_d:
      td>:
        data: select '${JSON.parse(http.last_content).records}' as d;
      insert_into: tmp2_recordtype
      engine: presto
      
+emp_create_table:
      td_ddl>: 
      empty_tables: [recordtype]
      
+insert_table:
      td>:
      query: |
        WITH dataset AS (
          select d as blob from tmp2_recordtype
        )  
        , t0 AS
        (
          SELECT

   # 一時保存した「JSON」データをパースさせる
            CAST(JSON_EXTRACT(blob,'$')AS ARRAY<JSON>) AS hits
          FROM
            dataset
        )
        SELECT
   
  # それぞれのカラムを「JSON_EXTRACT_SCALAR」を用いて抽出
        JSON_EXTRACT_SCALAR(hit,'$.Id') as id
        ,'"{""url"":""' || JSON_EXTRACT_SCALAR(hit,'$.attributes.url') || '"",""type"":""' || JSON_EXTRACT_SCALAR(hit,'$.attributes.type') ||'""}"' as attributes
        ,JSON_EXTRACT_SCALAR(hit,'$.Name') as name
        ,JSON_EXTRACT_SCALAR(hit,'$.DeveloperName') as developername
        ,JSON_EXTRACT_SCALAR(hit,'$.NamespacePrefix') as namespaceprefix
        ,JSON_EXTRACT_SCALAR(hit,'$.Description') as description
        ,JSON_EXTRACT_SCALAR(hit,'$.BusinessProcessId') as businessprocessid
        ,JSON_EXTRACT_SCALAR(hit,'$.SobjectType') as sobjecttype
        ,JSON_EXTRACT_SCALAR(hit,'$.IsActive') as isactive
        ,JSON_EXTRACT_SCALAR(hit,'$.CreatedById') as createdbyid
        ,JSON_EXTRACT_SCALAR(hit,'$.CreatedDate') as createddate
        ,JSON_EXTRACT_SCALAR(hit,'$.LastModifiedById') as lastmodifiedbyid
        ,JSON_EXTRACT_SCALAR(hit,'$.LastModifiedDate') as lastmodifieddate
        ,JSON_EXTRACT_SCALAR(hit,'$.SystemModstamp') as systemmodstamp
        ,TD_TIME_PARSE(JSON_EXTRACT_SCALAR(hit,'$.LastModifiedDate'), 'JST') as time
        FROM
          t0
        CROSS JOIN UNNEST(hits) AS t(hit)
      insert_into: recordtype
      engine: presto

最後に

DigdagのHTTPオペレータを使用してSalesforceのrecortypeオブジェクトを取得するサンプルを記載させていただきました。

Treasure Data CDPの柔軟な操作方法がご理解いただけたかと思います。ご参照いただき業務にてご活用いただければと思います。

UserEngagement事務局

ユーザーの皆さまへのお知らせや、Treasure Data UserEngagementのサイト運営を担当する事務局アカウント。
Back to top button