【Hologres連載】Apache SparkからHologresへデータ連携方法

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している Hologres に対し、Apache SparkからHologresへリアルタイムデータ連携する方法をご紹介します。

f:id:sbc_ohara:20210316010930p:plain

Hologresとは

Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。

www.sbcloud.co.jp

少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

Apache Sparkとは

Apache Sparkは、大規模データ処理のための統合分析エンジンです。複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションで人気があります。

また、今回のチュートリアルとしては、Alibaba Cloud EMR(E-MapReduce) によるApache sparkを使用します。EMR(E-MapReduce) については、こちらのSlideShareで紹介しています。

www.slideshare.net

Apache SparkからHologresへデータ連携について

このガイドラインでは、Sparkを使ってHologresでデータ処理を行う方法を順を追って説明します。

f:id:sbc_ohara:20210906202810p:plain

このチュートリアルについて

対象者:

本ガイドラインは、以下のような方を対象としています
- Alibaba Cloud、Hologres、LogService、OSS(Object Storage Service)、EMR(E-MapReduce) に関する基本的な知識を持っている

前提条件:

  • Alibaba Cloud のアカウントを所持している
  • Hologres、LogService、OSS(Object Storage Service)EMR(E-MapReduce) が使用可能な状態になっている
  • 使用するHologres、LogService、OSS(Object Storage Service)EMR(E-MapReduce) は同一Region配下にある
  • 少なくとも1つのHologresインスタンス、1つのOSS(Object Storage Service)bucketを持っている

EMR(E-MapReduce)クラスタをspark実行環境を準備

まず最初に、sparkの実行環境を準備します。Apache Sparkを導入するにはいくつかの方法 があります。

spark.apache.org

Alibaba Cloud EMR(E-MapReduce)では、Hadoop、Spark、Flink、Kafka、HBaseなどのオープンソースのビッグデータサービスを数分で簡単に導入することができます。このガイドラインでは、すべてのSparkタスクをEMR(E-MapReduce)クラスタ上で実行します。

EMRのウィザードを使って、EMR-3.36.1とspark 2.4.7でEMR(E-MapReduce)クラスタを作成します。Alibaba CloudのEMRはAWS EMRなど他社EMRとは異なってほぼマネージドサービスなので、コンソール上の操作だけで5分で出来ます。

f:id:sbc_ohara:20210906210649p:plain

f:id:sbc_ohara:20210906210657p:plain

f:id:sbc_ohara:20210906210705p:plain

f:id:sbc_ohara:20210906210715p:plain

f:id:sbc_ohara:20210906210725p:plain

f:id:sbc_ohara:20210906210732p:plain

f:id:sbc_ohara:20210906210741p:plain

EMR(E-MapReduce)コンソールのData Platformにプロジェクトを作成し、タスクを管理します。

f:id:sbc_ohara:20210906210757p:plain

f:id:sbc_ohara:20210906210805p:plain

Hologresテーブルの準備

既存のHologresインスタンスに、関連データを格納するためのテーブルを作成します。

f:id:sbc_ohara:20210906210820p:plain

f:id:sbc_ohara:20210906210829p:plain

以下は生成するDDL文です。

BEGIN;
CREATE TABLE public.book_order (
 "order_id" text NOT NULL,
 "user_id" int8 NOT NULL,
 "book_id" int8 NOT NULL,
 "book_name" text NOT NULL,
 "order_cnt" int8 NOT NULL,
 "order_amt" int8 NOT NULL,
PRIMARY KEY (order_id)
);
CALL SET_TABLE_PROPERTY('public.book_order', 'orientation', 'column');
CALL SET_TABLE_PROPERTY('public.book_order', 'bitmap_columns', 'order_id,book_name');
CALL SET_TABLE_PROPERTY('public.book_order', 'dictionary_encoding_columns', 'order_id:auto,book_name:auto');
CALL SET_TABLE_PROPERTY('public.book_order', 'time_to_live_in_seconds', '3153600000');
CALL SET_TABLE_PROPERTY('public.book_order', 'distribution_key', 'order_id');
CALL SET_TABLE_PROPERTY('public.book_order', 'storage_format', 'orc');
COMMIT;

OSS(Object Storage Service)バケツにスクリプトとデータのフォルダを用意する

既存のOSS(Object Storage Service)バケットに、pythonスクリプトやデータファイルを保存するためのフォルダを新規に作成します。

f:id:sbc_ohara:20210906210844p:plain

f:id:sbc_ohara:20210906210854p:plain

Apache SparkでCSVデータファイルをHologresテーブルに取り込む

前述の通り、sparkは複数のプログラム言語をサポートしていますが、ここではpythonとPySparkを例にしています。タスクが完了すると、OSS(Object Storage Service)バケット内のCSVファイルを選択し、その中の全てのレコードが対象のHologresテーブルに保存されます。
DataFrameReaderのcsv("path")またはformat("csv").load("path")を使うと、CSVファイルをPySparkのDataFrameに読み込むことができます。また、JdbcRDDを使えば、特定のドライバを使って簡単にデータベースにDataFrameを書き込むことができます。

spark.apache.org

この状態ではHologresインスタンスはPostgreSQLとして接続されています。詳しい情報は、spark JDBC data sourcespark generic file sourceを参照してください。

spark.apache.org

spark.apache.org

以下のスクリプトでhologres_spark.pyを作成します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('HologresSparkDemo').getOrCreate()
    schema = StructType() \
        .add("order_id", StringType(), True) \
        .add("user_id", IntegerType(), True) \
        .add("book_id", IntegerType(), True) \
        .add("book_name", StringType(), True) \
        .add("order_cnt", IntegerType(), True) \
        .add("order_amt", IntegerType(), True)
    df_with_schema = spark.read.format("csv") \
        .option("header", True) \
        .schema(schema) \
        .load("<your file path in OSS bucket>")
    df_with_schema.write.mode("append").format("jdbc") \
        .option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \
        .option("dbtable", "<your target table name>") \
        .option("user", "<your accessKeyId>") \
        .option("password", "<your accessKeySecret>") \
        .option("driver", "org.postgresql.Driver") \
        .save()

以下のスクリプトを使って、関連するデータファイルを生成し、テストすることができます。

import csv
import random
import uuid

output_date = "20210712"
book_info = ["嫌われる勇気", "ノルウェイの森", "海辺のカフカ", "色彩を持たない多崎つくると彼の巡礼の年",
             "容疑者Ⅹの献身", "人間失格", "こころ", "天声人语", "幸せになる勇気"]
book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]
csv_writer = csv.writer(open("test_data_{0}.csv".format(output_date), "w+", newline='', encoding='UTF-8'))
for i in range(10000):
    book_id = random.randint(1, 9)
    order_count = random.randint(1, 5)
    row = [uuid.uuid1(), random.randint(1, 50), book_id, book_info[book_id-1], order_count,
           book_price[book_id-1] * order_count]  # order_id, user_id, book_id, book_name, order_count, order_amount
    csv_writer.writerow(row)

PostgreSQL JDBC Driverをダウンロードして、pythonスクリプト、生成されたデータファイル、ドライバパッケージを上記の用意したフォルダにアップロードします。

mvnrepository.com

f:id:sbc_ohara:20210906210911p:plain

EMR(E-MapReduce)プロジェクトでApache Spark Jobを作成します。

f:id:sbc_ohara:20210906210922p:plain

f:id:sbc_ohara:20210906210929p:plain

以下のフォーマットに従って、実行コマンドを入力します。

--driver-class-path <your PostgreSQL driver class path in OSS> --jars <your PostgreSQL driver class path in OSS> <your python scripts path in OSS>

ウィザードを使ってOSS(Object Storage Service)のパスを生成するには、Enter an OSS pathリンクをクリックします。

f:id:sbc_ohara:20210906210944p:plain

自作のSpark環境で作業する場合は、代わりに以下のコマンドを使用してください。

spark-submit --driver-class-path <your PostgreSQL driver class path> --jars <your PostgreSQL driver class path> <your python scripts path>

Apache spark job を保存し、EMR(E-MapReduce)クラスタに投入して実行します。

f:id:sbc_ohara:20210906211001p:plain

f:id:sbc_ohara:20210906211009p:plain

f:id:sbc_ohara:20210906211017p:plain

f:id:sbc_ohara:20210906211027p:plain

Apache Spark Jobのインスタンス情報やログは、詳細ページで確認できます。

f:id:sbc_ohara:20210906211041p:plain

f:id:sbc_ohara:20210906211049p:plain

HoloWebにアクセスして、ターゲットテーブルのデータを照会すると、CSVファイルのすべてのレコードがHologresテーブルに格納されているのがわかると思います。

f:id:sbc_ohara:20210906211104p:plain

Spark streamingでLog ServiceからHologresへリアルタイムデータ転送

ここまできたら、Spark streaming動作確認のためにHologresのテーブルデータをDELETE文でクリアします。

f:id:sbc_ohara:20210906211630p:plain

LogServiceプロジェクトとlogstoreを準備します。logstore作成時にWebTrackingが有効になっていることを確認してください。

f:id:sbc_ohara:20210906211643p:plain

f:id:sbc_ohara:20210906211651p:plain

f:id:sbc_ohara:20210906211659p:plain

f:id:sbc_ohara:20210906211708p:plain

logstore のデータページに入り、インデックス属性を有効にします。

f:id:sbc_ohara:20210906211720p:plain

f:id:sbc_ohara:20210906211728p:plain

f:id:sbc_ohara:20210906211735p:plain

f:id:sbc_ohara:20210906211744p:plain

Alibaba Cloudが提供するSpark SDK](https://www.alibabacloud.com/help/doc-detail/51075.htm)では、ReceiverまたはDirectモードでLog ServiceからLog dataを消費することができます。

www.alibabacloud.com

関連する紹介やデモコードはGit aliyun-emapreduce-datasourcesGit aliyun-emapreduce-demoで入手できます。

https://github.com/aliyun/aliyun-emapreduce-dem

Apache Spark SDKにより、DataStreamReaderがログサービスを loghub というフォーマットで認識するようになりました。Pythonの使い方についてはhttps://github.com/aliyun/aliyun-emapreduce-datasources/blob/main/docs/how_to_run_spark_with_python_sdk.mdを参照してください。

github.com

関連するSDKを使用して、まずテストのためにコンソールにストリーミングデータを表示します。以下のスクリプトで hologres_spark_streaming.py を作成します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType


if __name__ == '__main__':
    spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()
    schema = StructType() \
        .add("order_id", StringType(), True) \
        .add("user_id", IntegerType(), True) \
        .add("book_id", IntegerType(), True) \
        .add("book_name", StringType(), True) \
        .add("order_cnt", IntegerType(), True) \
        .add("order_amt", IntegerType(), True)
    line_with_schema = spark.readStream.format("loghub") \
        .schema(schema) \
        .option("sls.project", "<your log service project name>") \
        .option("sls.store", "<your log service logstore name>") \
        .option("access.key.id", "<your accessKeyId>") \
        .option("access.key.secret", "<your accessKeySecret>") \
        .option("endpoint", "<your endpoint such as ap-northeast-1-intranet.log.aliyuncs.com>") \
        .option("startingoffsets", "latest") \
        .load()
    stream = line_with_schema.writeStream.format("console") \
        .outputMode("append") \
        .trigger(processingTime='25 seconds') \
        .start()
    stream.awaitTermination()

Pythonスクリプト、PostgreSQLドライバ、log service spark sdk (emr-logservice_2.11-2.2 .0.jar) 、およびfastjson-1.2 .45.jar、commons-validator-1.4 .0.jar、ezmorf-1.0 .6.jar、loghub-client-lib-0.6 .13.jar、aliyun-log-0.6 .10.jar、json-lib-2.4-jdk 15.jar、zkclient-0.10.jar、emr-common_2.11-2.2 .0.jarをアップロードします。

f:id:sbc_ohara:20210906211806p:plain

EMR(E-MapReduce)プロジェクトの下に、新しいspark streaming jobを作成し、ジョブ実行コマンドを更新します。

f:id:sbc_ohara:20210906211823p:plain f:id:sbc_ohara:20210906211832p:plain

OSS(Object Storage Service)のパスを指定して、以下のフォーマットに沿って実行コマンドを入力してください。

--master yarn-client --driver-memory 7G --executor-memory 5G --executor-cores 1 --num-executors 32 --driver-class-path ossref://xxx/postgresql-42.2.6.jar  --jars ossref://xxx/postgresql-42.2.6.jar,ossref://xxx/fastjson-1.2.45.jar,ossref://xxx/commons-validator-1.4.0.jar,ossref://xxx/ezmorph-1.0.6.jar,ossref://xxx/emr-logservice_2.11-2.2.0.jar,ossref://xxx/loghub-client-lib-0.6.13.jar,ossref://xxx/aliyun-log-0.6.10.jar,ossref://xxx/json-lib-2.4-jdk15.jar,ossref://xxx/zkclient-0.10.jar,ossref://xxx/emr-common_2.11-2.2.0.jar <your python scripts path in OSS>

自分で構築したspark環境で作業している場合は、関連ファイルのパスも更新してください。

sparkのジョブを保存し、EMR(E-MapReduce)クラスタに投入して実行します。

f:id:sbc_ohara:20210906211852p:plain

f:id:sbc_ohara:20210906211901p:plain

f:id:sbc_ohara:20210906211910p:plain

spark streamingジョブの実行中に、以下のスクリプトで新しいログサービスレコードを送信し、関連するログを確認します。

import random
import requests
import uuid

project = "<your log service project name>"
endpoint = "<your endpoint such as ap-northeast-1.log.aliyuncs.com>"
logstore = "<your log service logstore name>"
topic = "<your logs topic>"
# urlencoded book name in Japanese
book_info = ["%E5%AB%8C%E3%82%8F%E3%82%8C%E3%82%8B%E5%8B%87%E6%B0%97",
             "%E3%83%8E%E3%83%AB%E3%82%A6%E3%82%A7%E3%82%A4%E3%81%AE%E6%A3%AE",
             "%E6%B5%B7%E8%BE%BA%E3%81%AE%E3%82%AB%E3%83%95%E3%82%AB",
             "%E8%89%B2%E5%BD%A9%E3%82%92%E6%8C%81%E3%81%9F%E3%81%AA%E3%81%84%E5%A4%9A%E5%B4%8E%E3%81%A4%E3%81%8F%E3%82%8B%E3%81%A8%E5%BD%BC%E3%81%AE%E5%B7%A1%E7%A4%BC%E3%81%AE%E5%B9%B4",
             "%E5%AE%B9%E7%96%91%E8%80%85%E2%85%A9%E3%81%AE%E7%8C%AE%E8%BA%AB", "%E4%BA%BA%E9%96%93%E5%A4%B1%E6%A0%BC",
             "%E3%81%93%E3%81%93%E3%82%8D", "%E5%A4%A9%E5%A3%B0%E4%BA%BA%E8%AF%AD",
             "%E5%B9%B8%E3%81%9B%E3%81%AB%E3%81%AA%E3%82%8B%E5%8B%87%E6%B0%97"]
book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]
url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&order_id={4}&user_id={5}&book_id={6}&book_name={7}&order_cnt={8}&order_amt={9}'
for i in range(5):
    book_id = random.randint(1, 9)
    order_count = random.randint(1, 5)
    res = requests.get(
        url.format(project, endpoint, logstore, topic, uuid.uuid1(), random.randint(1, 50), book_id,
                   book_info[book_id - 1], order_count,
                   book_price[book_id - 1] * order_count))
    print(res)

f:id:sbc_ohara:20210906211926p:plain

f:id:sbc_ohara:20210906211934p:plain

ここでストリームデータをターゲットのHologresテーブルに保存するために、formatconsolejdbcに置き換えると、java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writingという関連エラーが発生します。つまり、データソースをjdbcと設定では、ストリーミング処理をサポートすることができないことがわかります。

......
    # Error Data source jdbc does not support streamed writing
    stream = line_with_schema.writeStream.format("jdbc") \
        .outputMode("append") \
        .option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \
        .option("dbtable", "<your target table name>") \
        .option("user", "<your accessKeyId>") \
        .option("password", "<your accessKeySecret>") \
        .option("driver", "org.postgresql.Driver") \
        .start()
......

f:id:sbc_ohara:20210906211947p:plain

foreachBatch(...) でフォーマットコンソールを更新すると、ストリーミングクエリの各マイクロバッチの出力データに対して実行する関数を指定できます。Sparkジョブで以前行ったように、DataFrameをHologresテーブルに保存します。

spark.apache.org

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType


def foreach_batch_function(df, epoch_id):
    df.write.mode("append").format("jdbc") \
        .option("url", "<jDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \
        .option("dbtable", "<your target table name>") \
        .option("user", "<your accessKeyId>") \
        .option("password", "<your accessKeySecret>") \
        .option("driver", "org.postgresql.Driver") \
        .save()
    pass


if __name__ == '__main__':
    spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()
    schema = StructType() \
        .add("order_id", StringType(), True) \
        .add("user_id", IntegerType(), True) \
        .add("book_id", IntegerType(), True) \
        .add("book_name", StringType(), True) \
        .add("order_cnt", IntegerType(), True) \
        .add("order_amt", IntegerType(), True)
    line_with_schema = spark.readStream.format("loghub") \
        .schema(schema) \
        .option("sls.project", "<your log service project name>") \
        .option("sls.store", "<your log service logstore name>") \
        .option("access.key.id", "<your accessKeyId>") \
        .option("access.key.secret", "<your accessKeySecret>") \
        .option("endpoint", "<your endpoint such as ap-northeast-1-intranet.log.aliyuncs.com>") \
        .option("startingoffsets", "latest") \
        .load()
    stream = line_with_schema.writeStream.foreachBatch(foreach_batch_function).start()
    # stream = line_with_schema.writeStream.format("console") \
    #     .outputMode("append") \
    #     .trigger(processingTime='25 seconds') \
    #     .start()
    stream.awaitTermination()

新しいPythonスクリプトをアップロードし、sparkストリーミングジョブを再実行します。新しいログサービステストレコードを送信し、Hologresテーブルを確認します。

f:id:sbc_ohara:20210906212005p:plain

f:id:sbc_ohara:20210906212012p:plain

f:id:sbc_ohara:20210906212020p:plain

f:id:sbc_ohara:20210906212028p:plain

最後に

ここまで、Apache SparkからHologresへリアルタイムデータ連携する方法を紹介しました。
この方法を生かすことで、Apache Sparkがあるサービス基盤からリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。