MaxCompute連載11:MaxComputeでJobを設定する

Hi, データエンジニアの大原です。
Alibaba Cloudのデータ処理プラットフォーム「MaxCompute」についての連載記事の11回目です。

今回はDataWorksのジョブ処理を使って、MaxCompute Tableで定期的なデータ格納処理をする方法について説明します。

前回の記事はこちらをご覧ください。 www.sbcloud.co.jp

f:id:sbc_ohara:20210104091159p:plain

前書き

MaxComputeの概要をこのblogにて記載しました。

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

f:id:sbc_ohara:20210104133726p:plain

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

今回はDataWorksを使って、MaxComputeで定期的にデータを格納・処理するジョブを設定してみましょう。
* 基本モードと標準モードどちらでも良いですが、今回は標準モードとして説明しています。
* Table作成や本番環境へのコミットとか基本的なことは過去エントリにて説明していることや、今回説明する事項が多いので、基本的な操作部分を一部省略しています。


デプロイ後実施のサイクルジョブの設定

ここは5分おきにテーブルのインポート、テーブルのインサート処理を行いたいという例で実施します。

STEP1: ワークフローの作成

DataWorks WorkStdio画面から、新規でワークフローを作成します。
f:id:sbc_ohara:20210316105058p:plain

過去この記事でもワークフローについて軽く説明していますので、こちらも参考にできれば幸いです。

www.sbcloud.co.jp

STEP2: ソーステーブルを作成し、ソースデータをインポートする

ワークフローの中身(Node)は現在空白状態と思うので、ソーステーブルを作成します。

f:id:sbc_ohara:20210316105201p:plain

上にある[ DDL Statement] からSQLでソーステーブルのフィールドを作ります。下段にある[Create Field]などのボタンで手動でフィールド作成もできます。

CREATE TABLE table_demo1 (
  shop_name     string,
  customer_id   string,
  total_price   double,
  comments      string,
  sale_date     string,
  region        string
);

ソーステーブル作成後、コミットします。(標準モードのみ)
f:id:sbc_ohara:20210316105918p:plain

ソーステーブルは現在レコードらデータがない状態なので、データをインポートします。
f:id:sbc_ohara:20210316110014p:plain

インポート元のファイルのフィールドと、ソーステーブルのフィールドを合わせます。
f:id:sbc_ohara:20210316110100p:plain

display名を入力後、本番環境(production environment)へコミットします。
f:id:sbc_ohara:20210316110217p:plain

STEP3: ターゲットテーブルの作成

同じ要領で、今度はMaxComputeのメニューバーからCreate Tableでターゲットテーブルを作成します。
f:id:sbc_ohara:20210316111120p:plain

上にある[ DDL Statement] からSQLでターゲットテーブルのフィールドを作ります。下段にある[Create Field]などのボタンで手動でフィールド作成もできます。

CREATE TABLE result_demo1 (
    region  string
);

f:id:sbc_ohara:20210316111138p:plain

ターゲットテーブル作成後、コミットします。(標準モードのみ)
f:id:sbc_ohara:20210316112147p:plain

STEP4: 開始ノードの作成、SQLノードのインポート、SQLノードの挿入

ソーステーブル、ターゲットテーブルを作成したら、ジョブらワークフローを成立させるために、開始ノードやSQLノードなどを挿入します。まずは開始ノードを作成します。
開始ノード(Zero-Load Node)でノードの名前を「Start_R」としています。

f:id:sbc_ohara:20210316112245p:plain

次はテーブルのインポートを行うノードを作成します。ODPS SQLノードでノードの名前を「import_R」としています。

f:id:sbc_ohara:20210316112609p:plain

最後にテーブルのインサートを行うノードを作成します。ODPS SQLノードでノードの名前を「imsert_R」としています。

f:id:sbc_ohara:20210316112737p:plain

ここまで、ノードの設定が出来ていれば、以下の図のようになります。もしワークフローとしてリンク(図で灰色の矢印)が繋がっていない場合は新たに接続します。
f:id:sbc_ohara:20210316112933p:plain

STEP5: ノードプロパティの設定(5分ごとに繰り返し)

Start_Rノードにて、右側のPropertiesメニューバーから、ノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。

f:id:sbc_ohara:20210316113155p:plain

続いて、import_R ノードで 以下のSQLクエリを入力します。このSQLクエリはジョブ実行時に実行されるSQL文です。

INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb63wv' , 'jd', 'hangzhou',101, '20190111', 'test' );
INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb61wv' , 'jd', 'hangzhou',102, '20190111', 'test' );
INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb13wv' , 'jd', 'hangzhou',103, '20190111', 'test' );
INSERT INTO table_demo1(customer_id,shop_name,region,total_price,sale_date,comments ) VALUES ('ibp16rdks1akepepb63wv' , 'jd', 'JP',101, '20190111', 'test' );

f:id:sbc_ohara:20210316113359p:plain

同じく、import_R ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
f:id:sbc_ohara:20210316113602p:plain

import_R ノードのノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこで出力先として「table_demo1」を追加します。 するとOutput Nodeリスト一覧で、出力先にtable_demo1が追加されてることがわかります。
設定が終わったらコミットします。
f:id:sbc_ohara:20210316114421p:plain

続いて、Insert_R ノードへ遷移し、 以下のSQLクエリを入力します。

INSERT OVERWRITE TABLE result_demo1 
SELECT region from table_demo1 where region='hangzhou';

f:id:sbc_ohara:20210316114912p:plain

同じく、Insert_R ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。

f:id:sbc_ohara:20210316115111p:plain

Insert_R ノードのノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
画像の図のようにtable_demo1を親ノードの出力を入力として設定します。
設定が終わったらコミットします。
f:id:sbc_ohara:20210316115407p:plain

STEP6: タスクの実行

ここまで問題なく設定できていれば、今度はタスクを実行します。まずはDataWorks DataStdio画面の「Deploy」バーから「Create Deploy Task」画面へ遷移します。
タスク一覧が表示されてるので、対象のタスクを選定し、「Deploy Selected」でDeployタスクをセットします。
f:id:sbc_ohara:20210316115711p:plain

STEP7: サイクルインスタンスと実行ログのチェック

タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Instance」を選定します。
f:id:sbc_ohara:20210316120654p:plain

Import Nodeのタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
f:id:sbc_ohara:20210316121020p:plain

Import Nodeのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
f:id:sbc_ohara:20210316121215p:plain

すると、Import Nodeのタスクのサイクルログの詳細が確認できます。
f:id:sbc_ohara:20210316122244p:plain

今度は、Insert Nodeのタスク結果を確認してみます。
Insert Nodeのタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
f:id:sbc_ohara:20210316122708p:plain

Insert Nodeのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
f:id:sbc_ohara:20210316122729p:plain

Insert Nodeのタスクのサイクルログの詳細が確認できます。
f:id:sbc_ohara:20210316122756p:plain

STEP8: ターゲットテーブルをチェック

タスクらジョブは無事実行されたけど、それがターゲットテーブルにどのように反映しているか、正常性を含め確認します。DataMapから確認します。

f:id:sbc_ohara:20210316121833p:plain

これでデータがターゲットテーブルに正しく挿入されたことが確認できました。

f:id:sbc_ohara:20210316121936p:plain




条件分岐ジョブの設定仕方

ここは今日が月初めの日なら初日専用のshell、それ以外の日なら、初日以外のshellを実行したいという例で実施します。

STEP1: ワークフローの作成

DataWorks WorkStdio画面から、新規でワークフローを作成します。
f:id:sbc_ohara:20210316123247p:plain

STEP2: 割り当てノード、ブランチノード、およびシェルノードの作成

割り当てノード(Assignment Node)を作成します。「Assign_IfFirst」という名前にしています。
f:id:sbc_ohara:20210316124439p:plain

ブランチノード(Branch Node)を作成します。「Branch_judgeDownRun」という名前にしています。
f:id:sbc_ohara:20210316124459p:plain

Shellノード(Shell Node)を2つ作成します。「RunFirst」と「RunExceptFirst」という名前にしています。

f:id:sbc_ohara:20210316124603p:plain

f:id:sbc_ohara:20210316124530p:plain

ノード作成後、ワークフローで図のように矢印のコネクトを設定します。
ここの流れとしては、以下の通りになります。
1. Assign_IfFirst 割り当てノード(Assignment Node)にて、Pythonスクリプトを作成して、今日が月の最初の日かどうかをチェックします
2. Branch_judgeDownRun ブランチノード(Branch Node)にて、ノードはAssign_ifFirst割り当てノードからのパラメータを受け取ります
3. 2の結果で、今日が初日の場合はRunFirstシェルノードを実行し、今日が初日でない場合はRunExceptFirstシェルノードを実行します。

f:id:sbc_ohara:20210316124759p:plain

STEP3: 構成割り当てノードにてPythonスクリプトを作成

Assign_IfFirst こと割り当てノード(Assignment Node)にて、Pythonスクリプトを作成します。

# encoding: utf-8
from datetime import datetime as dtime
import datetime

def firstDayOfMonth(dt):
    return (dt + datetime.timedelta(days=-dt.day + 1)).replace(hour=0, minute=0, second=0, microsecond=0)

if firstDayOfMonth(dtime.today()).day == dtime.today().day:
    print(0)  #first day
else:
    print(1)  #not first day

f:id:sbc_ohara:20210316125325p:plain

Assign_IfFirst ノードにて、右側のPropertiesメニューバーから、ノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
設定が終わったらコミットします。
f:id:sbc_ohara:20210316125520p:plain

STEP4: ブランチノードの設定

Branch_judgeDownRun ブランチノード(Branch Node)にて、引数パラメータからの条件に応じたOutput先を設定します。

Condition:${isFirst}==0
Associated Node OutputNode:_demo.fisrt_cond.is_first
f:id:sbc_ohara:20210316131239p:plain

Condition:${isFirst}==1
Associated Node OutputNode:_demo.fisrt_cond.not_first
f:id:sbc_ohara:20210316131332p:plain

Branch_judgeDownRun ノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
f:id:sbc_ohara:20210316131654p:plain

Branch_judgeDownRun ノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にis_first、not_first が自動で追加されてると思います。

f:id:sbc_ohara:20210316131817p:plain

Branch_judgeDownRun ノードプロパティで日付を検証するパラメータを設定します。
f:id:sbc_ohara:20210316131915p:plain

設定が終わったらコミットします。
f:id:sbc_ohara:20210316132230p:plain

STEP5: Shellノードの設定

Branch_judgeDownRun ノードの分岐処理結果をキャッチし、それぞれ処理するシェルノードを設定します。 f:id:sbc_ohara:20210316132838p:plain

まずはRunFirstシェルノードから設定します。 シェルノードを開いたら、以下コマンドを入力します。

echo "today is first day"

引き続き、RunFirstシェルノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
f:id:sbc_ohara:20210316133054p:plain

RunFirstシェルノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にis_firstを追加します。
設定が終わったらコミットします。

f:id:sbc_ohara:20210316133233p:plain

続いて、RunExceptFirstシェルノードを設定します。 シェルノードを開いたら、以下コマンドを入力します。

echo "today is not first day"

引き続き、RunExceptFirstシェルノードでノードプロパティを設定します。以下画像のように、5分ごとに実施するように設定します。
f:id:sbc_ohara:20210316133420p:plain

RunExceptFirstシェルノードプロパティで下画面側に、依存関係の設定(Dependencies)がありますので、そこへ移動します。
そこのOutput Nodeリスト一覧で、出力先にRunExceptFirstを追加します。
設定が終わったらコミットします。
f:id:sbc_ohara:20210316133447p:plain

ここまで問題なく設定できていれば、今度はタスクを実行します。まずはDataWorks DataStdio画面の「Deploy」バーから「Create Deploy Task」画面へ遷移します。
タスク一覧が表示されてるので、対象のタスクを選定し、「Deploy Selected」でDeployタスクをセットします。
f:id:sbc_ohara:20210316115711p:plain

タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Task」を選定します。
f:id:sbc_ohara:20210316133615p:plain

タスクの実行結果として、タスクのサイクルをチェックします。 メニューバーから「Operation Center」で、「Cycle Instance」を選定します。
まずはAssign_IfFirst 割り当てノードのサイクルインスタンスを確認します。

ここで図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
f:id:sbc_ohara:20210316133641p:plain

Assign_IfFirst 割り当てノードのサイクルインスタンスの詳細を確認します。右側にて、図のようなConfigがありますので、「More」をクリックします。
f:id:sbc_ohara:20210316133817p:plain

すると、Assign_IfFirst 割り当てノードのタスクのサイクルログの詳細が確認できます。
f:id:sbc_ohara:20210316134222p:plain

次に、Branch_judgeDownRun ブランチノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
f:id:sbc_ohara:20210316134403p:plain

Branch_judgeDownRun ブランチノードのサイクルログの詳細はこの通りになります。
f:id:sbc_ohara:20210316134435p:plain

RunFirstシェルノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
ここも図のようなConfigがありますので、「More」をクリックします。
f:id:sbc_ohara:20210316135045p:plain

RunFirstシェルノードのサイクルログの詳細はこの通りになります。
結果として、今回今日の日付が月初めではないため、RunFirstシェルノードがスキップされました。
f:id:sbc_ohara:20210316135321p:plain

RunExceptFirstシェルノードのサイクルインスタンスを確認します。
図のようにタスクのサイクルログが表示されます。時間帯から、タスクが5分おきに実施されていることがわかります。
ここも図のようなConfigがありますので、「More」をクリックします。
f:id:sbc_ohara:20210316135150p:plain

RunExceptFirstシェルノードのサイクルログの詳細はこの通りになります。
f:id:sbc_ohara:20210316135342p:plain


最後に

本記事では、DataWorksのジョブ処理を使って、MaxCompute Tableで定期的なデータ格納処理をする方法を簡単に説明しました。

最後までお読みいただきありがとうございました。

関連記事

www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp