【Hologres連載】Azureのk8s(AKS)にApache Sparkを配置し、Hologresへリアルタイムデータ連携する方法(30分で出来るAzureとAlibaba Cloudのマルチクラウド構成)

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している Hologres を使って、AzureのKubernetesサービスであるAKSからApache Sparkによるマルチクラウドーリアルタイムデータ連携する方法をご紹介します。

f:id:sbc_ohara:20210316010930p:plain

Hologresとは

Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。

www.sbcloud.co.jp

少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

Apache Sparkとは

Apache Sparkは、大規模データ処理のための統合分析エンジンです。複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションで人気があります。

spark.apache.org

Kubernetes(k8s)とは

Kubernetes は、K8sとも呼ばれ、コンテナ化されたアプリケーションのデプロイ、スケーリング、管理を自動化するためのオープンソースのシステムで、アプリケーションを構成するコンテナを論理的な単位にまとめ、管理と発見を容易にします。

kubernetes.io

Azure Kubernetes上でApache Sparkを使ってHologresでデータ処理をする

このガイドラインでは、Kubernetes上のApache Sparkを使って、Hologresによるデータ処理を段階的に作成します。
マルチクラウドによるデータ分析サービスを構築するため、Apache SparkはAKS(Azure Kubernetes Service) にてデプロイします。

f:id:sbc_ohara:20210909084249p:plain

このチュートリアルについて

対象者:

本ガイドラインは、以下のような方を対象としています
- docker、Kubernetes,、Alibaba Cloud、Hologres,、Azure、AKS(Azure Kubernetes Service) 、Azure Storage Accountに関する基本的な知識を持っている

前提条件:

  • Alibaba Cloud、Azure のアカウントを所持している
  • Alibaba Cloud Hologresが使用可能な状態になっている
  • 少なくとも1つのHologresインスタンスを持っている
  • Azure AKS(Azure Kubernetes Service) と Azure Storage Account が使用可能な状態になっている
  • 作業環境にAzure CLI、およびdocker(17.05以降)をインストールしている

Hologresテーブルの準備

既存のHologresインスタンスに、関連データを格納するためのテーブルを作成します。

f:id:sbc_ohara:20210909084305p:plain f:id:sbc_ohara:20210909084313p:plain

以下は生成するDDL文です

BEGIN;
CREATE TABLE public.book_order (
 "order_id" text NOT NULL,
 "user_id" int8 NOT NULL,
 "book_id" int8 NOT NULL,
 "book_name" text NOT NULL,
 "order_cnt" int8 NOT NULL,
 "order_amt" int8 NOT NULL,
PRIMARY KEY (order_id)
);
CALL SET_TABLE_PROPERTY('public.book_order', 'orientation', 'column');
CALL SET_TABLE_PROPERTY('public.book_order', 'bitmap_columns', 'order_id,book_name');
CALL SET_TABLE_PROPERTY('public.book_order', 'dictionary_encoding_columns', 'order_id:auto,book_name:auto');
CALL SET_TABLE_PROPERTY('public.book_order', 'time_to_live_in_seconds', '3153600000');
CALL SET_TABLE_PROPERTY('public.book_order', 'distribution_key', 'order_id');
CALL SET_TABLE_PROPERTY('public.book_order', 'storage_format', 'orc');
COMMIT;

特定データファイルおよびjarファイルを含むApache SparkのImageファイルのbuild

AKS(Azure Kubernetes Service) にApache Sparkをデプロイするためには、Sparkの実行環境、python 3環境下のPySpark、ソースデータファイル、依存関係にあるJARを含む、適切なSparkのImageを構築する必要があります。
以下の手順で独自のイメージを構築することもできますし、あるいはDocker Image libraryにある bwbw723/spark-k8s-hologres-demo:2.4.7 をそのまま使うこともできます。

hub.docker.com

f:id:sbc_ohara:20210909084330p:plain

上記のパブリックイメージを使用する場合は、次のセクションから始めてください。独自のイメージを構築するには、作業環境でdockerサービスが実行されていることを確認する必要があります。

f:id:sbc_ohara:20210909084345p:plain

Apache Spark用Imageファイルの準備

Alibaba Cloud Log Service SDK は spark 2.x.x を必要とするので、ここでは spark-2.4.7-bin-hadoop2.7 を使用します。

$ wget https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
$ tar -xzvf spark-2.4.7-bin-hadoop2.7.tgz

f:id:sbc_ohara:20210909084401p:plain

ImageのJAR依存関係とデータファイルの準備

上記のHologresテーブルのテーブルスキーマに従って、以下のスクリプトで関連するデータファイルを生成してテストすることができます。

import csv
import random
import uuid

output_date = "20210712"
book_info = ["嫌われる勇気", "ノルウェイの森", "海辺のカフカ", "色彩を持たない多崎つくると彼の巡礼の年",
             "容疑者Ⅹの献身", "人間失格", "こころ", "天声人语", "幸せになる勇気"]
book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]
csv_writer = csv.writer(open("test_data_{0}.csv".format(output_date), "w+", newline='', encoding='UTF-8'))
for i in range(10000):
    book_id = random.randint(1, 9)
    order_count = random.randint(1, 5)
    row = [uuid.uuid1(), random.randint(1, 50), book_id, book_info[book_id-1], order_count,
           book_price[book_id-1] * order_count]  # order_id, user_id, book_id, book_name, order_count, order_amount
    csv_writer.writerow(row)

以下のスクリプトを使って、関連するデータファイルを生成し、テストすることができます。

order_id,user_id,book_id,book_name,order_cnt,order_amt
5761158b-f044-11eb-8be8-706655fcbfda,39,8,天声人语,1,550
57613ca0-f044-11eb-a396-706655fcbfda,46,8,天声人语,5,2750
57613ca1-f044-11eb-82e5-706655fcbfda,13,5,容疑者Ⅹの献身,1,200
57613ca2-f044-11eb-b395-706655fcbfda,35,5,容疑者Ⅹの献身,3,600
57613ca3-f044-11eb-a123-706655fcbfda,42,4,色彩を持たない多崎つくると彼の巡礼の年,1,300
57613ca4-f044-11eb-923c-706655fcbfda,18,8,天声人语,5,2750
57613ca5-f044-11eb-a533-706655fcbfda,29,9,幸せになる勇気,3,1950
57613ca6-f044-11eb-9ca4-706655fcbfda,35,7,こころ,3,1050
57613ca7-f044-11eb-bd76-706655fcbfda,25,1,嫌われる勇気,1,500
......

PostgreSQL JDBC Driver の依存関係にあるJARをMaven Repository からダウンロードします。

mvnrepository.com

mvnrepository.com

spark root folder(spark-2.4.7-bin-hadoop2.7 here) と入力し、demoという名前の新しいフォルダ を作成し、準備したデータファイル (data.csv) 、PostgreSQLドライバ (postgresql-42.2 .6.jar) 、ログサービスのspark sdk (emr-logservice_2.11-2.2 .0.jar) とその依存関係 (fastjson-1.2 .45.jar、commons-validator-1.4 .0.jar、ezmorf-1.0 .6.jar、loghub-client-lib-0.6 .13.jar、aliyun-log-0.6 .10.jar、json-lib-2.4-jdk 15.jar、zkclient-0.10.jar、emr-common_2.11-2.2 .0.jarなど) を入れます。

f:id:sbc_ohara:20210909084417p:plain

Apache spark imageをビルドおよびプッシュ

以下の2つの点について/kubernetes/dockerfiles/spark/Dockerfileを更新します。
- python 3実行環境を追加します。
- データファイルと依存関係JARを作業ディレクトリに置く

以下はDockerfileのサンプルです。

FROM openjdk:8-jdk-slim

ARG spark_jars=jars
ARG img_path=kubernetes/dockerfiles
ARG k8s_tests=kubernetes/tests

# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*

COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY ${k8s_tests} /opt/spark/tests
COPY data /opt/spark/data
COPY python /opt/spark/python

ENV SPARK_HOME /opt/spark

# Add python3 running environment
ENV PYTHON_VERSION=3.7.3

RUN apt-get install -y \
    build-essential \
    libreadline-gplv2-dev \
    libncursesw5-dev \
    libssl-dev \
    libsqlite3-dev \
    tk-dev \
    libgdbm-dev \
    libc6-dev \
    libbz2-dev \
    libffi-dev \
    zlib1g-dev \
    liblzma-dev \
    wget \
    && apt-get autoremove -y \
    && apt-get clean

WORKDIR /usr/src

RUN wget https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz

RUN tar xzf Python-$PYTHON_VERSION.tgz

WORKDIR /usr/src/Python-$PYTHON_VERSION

RUN ./configure --enable-optimizations --with-ensurepip=install

RUN make altinstall

RUN ln -s /usr/local/bin/python3.7 /usr/local/bin/python3
RUN ln -s /usr/local/bin/python3.7 /usr/local/bin/python
RUN ln -s /usr/local/bin/pip3.7 /usr/local/bin/pip3
RUN ln -s /usr/local/bin/pip3.7 /usr/local/bin/pip

RUN pip install findspark

WORKDIR /opt/spark/work-dir

RUN rm -rf /usr/src/Python-$PYTHON_VERSION
RUN rm -rf /usr/src/Python-$PYTHON_VERSION.tgz

# Put data file and dependency JARs into work dir

ADD demo/aliyun-log-0.6.10.jar aliyun-log-0.6.10.jar
ADD demo/fastjson-1.2.45.jar fastjson-1.2.45.jar
ADD demo/loghub-client-lib-0.6.13.jar loghub-client-lib-0.6.13.jar
ADD demo/ezmorph-1.0.6.jar ezmorph-1.0.6.jar
ADD demo/emr-common_2.11-2.2.0.jar emr-common_2.11-2.2.0.jar
ADD demo/emr-logservice_2.11-2.2.0.jar emr-logservice_2.11-2.2.0.jar
ADD demo/commons-validator-1.4.0.jar commons-validator-1.4.0.jar
ADD demo/postgresql-42.2.6.jar postgresql-42.2.6.jar
ADD demo/zkclient-0.10.jar zkclient-0.10.jar
ADD demo/json-lib-2.4-jdk15.jar json-lib-2.4-jdk15.jar
ADD demo/data.csv data.csv

ENTRYPOINT [ "/opt/entrypoint.sh" ]

Docker HubACR(Azure Container Registry)Alibaba Cloud Container Registry などのコンテナイメージレジストリにログインします。ここではDocker Hubを例に説明します。

f:id:sbc_ohara:20210909084439p:plain

に移動し、関連するコマンドを実行してイメージをビルドし、プッシュします。必要に応じて、イメージのレジストリ名やタグを変更します。

$ docker build -t bwbw723/spark-k8s-hologres-demo:2.4.7 -f ./kubernetes/dockerfiles/spark/Dockerfile .
$ docker push bwbw723/spark-k8s-hologres-demo:2.4.7

f:id:sbc_ohara:20210909084454p:plain

f:id:sbc_ohara:20210909084502p:plain

AKS(Azure Kubernetes Service) clusterの準備

AKS(Azure Kubernetes Service) クラスタの作成は、Azure official help documentの手順に従って、Azure CLIで行うことができます。
Azure CLIの準備ができていない場合は、Install Azure CLISign inを参照してください。

docs.microsoft.com

docs.microsoft.com

docs.microsoft.com

コマンドは次の通りです。

# Install Azure CLI
$ curl -L https://aka.ms/InstallAzureCli | bash
# Sign in
$ az login
# Create a resource group for the cluster
$ az group create --name <resource group name> --location <group location>
# Create a Service Principal for the cluster
$ az ad sp create-for-rbac --name <service principal name>
# Create the AKS cluster with nodes that are of size Standard_DS2_v2
$ az aks create --resource-group <above resource group name> --name <aks cluster name> --node-vm-size <node vm size such as Standard_DS2_v2> --generate-ssh-keys --service-principal <APPID generated above> --client-secret <PASSWORD generated above>
# Get credentials for cluster connection
$ az aks get-credentials --resource-group <above resource group name> --name <above aks cluster name>

f:id:sbc_ohara:20210909084515p:plain

PySparkでスパークジョブの準備

Sparkは複数のプログラム言語をサポートしており、ここではpythonとPySparkを例にしています。タスクが完了すると、スパークイメージで用意したCSVファイルに含まれる全てのレコードが対象のHologresテーブルに保存されます。
DataFrameReaderのcsv("path")またはformat("csv").load("path")を使うと、CSVファイルをPySparkのDataFrameに読み込むことができます。また、JdbcRDDを使えば、特定のドライバを使って簡単にデータベースにDataFrameを書き込むことができます。

この状態ではHologresインスタンスはPostgreSQLとして接続されています。詳しい情報は、spark JDBC data sourcespark generic file sourceを参照してください。

以下のスクリプトで hologres_spark.py を作成します。

import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('HologresSparkDemo').getOrCreate()
    schema = StructType() \
        .add("order_id", StringType(), True) \
        .add("user_id", IntegerType(), True) \
        .add("book_id", IntegerType(), True) \
        .add("book_name", StringType(), True) \
        .add("order_cnt", IntegerType(), True) \
        .add("order_amt", IntegerType(), True)
    df_with_schema = spark.read.format("csv") \
        .option("header", True) \
        .schema(schema) \
        .load("<your file path in spark image, such as /opt/spark/work-dir/data.csv>")
    df_with_schema.write.mode("append").format("jdbc") \
        .option("url", "<JDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \
        .option("dbtable", "<your target table name>") \
        .option("user", "<your accessKeyId>") \
        .option("password", "<your accessKeySecret>") \
        .option("driver", "org.postgresql.Driver") \
        .save()

Submit spark job

pythonスクリプトをAzure blob storageに置くには、この手順に従って配置する必要があります。

docs.microsoft.com

コマンドは次の通りです。

# Create storage account
$ az storage account create --resource-group <above resource group name> --name <storage account name> --sku Standard_LRS
# Export Azure storage connection string
$ export AZURE_STORAGE_CONNECTION_STRING=`az storage account show-connection-string --resource-group <above resource group name> --name <above storage account name> -o tsv`
# Create storage container
$ az storage container create --name <container name>
# Set container permission as public
$ az storage container set-permission --name <above container name> --public-access blob
# Upload python script into container
$ az storage blob upload --container-name <above container name> --file <local file path> --name <file name in container>
# Get access url for uploaded python script
$ scriptUrl=$(az storage blob url --container-name <above container name> --name <above file name in container> | tr -d '"')

f:id:sbc_ohara:20210909084538p:plain

ここではSpark Jobをsubmitする方法を説明します。
kubectl proxyを実行して、kube-proxyを別のコマンドラインで起動します。

docs.microsoft.com

f:id:sbc_ohara:20210909084552p:plain

先ほど前のコマンドラインに戻り、以下のようにサービスアカウントとクラスターを作成します。

$ kubectl create serviceaccount spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default

f:id:sbc_ohara:20210909084605p:plain

PySparkタスクをAKS (Azure Kubernetes Service) クラスターにSubmit します。必要に応じて、image 情報とFile pathを更新します。

./bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name hologres-spark \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.kubernetes.container.image=bwbw723/spark-k8s-hologres-demo:2.4.7 \
    --driver-class-path local:///opt/spark/work-dir/postgresql-42.2.6.jar \
    --jars local:///opt/spark/work-dir/postgresql-42.2.6.jar \
    <above scriptUrl>

f:id:sbc_ohara:20210909084617p:plain

kubectl get podsで実際のPod情報を確認します。

f:id:sbc_ohara:20210909084629p:plain

Hologresテーブルのレコードをチェックします。

f:id:sbc_ohara:20210909084641p:plain

PySparkでspark streaming jobを準備する

Alibaba Cloudが提供するSpark SDK を使用すると、LogServiceでReceiver(受信モード)もしくはDirect(直接モード)でログデータを受け取りすることが出来ます。関連情報およびソースとして Git aliyun-emapreduce-datasources および、Git aliyun-emapreduce-demo にて詳しい説明があります。

www.alibabacloud.com

Spark SDKを使って、DataStreamReaderはログサービスを loghub フォーマットとして認識することができました。 Pythonの使い方については、こちらを参照してください。

github.com

以下のスクリプトでholres_spark_streaming.pyを作成します。

import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType


def foreach_batch_function(df, epoch_id):
    df.write.mode("append").format("jdbc") \
        .option("url", "<public JDBC connection url of your instance, e.g. jdbc:postgresql://<endpoint>:<port>/<database name>") \
        .option("dbtable", "<your target table name>") \
        .option("user", "<your accessKeyId>") \
        .option("password", "<your accessKeySecret>") \
        .option("driver", "org.postgresql.Driver") \
        .save()
    pass


if __name__ == '__main__':
    spark = SparkSession.builder.appName('HologresSparkStreamingDemo').getOrCreate()
    schema = StructType() \
        .add("order_id", StringType(), True) \
        .add("user_id", IntegerType(), True) \
        .add("book_id", IntegerType(), True) \
        .add("book_name", StringType(), True) \
        .add("order_cnt", IntegerType(), True) \
        .add("order_amt", IntegerType(), True)
    line_with_schema = spark.readStream.format("loghub") \
        .schema(schema) \
        .option("sls.project", "<your log service project name>") \
        .option("sls.store", "<your log service logstore name>") \
        .option("access.key.id", "<your accessKeyId>") \
        .option("access.key.secret", "<your accessKeySecret>") \
        .option("endpoint", "<your endpoint such as ap-northeast-1.log.aliyuncs.com>") \
        .option("startingoffsets", "latest") \
        .load()
    stream = line_with_schema.writeStream.foreachBatch(foreach_batch_function).start()
    stream.awaitTermination()

データ転送のためにAlibaba Cloud Log Serviceのリソース準備

Alibaba Cloudコンソールに入り、Log ServiceプロジェクトとLogStoreを準備します。LogStoreを作成するときは、WebTrackingが有効になっていることを確認します。

f:id:sbc_ohara:20210909084658p:plain f:id:sbc_ohara:20210909084707p:plain f:id:sbc_ohara:20210909084717p:plain f:id:sbc_ohara:20210909084726p:plain

logstore dataを入力し、インデックス属性を有効にします。 

f:id:sbc_ohara:20210909084739p:plain f:id:sbc_ohara:20210909084746p:plain f:id:sbc_ohara:20210909084755p:plain f:id:sbc_ohara:20210909084803p:plain

Spark streaming jobの実行

まずはHologresのテーブルデータをDELETEらSQLクエリでクリアします。

f:id:sbc_ohara:20210909084816p:plain

Azure CLI (az storage blob upload ...) またはAzure Portalで、事前に作成したSpark streamingスクリプトをAzure blobストレージにアップロードします。

f:id:sbc_ohara:20210909084829p:plain

アップロードされたスクリプトのアクセスURLを取得します。

f:id:sbc_ohara:20210909084841p:plain

作業環境のコマンドラインに戻り、AKS(Azure Kubernetes Service)クラスタにSpark streaming Taskを投入します。

./bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name hologres-spark-streaming \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.kubernetes.container.image=bwbw723/spark-k8s-hologres-demo:2.4.7 \
    --driver-class-path local:///opt/spark/work-dir/postgresql-42.2.6.jar \
    --jars local:///opt/spark/work-dir/postgresql-42.2.6.jar,local:///opt/spark/work-dir/fastjson-1.2.45.jar,local:///opt/spark/work-dir/commons-validator-1.4.0.jar,local:///opt/spark/work-dir/ezmorph-1.0.6.jar,local:///opt/spark/work-dir/emr-logservice_2.11-2.2.0.jar,local:///opt/spark/work-dir/loghub-client-lib-0.6.13.jar,local:///opt/spark/work-dir/aliyun-log-0.6.10.jar,local:///opt/spark/work-dir/json-lib-2.4-jdk15.jar,local:///opt/spark/work-dir/zkclient-0.10.jar,local:///opt/spark/work-dir/emr-common_2.11-2.2.0.jar \
    <access url of your spark streaming script>

f:id:sbc_ohara:20210909084856p:plain

kubectl get pods で realted pod 情報を確認します。

f:id:sbc_ohara:20210909084909p:plain

Spark streaming job の実行中に、以下のスクリプトで新しいLog Serviceのレコードを送信し、関連するログを確認します。

import random
import requests
import uuid

project = "<your log service project name>"
endpoint = "<your endpoint such as ap-northeast-1.log.aliyuncs.com>"
logstore = "<your log service logstore name>"
topic = "<your logs topic>"
# urlencoded book name in Japanese
book_info = ["%E5%AB%8C%E3%82%8F%E3%82%8C%E3%82%8B%E5%8B%87%E6%B0%97",
             "%E3%83%8E%E3%83%AB%E3%82%A6%E3%82%A7%E3%82%A4%E3%81%AE%E6%A3%AE",
             "%E6%B5%B7%E8%BE%BA%E3%81%AE%E3%82%AB%E3%83%95%E3%82%AB",
             "%E8%89%B2%E5%BD%A9%E3%82%92%E6%8C%81%E3%81%9F%E3%81%AA%E3%81%84%E5%A4%9A%E5%B4%8E%E3%81%A4%E3%81%8F%E3%82%8B%E3%81%A8%E5%BD%BC%E3%81%AE%E5%B7%A1%E7%A4%BC%E3%81%AE%E5%B9%B4",
             "%E5%AE%B9%E7%96%91%E8%80%85%E2%85%A9%E3%81%AE%E7%8C%AE%E8%BA%AB", "%E4%BA%BA%E9%96%93%E5%A4%B1%E6%A0%BC",
             "%E3%81%93%E3%81%93%E3%82%8D", "%E5%A4%A9%E5%A3%B0%E4%BA%BA%E8%AF%AD",
             "%E5%B9%B8%E3%81%9B%E3%81%AB%E3%81%AA%E3%82%8B%E5%8B%87%E6%B0%97"]
book_price = [500, 600, 700, 300, 200, 100, 350, 550, 650]
url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&order_id={4}&user_id={5}&book_id={6}&book_name={7}&order_cnt={8}&order_amt={9}'
for i in range(5):
    book_id = random.randint(1, 9)
    order_count = random.randint(1, 5)
    res = requests.get(
        url.format(project, endpoint, logstore, topic, uuid.uuid1(), random.randint(1, 50), book_id,
                   book_info[book_id - 1], order_count,
                   book_price[book_id - 1] * order_count))
    print(res)

f:id:sbc_ohara:20210909084924p:plain

今度はHologresでレコードを確認します。

f:id:sbc_ohara:20210909084935p:plain

最後に

ここまで、Azureのk8s(AKS)にApache Sparkを配置し、Hologresへデータ連携する方法を紹介しました。
この方法を生かすことで、Alibaba Cloudでない、他社クラウドサービスからApache Spark on k8sを使ってk8sのサービス基盤からリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。