SBクラウド株式会社logo

MaxCompute連載6:CSVファイルをMaxComputeの内部テーブルに格納する

Hi, データエンジニアの大原です。
Alibaba Cloudのデータ処理プラットフォーム「MaxCompute」についての連載記事の6回目です。

今回はMaxComputeでCSVファイルを格納、SQL処理する方法を説明します。

前回の記事はこちらをご覧ください。

www.sbcloud.co.jp

f:id:sbc_ohara:20210104091159p:plain

前書き

MaxComputeの概要をこのblogにて記載しました。

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

f:id:sbc_ohara:20210104133726p:plain

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

今回はAlibaba Cloud MaxComputeでCSVファイルを格納、SQL処理してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20210305024831p:plain

今回はMovielensというオープンデータを使いました。
OSSはbucket名bigdata-dwh、ディレクトリ名(Object Name Prefix)はCSV/、CSV配下に以下のCSVファイルを格納しています。

f:id:sbc_ohara:20210305024536p:plain


共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

f:id:sbc_ohara:20210305025953p:plain

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

f:id:sbc_ohara:20210305025809p:plain

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
基本モードと標準モードは過去記事にて説明しています。

f:id:sbc_ohara:20210304212211p:plain

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

f:id:sbc_ohara:20210304220348p:plain

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

f:id:sbc_ohara:20210304220459p:plain


CSVファイルを MaxCompute Tableへ格納

ここのチュートリアルは、CSVファイルをMaxComputeの内部テーブルへ格納する方法です。  

STEP1:データ格納

Workspace、Project作成直後は何もない状態と思います。
なので、まずはDataWorks のHomepageへ移動し、データを格納する準備を進めます。

f:id:sbc_ohara:20210304220751p:plain

DataWorksのコンソール画面です。まずは「Data Integration」をクリックします。

f:id:sbc_ohara:20210304221122p:plain

DataWorks DataIntegration画面に遷移します。今回、OSSにCSVファイルがあるため、それをソースとして設定するためにData Stores をクリックします。

f:id:sbc_ohara:20210304222952p:plain

「New Data Source」をクリックし、データソースを選定します。

f:id:sbc_ohara:20210304223231p:plain

今回はOSSがデータソースなので、OSSを選定します。

f:id:sbc_ohara:20210304223432p:plain

ここで、OSSに関する必要な情報入力画面が出ます。
End Pointはここから選定できます。今回は日本リージョンなので、https://oss-ap-northeast-1.aliyuncs.comを入力します。リージョン情報はここから選定できます。
www.alibabacloud.com

Bucket情報を入力します。OSS接続先が目的なのでBucket名だけで問題ないです。
AccessKey IDとAccessKey Secretを入力します。
色々入力が終わったら「Test connectivity」をクリックして、接続が出来ていることを確認します。接続が出来ていれば、Completeをクリックし、接続設定ウィザードを完了します。

f:id:sbc_ohara:20210304224810p:plain

その後はData Source にて、OSSが無事登録できていることを確認できたらOKです。

f:id:sbc_ohara:20210304231819p:plain

次はDataStdio画面へ遷移します。

f:id:sbc_ohara:20210304231934p:plain

DataStdio画面です。最初は何もない状態です。そこからプラレールのように色々構築することが出来ます。
f:id:sbc_ohara:20210304232016p:plain

STEP2:CSVファイルを認識し、MaxCompute Tableに格納

メニューバーからworkflowを作成します。

f:id:sbc_ohara:20210304232146p:plain

workflow名を入力し、「Create」ボタンで作成します。ここは「csv2maxcompute」というタイトルで登録します。

f:id:sbc_ohara:20210304232340p:plain

workflow画面が出たら、オレンジ色の「Batch Synchronization」をDrag & Drop で移動させます。

f:id:sbc_ohara:20210304232641p:plain

Drag &Drop後、Create Node作成ウィザードが表示されます。ここで新たに作成したいNodeの名前を入力します。

f:id:sbc_ohara:20210304232823p:plain

オレンジ色のNodeで名前を登録後、右クリックで「Open Node」を選定します。

f:id:sbc_ohara:20210304233013p:plain

このような編集画面が出てきます。

f:id:sbc_ohara:20210304233150p:plain

諸元となるデータソースを設定します。connectionタブで「OSS」を選定すると、先ほどDataIntegrationで登録した「csv_sample」が表示されます。

f:id:sbc_ohara:20210304233821p:plain

Object Name Prefixにてcsv/ratings.csv を入力します。
今回のcsvファイルには1行目にフィールド名があるため、「Include Header」でYesを選定します。
csvファイル自体圧縮していないので、Compression Formatは「None」と選定します。

「Preview」をクリックし、CSVファイルの構成を確認することができます。
ここで入力情報で間違ってなかったら「OK」ボタンをクリックすることで、フィールド一覧が自動認識されます。
今はターゲットソースの情報がないので、何も表示されていませんが、コンソール内部では認識されている状態になっています。

f:id:sbc_ohara:20210304234441p:plain

今度はTarget でソース設定情報を登録します。今回はMaxComputeなので、「ODPS」を選定します。ODPSはMaxComputeの昔の名称です。

f:id:sbc_ohara:20210304234730p:plain

今回新しくMaxCompute Projectを作成したため、当然データがない状態です。 なので、SQLを使ってデータを登録します。

CREATE TABLE IF NOT EXISTS ratings (
  userid INT, 
  movieid INT,
  rating INT, 
  tstamp STRING
) 
COMMENT 'movielen ratings table'
lifecycle 36500;

COMMENT は、名前通りコメントです。不要ならスキップ(空白)しても問題ないです。
lifecycle は、名前通り、tableのライフサイクルです。この数値(日)を経過したテーブルは自動削除されます。これをうまく使って一時テーブルの削除などストレージコストを節約できればと思います。

f:id:sbc_ohara:20210304235928p:plain

SQLでテーブル作成が無事完了すると、「02 Mappings」の画面でフィールド選定が出ます。ここで諸元ソースと、ターゲットで同じフィールドであるかMappingsの設定を合わせます。

f:id:sbc_ohara:20210305000113p:plain

これでデータ同期の設定完了です。メニューバーのところにある上書き保存のアイコンをクリックして上書き保存します。

f:id:sbc_ohara:20210305000336p:plain

参考として、ここにある「03 Channel」で数点補足します。
Expected Maximum Concurrency・・・最大同時実行数。ソースからターゲットへデータを転送するスレッド最大数です。 Bandwidth Throttling・・・帯域数。諸元データの容量次第では帯域を増やしたほうがベストです。 f:id:sbc_ohara:20210305000447p:plain

STEP3:実行

先ほどの上書き保存のアイコンの隣にある、実行ボタンをクリックします。以降、データ同期処理としてRuntiume Log画面で 長いLogが出ますが、このタスクが完了するのを待ちます。

f:id:sbc_ohara:20210305001632p:plain

ここでResource Groupについて説明します。
Resource GroupはDataWorksで今回のようなタスク処理に必要なリソースを選定することが出来ます。
Resource は共有リソースグループ(shared resource groups)、専用リソースグループ(exclusive resource groups)、カスタムリソースグループ(custom resource groups)の3種類があります。共有リソースグループは無料枠ですが、同リージョン配下で他のアカウント:DataWorksを使っているユーザリソースを含め処理されるため、少ないタスク・あるいは少数のノードが実行されるシナリオに向いています。処理を最大化したい場合は、有料となる専用リソースグループか、カスタムリソースグループを選定すると良いです。カスタムリソースグループはData IntegrationとShell Nodeのみに特化したリソースグループです。

www.alibabacloud.com

STEP4:完了

先ほどのworkflowのTabをクリックし、画面を切り替えます。 workflowで、オレンジ色のDI(DataIntegration)で先ほど設定完了したので、今度は緑色のODPS SQL をDrag &DropでNodeを作成します。

f:id:sbc_ohara:20210305010615p:plain

「Run_SQL」という名前にします。
f:id:sbc_ohara:20210305010754p:plain

緑色のところを右クリックで、Open Nodeを選定し、SQLクエリ編集画面を開きます。

f:id:sbc_ohara:20210305010829p:plain

レコード件数を集計するSQLクエリを作成し、上書き保存したら、実行ボタンをクリックしてSQLクエリを実行します。

SELECT Count(*) as record from ratings;

f:id:sbc_ohara:20210305011127p:plain

SQLクエリの実行コストらお金が出ます。(PAYGのみ)

f:id:sbc_ohara:20210305011233p:plain

実行結果としてレコード数が無事表示されました。これで以上です。

f:id:sbc_ohara:20210305011349p:plain

補足

上記 DataIntegratinとしての「実行」およびSQLクエリとしての「実行」。これらを連続で実行処理するのが面倒な場合は、workflowを使います。
workflow画面にて、Drag &Dropで処理順ごとにラインを作成します。

f:id:sbc_ohara:20210305012315p:plain

※注意として、MaxComputeは「DELETE」をサポートしません。なので、全てのレコードを削除したい場合は、DELETEの代わりにTRUNCATE TABLEを入れて全てのレコードを削除します。

TRUNCATE TABLE ratings;

www.alibabacloud.com

ワークフロー、処理の順番が完成したら、Runボタンをクリックして実行します。

f:id:sbc_ohara:20210305012413p:plain

処理の過程はNodeを右クリック → View Logを選定するとLogが見れます。

f:id:sbc_ohara:20210305012544p:plain

今度はRun_SQL Node を右クリックし、View Logを選定すると、SQLクエリ結果が出力されます。

f:id:sbc_ohara:20210305013453p:plain

このようにして、DataIntegrationやSQL、Shell、Python、Job、レポート作成などなどの処理が多い場合はworkflowを使って、プラレールのように好きにワークフローを作成すると良いです。

f:id:sbc_ohara:20210305023900p:plain


最後に

本記事では、OSSにあるCSVファイルをMaxCompute Tableとして格納する方法を簡単に説明しました。
この作業はノーコード・ローコードであり非常にシンプルなので、MaxComputeに対してクイックスタートしやすいです。

最後までお読みいただきありがとうございました。

MaxCompute連載シリーズ

www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp