SBクラウド株式会社logo

MaxCompute連載15:TableStoreからMaxComputeへ連携する方法

Hi, データエンジニアの大原です。
Alibaba Cloudのデータ処理プラットフォーム「MaxCompute」についての連載記事の15回目です。

今回はTableStoreからMaxComputeへ連携する方法について説明します。

前回の記事はこちらをご覧ください。 www.sbcloud.co.jp

f:id:sbc_ohara:20210104091159p:plain

前書き

MaxComputeの概要をこのblogにて記載しました。

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

f:id:sbc_ohara:20210104133726p:plain

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

今回はAlibaba Cloud TableStore からMaxComputeへデータを連携します(フルデータ同期、増分データ同期、列/行モード)。構成図で、こんな感じです。

f:id:sbc_ohara:20210312012000p:plain



共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

f:id:sbc_ohara:20210305025953p:plain

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

f:id:sbc_ohara:20210305025809p:plain

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
基本モードと標準モードは過去記事にて説明しています。

f:id:sbc_ohara:20210304212211p:plain

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

f:id:sbc_ohara:20210304220348p:plain

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

f:id:sbc_ohara:20210304220459p:plain

同期タスクをサブミットする方法(基本モード(basic mode))

作業の途中で 同期タスクをサブミットする旨のアクションが発生しますが、こちらの手順を参考にいただければ幸いです。    

DataWorks DataStdioで、操作が終わったら [Commit to Production Environment] をクリックし開発環境から本番環境へ直接コミットします。

f:id:sbc_ohara:20210311222112p:plain

同期タスクをサブミットする方法(標準モード(standard mode))

DataWorks DataStdioの右側にあるProperitiesをクリックします。

f:id:sbc_ohara:20210311221338p:plain

プロパティRerunを設定して、[Use Root Node]ボタンをクリックします。

f:id:sbc_ohara:20210311221142p:plain

開発環境にサブミットします。

f:id:sbc_ohara:20210311221120p:plain

あとは開発環境から本番環境にデプロイします。
具体的な方法は【MaxCompute連載】DataWorksにおける基本モードと標準モード、開発環境と本番環境について をご参照ください。

www.sbcloud.co.jp

Maxcomputeのデータをクエリする方法

DataWorks DataStdioのAd-Hoc Query画面に入って、[ODPS SQL]のノードを作成します。

f:id:sbc_ohara:20210311222431p:plain

SQLクエリ文を作成したら、上書き保存してから、Run SQLボタンを押します。
その後、SQLクエリの実行コストらお金が出ますが、ここも考慮のうえ、Run、で実行します。
実行結果としてレコード数が無事表示されます。

f:id:sbc_ohara:20210311222939p:plain



(事前準備)TableStore の準備

TableStoreのインスタンスを作成します。

f:id:sbc_ohara:20210312012317p:plain

TableStoreのテーブルを作成します。
f:id:sbc_ohara:20210312012353p:plain

Python SDKでTableStoreにデータを作成します。

from tablestore import *
import time
MachineIpList = ["7552_10.10.10.2", "7552_10.10.10.2", "8d9c_10.10.10.3", "8d9c_10.10.10.3", "e5a3_10.10.10.1"]
MetricsList = [{"cpu": "1", "net_in": "10.0"}, {"cpu": "2", "net_in": "11.0"}, {"cpu": "3", "net_in": "12.0"},{"cpu": "4", "net_in": "13.0"}, {"cpu": "5", "net_in": "14.0"}]

def batch_write_row(client):
    # batch put 10 rows and update 10 rows on exist table, delete 10 rows on a not-exist table.
    put_row_items = []
    all_primary_key = []
    for i in range(0, 5):
        now = int(time.time_ns())
        primary_key = [('MachineIp', MachineIpList[i]), ('Timestamp', now)]
        attribute_columns = [('Metrics', str(MetricsList[i]))]
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = PutRowItem(row, condition)
        put_row_items.append(item)
        all_primary_key.append((now, MachineIpList[i]))
        time.sleep(1)
                                                                                                       
    request = BatchWriteRowRequest()
    request.add(TableInBatchWriteRowItem(table_name, put_row_items))
    result = client.batch_write_row(request)
    print('Result status: %s' % (result.is_all_succeed()))
    print('check table\'s put results:')
    succ, fail = result.get_put()
    for item in succ:
        print('Put succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    return all_primary_key
                                                                                                            
if __name__ == '__main__':
    OTS_ENDPOINT = 'https://dw02.ap-northeast-1.ots.aliyuncs.com'
    OTS_ID = 'LTA**************RKwQ' #実際のIDを入力
    OTS_SECRET = 'eB6K54**********fVU' #実際のSecretを入力
    OTS_INSTANCE = 'dw01'
    table_name = 'Monitor'
    client = OTSClient(OTS_ENDPOINT, OTS_ID, OTS_SECRET, OTS_INSTANCE)
    time.sleep(3)  # wait for table ready
    primary_key_list = batch_write_row(client)
    print('push data')

上記Python SDKで作成したデータを確認します。
f:id:sbc_ohara:20210312013627p:plain

今度はDataWorks側での作業に移ります。
TableStore をDataWorks DataIntegrationデータソースに追加します。

f:id:sbc_ohara:20210311230553p:plain

データソース追加をクリックして、TableStore を選択します。

f:id:sbc_ohara:20210312013814p:plain

データソースとしてTableStore の情報を入力し、接続テストを実行します。
f:id:sbc_ohara:20210312013920p:plain

接続テストで問題なければ、Completeボタンをクリックすることで、TableStore のデータソースが追加されます。
これでTableStore 側の設定は完了です。次はMaxCompute Tableの準備を進めます。

f:id:sbc_ohara:20210312013953p:plain

(事前準備)MaxCompute Tableの準備

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

f:id:sbc_ohara:20210311231235p:plain

Workspace Tables画面に入って、テーブルを作成します。

f:id:sbc_ohara:20210312014108p:plain

DDL Statementボタンをクリックして、OTS Tableに対応するDDL Statementを入力します。

CREATE TABLE IF NOT EXISTS ots_to_odps (
    `MachineIp` string  COMMENT '',
    `ots_timestamp` BIGINT COMMENT '',
    `Metrics` string COMMENT  ''
);

f:id:sbc_ohara:20210312014119p:plain

Display Nameを入力し、テーブルをコミットします。その後はテーブルが作成されてることが確認できます。
※標準モードプロジェクトの場合は本番環境にもコミットする必要があります。

f:id:sbc_ohara:20210312014228p:plain

この準備が終わり次第、データを移行してみます。データ移行にはGUIモードとスクリプトモードの2つのパターンがあります。まずはGUIモードで移行します。
スクリプトモードはtemplateな扱いができるため、後日この作業の自動化したい場合、活用できればと思います。

TableStore - MaxCompute の同期(フルデータ、スクリプトモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

f:id:sbc_ohara:20210311231235p:plain

DataStdio画面にてWorkflowを作成します。

f:id:sbc_ohara:20210312014502p:plain

STEP2: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312014526p:plain

ソースをOTS、ターゲットをODPSに選択します。
f:id:sbc_ohara:20210312014605p:plain

ターゲット・MaxComputeテーブルを選定します。
f:id:sbc_ohara:20210312014615p:plain

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
f:id:sbc_ohara:20210312014705p:plain

スクリプトが表示されますが、スクリプトを編集します。

{
    "type": "job",
    "steps": [
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "ots_first",
                "column": [
                    {
                        "name": "MachineIp"
                    },
                    {
                        "name": "Timestamp"
                    },
                    {
                        "name": "Metrics"
                    }
                ],
                "range": {
                    "split": [
                        {
                            "type": "INF_MIN"
                        },
                        {
                            "type": "STRING",
                            "value": "7552_10.10.10.2"
                        },
                        {
                            "type": "STRING",
                            "value": "8d9c_10.10.10.3"
                        },
                        {
                            "type": "STRING",
                            "value": "e5a3_10.10.10.1"
                        },
                        {
                            "type": "INF_MAX"
                        }
                    ],
                    "end": [
                        {
                            "type": "INF_MAX"
                        },
                        {
                            "type": "INF_MAX"
                        }
                    ],
                    "begin": [
                        {
                            "type": "INF_MIN"
                        },
                        {
                            "type": "INF_MIN"
                        }
                    ]
                },
                "table": "Monitor"
            },
            "name": "Reader",
            "category": "reader"
        },
       {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": true,
                "datasource": "odps_first",
                "column": [
                    "*"
                ],
                "emptyAsNull": false,
                "table": "ots_to_odps"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    }
}

f:id:sbc_ohara:20210312014723p:plain

STEP3: DI 同期タスクを実行

タスクを実行します。
f:id:sbc_ohara:20210312014856p:plain

タスクが成功すると、Logが表示されます。
f:id:sbc_ohara:20210312014927p:plain

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.ots_to_odps;

f:id:sbc_ohara:20210312014938p:plain

これにより、TableStore - MaxCompute の同期(フルデータ、スクリプトモード)が確認できました。

TableStoreに増分データを用意

今度はTableStoreで増分データ分だけ同期する方法を試します。準備としてTableStore側で増分データを用意します。
まずはTableStoreのStream機能を有効にします。
f:id:sbc_ohara:20210312015153p:plain

Python SDKでTableStoreにデータを追加で書き込みします。

from tablestore import *
import time
MachineIpList = ["7552_10.10.10.2", "7552_10.10.10.2", "8d9c_10.10.10.3", "8d9c_10.10.10.3", "e5a3_10.10.10.1"]
MetricsList = [{"cpu": "1", "net_in": "10.0"}, {"cpu": "2", "net_in": "11.0"}, {"cpu": "3", "net_in": "12.0"},{"cpu": "4", "net_in": "13.0"}, {"cpu": "5", "net_in": "14.0"}]

def batch_write_row(client):
    # batch put 10 rows and update 10 rows on exist table, delete 10 rows on a not-exist table.
    put_row_items = []
    all_primary_key = []
    for i in range(0, 5):
        now = int(time.time_ns())
        primary_key = [('MachineIp', MachineIpList[i]), ('Timestamp', now)]
        attribute_columns = [('Metrics', str(MetricsList[i]))]
        row = Row(primary_key, attribute_columns)
        condition = Condition(RowExistenceExpectation.IGNORE)
        item = PutRowItem(row, condition)
        put_row_items.append(item)
        all_primary_key.append((now, MachineIpList[i]))
        time.sleep(1)
                                                                                                       
    request = BatchWriteRowRequest()
    request.add(TableInBatchWriteRowItem(table_name, put_row_items))
    result = client.batch_write_row(request)
    print('Result status: %s' % (result.is_all_succeed()))
    print('check table\'s put results:')
    succ, fail = result.get_put()
    for item in succ:
        print('Put succeed, consume %s write cu.' % item.consumed.write)
    for item in fail:
        print('Put failed, error code: %s, error message: %s' % (item.error_code, item.error_message))
    return all_primary_key
                                                                                                            
if __name__ == '__main__':
    OTS_ENDPOINT = 'https://dw02.ap-northeast-1.ots.aliyuncs.com'
    OTS_ID = 'LTA**************RKwQ' #実際のIDを入力
    OTS_SECRET = 'eB6K54**********fVU' #実際のSecretを入力
    OTS_INSTANCE = 'dw01'
    table_name = 'Monitor'
    client = OTSClient(OTS_ENDPOINT, OTS_ID, OTS_SECRET, OTS_INSTANCE)
    time.sleep(3)  # wait for table ready
    primary_key_list = batch_write_row(client)
    print('push data')

上記Python SDKで作成したデータを確認します。
f:id:sbc_ohara:20210312015348p:plain

この時点で、先ほどMaxComputeに格納したデータには含まれていないデータがあることがわかります。これを使って、増分データの同期を試してみます。

TableStoreをMaxCompute Tableへ同期(増分データ、GUIモード、列モード)

STEP1: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312015458p:plain

ソースをOTS Stream、テーブルを選択します。
f:id:sbc_ohara:20210312015547p:plain

ターゲットをODPS、テーブル作成ボタンをクリックします。
f:id:sbc_ohara:20210312015715p:plain

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
f:id:sbc_ohara:20210312015733p:plain

STEP2: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
f:id:sbc_ohara:20210312015806p:plain

タスクが成功すると、Logが表示されます。
f:id:sbc_ohara:20210312015920p:plain

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.monitor_add_one where ds=‘20200913’;

f:id:sbc_ohara:20210312015950p:plain

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、GUIモード、列モード)が確認できました。

TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、列モード)

STEP1: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312015458p:plain

ソースをOTS Stream、テーブルを選択します。
f:id:sbc_ohara:20210312015547p:plain

ターゲットをODPS、テーブル作成ボタンをクリックします。
f:id:sbc_ohara:20210312015715p:plain

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
f:id:sbc_ohara:20210312015733p:plain

STEP2: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
f:id:sbc_ohara:20210312020150p:plain

するとスクリプトが表示されます。これは先述、GUIモードで選択した設定が自動でスクリプトに反映されます。
f:id:sbc_ohara:20210312020201p:plain

STEP3: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
f:id:sbc_ohara:20210312020230p:plain

タスクが成功すると、Logが表示されます。
f:id:sbc_ohara:20210312020241p:plain

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.monitor_add_one where ds=‘20200913’;

f:id:sbc_ohara:20210312015950p:plain

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、列モード) が確認できました。

TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、行モード)

最後に、行モードで増分データの同期を試します。

STEP1: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312015458p:plain

ソースをOTS Stream、テーブルを選択します。
f:id:sbc_ohara:20210312015547p:plain

ターゲットをODPS、テーブル作成ボタンをクリックします。
f:id:sbc_ohara:20210312015715p:plain

テーブル名とカラムのタイプを編集して、MaxComputeテーブルを作成します。
f:id:sbc_ohara:20210312015733p:plain

STEP2: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
f:id:sbc_ohara:20210312020450p:plain

スクリプトが表示されますが、スクリプトを編集します。
modeとcolumnを編集します。

 "parameter": {
                "mode": "single_version_and_update_only",
                "statusTable": "TableStoreStreamReaderStatusTable",
                "maxRetries": 30,
                "isExportSequenceInfo": false,
                "datasource": "ots_first",
                "column": [
                    {
                        "name": "MachineIp"
                    },
                    {
                        "name": "Timestamp"
                    },
                    {
                        "name": "Metrics"
                    }
                ],
                "startTimeString": "${startTime}",
                "table": "Monitor",
                "endTimeString": "${endTime}"
            }

f:id:sbc_ohara:20210312020606p:plain

STEP3: DI 同期タスクを実行

タスクを実行します。
引数Argumentsがありますが、適当なstartTimeとendTime、bizdateを入力します。
f:id:sbc_ohara:20210312020622p:plain

タスクが成功すると、Logが表示されます。
f:id:sbc_ohara:20210312020657p:plain

あとはAd-Hoc クエリで確認します(手順は上記の共通手順にて記載しています)

select * from nelly01_dev.ots_to_odps;

f:id:sbc_ohara:20210312020720p:plain

これにより、TableStoreをMaxCompute Tableへ同期(増分データ、スクリプトモード、行モード) が確認できました。


最後に

本記事では、TableStoreからMaxComputeへ連携する方法を簡単に説明しました。
TableStoreでデータ量が肥大化した場合は、この方法でMaxComputeへデータ移植、コスト削減およびDWHとしての運用ができれば幸いです。

最後までお読みいただきありがとうございました。

MaxCompute連載シリーズ

www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp