SBクラウド株式会社logo

MaxCompute連載10:HDFS_ParquetファイルをMaxComputeの外部テーブルとして処理する(外部テーブル・partitionつき)

Hi, データエンジニアの大原です。
Alibaba Cloudのデータ処理プラットフォーム「MaxCompute」についての連載記事の10回目です。

今回はHDFS_ParquetファイルをMaxComputeの外部テーブルとして処理する方法について説明します。

前回の記事はこちらをご覧ください。 www.sbcloud.co.jp

f:id:sbc_ohara:20210104091159p:plain

前書き

MaxComputeの概要をこのblogにて記載しました。

MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。

f:id:sbc_ohara:20210104133726p:plain

少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

今回はAlibaba Cloud MaxComputeでHDFS_Parquet(partition付き)を外部テーブルとして格納、SQL処理してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20210315194025p:plain

今回は、以前LogServiceでTwitterデータを収集したあと、LogSeriviceからOSS LogShipperで生成されたParquetファイルを使います。
LogServiceからOSSへLogShipper設定方法は以下の通り、コンソールを操作するだけです。

f:id:sbc_ohara:20210315194753p:plain

Ship log data from Log Service to OSS - Log consumption and shipping| Alibaba Cloud Documentation Center

Parquet-formatted data - Log consumption and shipping| Alibaba Cloud Documentation Center

www.sbcloud.co.jp

OSSフォルダはこのような感じになります。
partitionが「%Y/%m/%d/%H/%M」なので、 year、month、day、hourlyで区切りとなります。 f:id:sbc_ohara:20210315193232p:plain

Ship log data from Log Service to OSS - Log consumption and shipping| Alibaba Cloud Documentation Center

f:id:sbc_ohara:20210315195333p:plain

前置きが長くなりましたが、この状態から、MaxComputeでOSSにあるParquetデータを外部テーブルとして処理します。

共通作業(MaxCompute全体で共通事項)

RAM ユーザー作成&権限付与

もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。

RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。

f:id:sbc_ohara:20210305025953p:plain

対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。

f:id:sbc_ohara:20210305025809p:plain

Workspace作成

MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
基本モードと標準モードは過去記事にて説明しています。

f:id:sbc_ohara:20210304212211p:plain

続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。

f:id:sbc_ohara:20210304220348p:plain

MaxComputeに関する必要な情報を設定し、Workspaceを作成します。

f:id:sbc_ohara:20210304220459p:plain


STEP1:データ格納ことDataIntegration、、、はSkipします。

DataIntegrationは異種データソース間でデータを同期するプロセスです。しかし今回はOSSにあるCSVファイルを外部テーブルとして利用するため、この手順はSkipとなります。

DataWorks でまずは DataStdioへ遷移します。

f:id:sbc_ohara:20210304231934p:plain

DataStdio画面です。最初は何もない状態です。そこからプラレールのように色々構築することが出来ます。
f:id:sbc_ohara:20210304232016p:plain

STEP2:JSONファイルを認識し、MaxCompute Tableに格納

メニューバーからworkflowを作成します。

f:id:sbc_ohara:20210304232146p:plain

workflow名を入力し、「Create」ボタンで作成します。ここは「csv2maxcompute」というタイトルで登録します。

f:id:sbc_ohara:20210304232340p:plain

workflow画面が出たら、メニューバーにて、緑色の「Table」から「Create Table」でテーブルを新規作成します。
「csv2maxcompute」というワークフロー画面はタブより閉じても問題ありません。

f:id:sbc_ohara:20210305031323p:plain

External_oss_table_parquet という名前のテーブルを作成します。

f:id:sbc_ohara:20210315201419p:plain

こんな感じになります。そこから外部データと連携してテーブルを作成します。

f:id:sbc_ohara:20210315201447p:plain

まずはExternal Tableをクリックします。

f:id:sbc_ohara:20210315161318p:plain

MaxCompute/DataWorksとして、今操作しているNodeでOSSのデータ操作ができるよう、Nodeに対しRAM Role権限 AliyunODPSDefaultRole を付与します。

f:id:sbc_ohara:20210305031928p:plain

先ほどのDataStdio画面に戻って、今度はフォルダパスを選定します。
OSSのパスで対象のParquetファイルのあるフォルダで、partitionが見える段階まで深掘りでクリックします。選定できたらOKを押します。
今回はこのパスoss://oss-ap-northeast-1-internal.aliyuncs.com/realtime-analytics-test-ohara/hdfs/parquet/ に設定しています。

f:id:sbc_ohara:20210315201956p:plain

RAMロールからAliyunODPSDefaultRoleのARNをコピーし、rolearnにペーストします。

f:id:sbc_ohara:20210315152214p:plain

File FormatをPARQUETに選定します。

f:id:sbc_ohara:20210315202237p:plain

今回はpartitionファイルなので、Partitioned Table をクリックします。すると、Partition Field Name一覧が表示されます。

f:id:sbc_ohara:20210315202535p:plain

今回、LogServiceのOSS Shipperとしてpartitionの構成を %Y/%m/%d/%H/%M として入力しているので、以下の構成でpartitionを設定します。

f:id:sbc_ohara:20210315224335p:plain

フィールド名を入力します。 今回は以下の構成でOSSへ格納しているので、これをベースにフィールドを作成します。

f:id:sbc_ohara:20210315222728p:plain

【Logservice連載】SDKでTwitterデータを収集するLogService - SBクラウド株式会社

parquetでフィールド構成が不明な場合は、ParquetViewerを使って確認すると良いです。インストール不要です。

github.com

f:id:sbc_ohara:20210315222939p:plain

上にある[ DDL Statement] からSQLでフィールドを作ります。下段にある[Create Field]などのボタンで手動でフィールド作成もできます。

CREATE TABLE IF NOT EXISTS External_oss_table_parquet(
  logservice_source String,
  logservice_topic String,
  logservice_time String,
  created_at String,
  created_at_utc String,
  id_str String,
  Web String,
  Android String,
  iPhone String,
  text String,
  source String,
  tweet_size String
) 

フィールド名も見れるようになりました。最後に「Time-to-Live(TTL)」のチェックを外し、Display名を入力します。今回は「links_display」と入れます。
これで設定は完了です。 [Commit to Production Environment] をクリックし、実行します。

f:id:sbc_ohara:20210315224255p:plain

Talbeを本番環境にコミットしますか?と警告メッセージがでます。ここはOKを押します。

f:id:sbc_ohara:20210305034411p:plain

STEP3:partition認識

これで無事コミットが完了したら、今度はSQLで操作、確認します。今回はAd Hoc Queryを使います。
メニューバーでAd Hoc Query →新規作成 → ODPS SQL を選定します。

f:id:sbc_ohara:20210315205821p:plain

Node Nameは「check_parquet」、Locationは「Ad-Hoc Query」で、Ad-Hoc Query画面を開きます。

f:id:sbc_ohara:20210315210018p:plain

先ほど作成したexternal_oss_table_parquetテーブルは現在partitionが無い状態だとおもうので、partitionを識別する前にテーブルプロパティで情報を確認します。

f:id:sbc_ohara:20210315210401p:plain

現在、PartitionsはNo Data、何もない状態ということがわかります。
この状態からAd-Hoc Queryを使ってPartitionを識別します。

MSCK REPAIR TABLE external_oss_table_parquet;

その結果、Log側でもpartitionが認識されています。テーブルプロパティ側でも認識することが出来たらOKです。

f:id:sbc_ohara:20210315211840p:plain

STEP4:完了

あとはpartitionを指定しながらSQLクエリを回すだけです。

f:id:sbc_ohara:20210315231911p:plain


参考

Partition で次のようなケース sale_date=20200921/region=Japan の場合、以下の通りに設定することができます。
f:id:sbc_ohara:20210315232236p:plain

DataStdio でpartitionは以下設定し、コミットします。
f:id:sbc_ohara:20210315232349p:plain

上記と同様、partitionを認識するSQLクエリ MSCK REPAIR TABLE ~; を実行することで、テーブルプロパティでもpartitionが認識されます。

f:id:sbc_ohara:20210315232538p:plain

あとはSQLクエリでpartitionを認識しながら実行することで、テーブルが表示されます。
f:id:sbc_ohara:20210315232708p:plain


最後に

本記事では、OSSにあるHDFS_Parquetファイル(partition付き)をMaxCompute EXTERNAL Table(外部テーブル)として格納する方法を簡単に説明しました。
LogServiceでデータ収集後、LogShipperでOSSへpartition付きデータとして出力されるので、これをうまく連携できれば誰でもDWHが簡単に構築できます。

最後までお読みいただきありがとうございました。

参考: www.alibabacloud.com

www.alibabacloud.com

MaxCompute連載シリーズ

www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp www.sbcloud.co.jp