SBクラウド株式会社logo

MaxCompute連載14:ElasticsearchからMaxComputeへ連携する方法

Hi, データエンジニアの大原です。
Alibaba Cloudのデータ処理プラットフォーム「MaxCompute」についての連載記事の14回目です。

今回はElasticsearch からMaxComputeへ連携する方法について説明します。

前回の記事はこちらをご覧ください。 www.sbcloud.co.jp

f:id:sbc_ohara:20210104091159p:plain

前書き

MaxComputeの概要をこのblogにて記載しました。

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

f:id:sbc_ohara:20210104133726p:plain

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

今回はMaxComputeからAlibaba Cloud Elasticsearchへデータを連携します。構成図で、こんな感じです。

f:id:sbc_ohara:20210311234150p:plain



共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

f:id:sbc_ohara:20210305025953p:plain

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

f:id:sbc_ohara:20210305025809p:plain

今回は、RDSからMaxComputeへ連携する方法なので、AliyunRDSFullAccessもアタッチします。

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
基本モードと標準モードは過去記事にて説明しています。

f:id:sbc_ohara:20210304212211p:plain

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

f:id:sbc_ohara:20210304220348p:plain

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

f:id:sbc_ohara:20210304220459p:plain

同期タスクをサブミットする方法(基本モード(basic mode))

作業の途中で 同期タスクをサブミットする旨のアクションが発生しますが、こちらの手順を参考にいただければ幸いです。    

DataWorks DataStdioで、操作が終わったら [Commit to Production Environment] をクリックし開発環境から本番環境へ直接コミットします。

f:id:sbc_ohara:20210311222112p:plain

同期タスクをサブミットする方法(標準モード(standard mode))

DataWorks DataStdioの右側にあるProperitiesをクリックします。

f:id:sbc_ohara:20210311221338p:plain

プロパティRerunを設定して、[Use Root Node]ボタンをクリックします。

f:id:sbc_ohara:20210311221142p:plain

開発環境にサブミットします。

f:id:sbc_ohara:20210311221120p:plain

あとは開発環境から本番環境にデプロイします。
具体的な方法は【MaxCompute連載】DataWorksにおける基本モードと標準モード、開発環境と本番環境について をご参照ください。

www.sbcloud.co.jp

Maxcomputeのデータをクエリする方法

DataWorks DataStdioのAd-Hoc Query画面に入って、[ODPS SQL]のノードを作成します。

f:id:sbc_ohara:20210311222431p:plain

SQLクエリ文を作成したら、上書き保存してから、Run SQLボタンを押します。
その後、SQLクエリの実行コストらお金が出ますが、ここも考慮のうえ、Run、で実行します。
実行結果としてレコード数が無事表示されます。

f:id:sbc_ohara:20210311222939p:plain



(事前準備)MaxCompute Tableの準備

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

f:id:sbc_ohara:20210311231235p:plain

Workspace Tables画面に入って、テーブルを作成します。

f:id:sbc_ohara:20210311235320p:plain

DDL Statementボタンをクリックして、MySQL Tableに対応するDDL Statementを入力します。

CREATE TABLE IF NOT EXISTS odps_to_es (
    `create_time` string COMMENT '',
    `category` string COMMENT  '',
    `brand` string COMMENT  '',
    `buyer_id` string COMMENT  '',
    `trans_num` bigint COMMENT  '',
    `trans_amount` double COMMENT  '',
    `click_cnt` bigint COMMENT  ''
)
PARTITIONED BY (pt bigint);

f:id:sbc_ohara:20210311235336p:plain

Display Nameを入力し、テーブルをコミットします。その後はテーブルが作成されてることが確認できます。
※標準モードプロジェクトの場合は本番環境にもコミットする必要があります。

f:id:sbc_ohara:20210311235509p:plain

このテーブルはまだ何も入っていない状態なので、データを追加します。インポートボタンをクリックします。

f:id:sbc_ohara:20210311235610p:plain

パーティションが存在するかどうかを確認します。

f:id:sbc_ohara:20210311235638p:plain

txtファイルをアップロードします。

f:id:sbc_ohara:20210311235709p:plain

データのインポートが無事成功したら、Ad-Hocクエリなり、DataMapなりでレコードが確認できます。

f:id:sbc_ohara:20210311235737p:plain

これでMaxCompute側の設定は完了です。次はElasticsearchの準備を進めます。

(事前準備)Elasticsearchの準備

Elasticsearchコンソールに入って、クラスターを作成してから、詳細画面に入ります。
f:id:sbc_ohara:20210311235912p:plain

Auto Indexingを有効に設定します。
f:id:sbc_ohara:20210312000016p:plain

Auto Indexingが有効になると、このようなステータスになります。
f:id:sbc_ohara:20210312000044p:plain

Public Networksを有効にして、EndPointをメモします。
f:id:sbc_ohara:20210312000150p:plain

Public Networksのホワイトリストを変更します。
f:id:sbc_ohara:20210312000228p:plain

DataworksのデフォルトリソースグループのIPをPublic Networksアクセスのホワイトリストに追加します。
東京リジョンなら以下の通りです。

100.105.55.0/24,11.192.147.0/24,11.192.148.0/24,11.192.149.0/24,100.64.0.0/10,47.91.12.0/24,47.91.13.0/24,47.91.9.0/24,11.199.250.0/24,47.91.27.0/24,11.59.59.0/24,47.245.51.128/26,47.245.51.192/26,47.91.0.128/26,47.91.0.192/26

f:id:sbc_ohara:20210312000245p:plain

IPがホワイトリストに追加されたらOKです。
f:id:sbc_ohara:20210312000355p:plain

Kibanaのコンソールに入ります。
f:id:sbc_ohara:20210312000422p:plain

Kibanaにログインし、DevToolからIndexを作成します。

PUT odps_index?include_type_name=false
{
  "mappings": {
    "properties": { 
      "category": {
        "type": "text"
      },
      "brand": {
        "type": "text"
      },
      "buyer_id": {
        "type": "text"
      },
      "trans_num": {
        "type": "integer"
      },
      "trans_amount": {
        "type": "double"
      },
      "click_cnt": {
        "type": "integer"
      }
    }
  }
}

f:id:sbc_ohara:20210312000529p:plain

DataWorks DataIntegrationに切り替えて、、データソースにElasticsearchを追加します。
f:id:sbc_ohara:20210312000607p:plain

f:id:sbc_ohara:20210312000624p:plain

データソースとしてElasticsearchのPublic Endpoint、ユーザー名、パスワードなどの情報を入力し、接続テストを実行します。

f:id:sbc_ohara:20210312000643p:plain

接続テストで問題なければ、Completeボタンをクリックすることで、Elasticsearchのデータソースが追加されます。

f:id:sbc_ohara:20210312000801p:plain

この準備が終わり次第、データを同期してみます。データ同期にはGUIモードとスクリプトモードの2つのパターンがあります。まずはGUIモードで同期します。
スクリプトモードはtemplateな扱いができるため、後日この作業の自動化したい場合、活用できればと思います。

MaxCompute TableをElasticsearchへ移行(GUIモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

f:id:sbc_ohara:20210311231235p:plain

DataStdio画面にてWorkflowを作成します。

f:id:sbc_ohara:20210312001831p:plain

STEP2: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312001916p:plain

ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
f:id:sbc_ohara:20210312002001p:plain

Previewでデータが表示されます。
f:id:sbc_ohara:20210312002013p:plain

ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
f:id:sbc_ohara:20210312002150p:plain

Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。

f:id:sbc_ohara:20210312002319p:plain

odpsのデータに対応するフィールドを入力します。

{"name":"create_time","type":“id"}
{"name": "category","type": "text"}
{"name": "brand","type": "text"}
{"name": "buyer_id","type": "text"}
{"name": "trans_num","type": "integer"}
{"name": "trans_amount","type": "double"}
{"name": "click_cnt","type": "integer"}

f:id:sbc_ohara:20210312002412p:plain

STEP3: DI 同期タスクを実行

タスクを保存して、実行します。
f:id:sbc_ohara:20210312002440p:plain

タスクが成功すると、Logが表示されます。
f:id:sbc_ohara:20210312002453p:plain

今度はElasticsearch - kibana画面に遷移し、データが届いてるかを確認します。
KibanaコンソールのDevToolより、データを検索します。

POST /odps_index/_search?pretty
{
    "query": {  "match_all": {} }
}

f:id:sbc_ohara:20210312002653p:plain

その結果、GUIモードでMaxComputeのデータをElasticsearchへ同期したことが確認できました。

f:id:sbc_ohara:20210312002704p:plain

MaxCompute TableをElasticsearchへ移行(スクリプトモード)

STEP1: workflow作成

DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。

f:id:sbc_ohara:20210311231235p:plain

DataStdio画面にてWorkflowを作成します。

f:id:sbc_ohara:20210312001831p:plain

STEP2: DI 同期タスクを作成

同期タスクを作成します。
f:id:sbc_ohara:20210312001916p:plain

ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
f:id:sbc_ohara:20210312002001p:plain

Previewでデータが表示されます。
f:id:sbc_ohara:20210312002013p:plain

ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
f:id:sbc_ohara:20210312002150p:plain

Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。

f:id:sbc_ohara:20210312002319p:plain

odpsのデータに対応するフィールドを入力します。

{"name":"create_time","type":“id"}
{"name": "category","type": "text"}
{"name": "brand","type": "text"}
{"name": "buyer_id","type": "text"}
{"name": "trans_num","type": "integer"}
{"name": "trans_amount","type": "double"}
{"name": "click_cnt","type": "integer"}

f:id:sbc_ohara:20210312002412p:plain

STEP3: スクリプトモードにスイッチ

Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。

f:id:sbc_ohara:20210312003409p:plain

するとスクリプトが表示されます。これは先述、GUIモードで選択した設定が自動でスクリプトに反映されます。
f:id:sbc_ohara:20210312003423p:plain

STEP4: DI 同期タスクを実行

スクリプト(タスク)を実行します。 スクリプト(タスク)が成功すると、タスクとしてLogが表示されます。
f:id:sbc_ohara:20210312003457p:plain

あとは上記通り、Elasticsearch - kibanaでcheck、可視化できます。


最後に

本記事では、MaxComputeからElasticsearchへ連携する方法を簡単に説明しました。
MaxComputeのデータをElasticsearch - kibanaダッシュボードでリアルタイム可視化したい場合、参考に頂ければ幸いです。

最後までお読みいただきありがとうございました。

MaxCompute連載シリーズ

www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp