【Hologres連載】Kubernetes(k8s)上にApache Flinkを配置し、Hologresへリアルタイムデータ連携する方法

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している Hologres を使って、AWSのKubernetesサービスであるECSからApache Flinkによるマルチクラウドーリアルタイムデータ連携する方法をご紹介します。

f:id:sbc_ohara:20210316010930p:plain

Hologresとは

Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。

www.sbcloud.co.jp

少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

Apache Flinkとは

Apache Flink は、大規模データを分散ストリームおよびバッチデータ処理のためのオープンソースプラットフォームです。Apache Sparkと同様、複数のプログラム言語でハイレベルなAPIを提供しており、ビッグデータソリューションのストリーミング分野で人気があります。

flink.apache.org

Kubernetes(k8s)とは

Kubernetes は、K8sとも呼ばれ、コンテナ化されたアプリケーションのデプロイ、スケーリング、管理を自動化するためのオープンソースのシステムで、アプリケーションを構成するコンテナを論理的な単位にまとめ、管理と発見を容易にします。

kubernetes.io

Kubernetes上でApache Flinkを使ってHologresでデータ処理をする

このガイドラインでは、Kubernetes上のApache Flinkを使って、Hologresによるデータ処理を段階的に作成します。Flinkの公式イメージはACK(Alibaba Cloud Container Service for Kubernetes)にデプロイされます。

f:id:sbc_ohara:20210910094411p:plain

このチュートリアルについて

対象者:

本ガイドラインは、以下のような方を対象としています
- git、docker、Kubernetes,、Alibaba Cloud、Hologres,、ACK(Alibaba Cloud Container Service for Kubernetes)に関する基本的な知識を持っている
- 基本的なデプロイメントの知識があり、Java実行ができる

前提条件:

  • Alibaba Cloudのアカウントを所持している
  • Alibaba Cloud HologresとACK(Alibaba Cloud Container Service for Kubernetes)が使用可能な状態になっている
  • 少なくとも1つのHologresインスタンスを持っている
  • 作業環境にmaven、Java、gitが用意されている

Kubernetesクラスタの準備

ACK(Alibaba Cloud Container Service for Kubernetes)にアクセスし、Flinkデプロイ用のサーバーレスKubernetesクラスタを作成します。

f:id:sbc_ohara:20210910095044p:plain f:id:sbc_ohara:20210910095053p:plain

新しいサーバーレスKubernetesクラスターの作成には、約3分ほどかかります。作成ログを確認しながら、クラスターの準備が整うまで待機します。

f:id:sbc_ohara:20210910095106p:plain f:id:sbc_ohara:20210910095114p:plain f:id:sbc_ohara:20210910095123p:plain

スタンドアロンのFlinkクラスターをKubernetes上にデプロイ

Flink Session clusterは、長時間実行されるKubernetes Deploymentとして実行されます。1つのSessionクラスター上で複数のFlinkジョブを実行することができます。各ジョブはクラスターがデプロイされた後にクラスターにサブミットする必要があります。
KubernetesにおけるFlink Sessionクラスターのデプロイには、少なくとも3つのコンポーネントがあります。
- JobManagerを実行するデプロイメント
- TaskManagersのプールのためのデプロイメント
- JobManagerのRESTおよびUIポートを公開するサービス

以下のステップでは、これらを一つずつ準備していきます。より詳細な情報については、Flink official document を参照してください。yaml設定ファイルの記載方法は、Flink official example configuration yamlにあります。

ci.apache.org

ci.apache.org

ConfigMap の新規作成

Kubernetes クラスタ管理画面に移動し、ConfigMap メニューから以下の yaml をベースにした ConfigMap を新規作成します。

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

f:id:sbc_ohara:20210910095147p:plain f:id:sbc_ohara:20210910095156p:plain f:id:sbc_ohara:20210910095205p:plain f:id:sbc_ohara:20210910095214p:plain f:id:sbc_ohara:20210910095224p:plain

job managerのデプロイ

Kubernetesのクラスタ管理画面に移動し、Deploymentメニューから以下のyamlに基づいてjob managerとして新しいDeploymentを作成します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

f:id:sbc_ohara:20210910095242p:plain f:id:sbc_ohara:20210910095250p:plain f:id:sbc_ohara:20210910095259p:plain f:id:sbc_ohara:20210910095307p:plain f:id:sbc_ohara:20210910095316p:plain

task managerのデプロイ

同じプロセスの下で、以下の yaml に基づいてTask managerとして新しいDeploymentを作成します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

f:id:sbc_ohara:20210910095331p:plain f:id:sbc_ohara:20210910095340p:plain f:id:sbc_ohara:20210910095348p:plain f:id:sbc_ohara:20210910095359p:plain

これにより、Kubernetesクラスタ上にFlink job managerとtask managerがデプロイされました。これらを元に、関連するサービスを作成する必要があります。

job manager UI サービスの作成

Kubernetes クラスタ管理画面の「サービス」メニューから、以下の yaml に基づいて、job manager UI サービスを作成します。これは非HAモードの場合のみ必要で、job manager のWeb UIポートを公開します。

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

f:id:sbc_ohara:20210910095417p:plain f:id:sbc_ohara:20210910095426p:plain f:id:sbc_ohara:20210910095434p:plain f:id:sbc_ohara:20210910095442p:plain

job manager restサービスの作成

同じプロセスの下で、以下の yaml に基づいて job manager rest サービスとして新しいサービスを作成します。 job manager用の rest ポートを公開します。

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

f:id:sbc_ohara:20210910095458p:plain f:id:sbc_ohara:20210910095507p:plain f:id:sbc_ohara:20210910095516p:plain

task manager queryステートサービスの作成

同じプロセスの下で、以下のyamlに基づいて、task manager queryステートサービスとして新しいサービスを作成します。照会可能な状態にアクセスするためのtask managerのポートを公開します。

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

f:id:sbc_ohara:20210910095533p:plain f:id:sbc_ohara:20210910095542p:plain f:id:sbc_ohara:20210910095552p:plain

[オプション] job managerの公開用アクセスUIポートの設定

インターネット経由でサービスにアクセスする場合は、 job managerのUIポートで公開用の別のサービスが必要です。イントラネット経由でジョブマネージャUIサービスにアクセスする場合、このステップは無視してください。
サーバー管理ページの作成ボタンをクリックし、SLB (Server Load Balancer) を利用した公開用サービスを構築します。

f:id:sbc_ohara:20210910095605p:plain f:id:sbc_ohara:20210910095614p:plain f:id:sbc_ohara:20210910095622p:plain

データを生成してHologresに送信するタスクを準備

Alibaba Cloudでは、Flink用のHologresコネクタが提供されており、Github hologres-flink-examplesでサンプルコードも提供されています。このサンプルコードを使って、データを生成し、Hologresに送信するタスクを作成します。

github.com

まず、コマンドラインで git clone https://github.com/hologres/hologres-flink-examples.git でプロジェクトをクローンします。
サンプルコードではユーザの入力からHologresの接続情報を取得していますが、Kubernetesクラスタでは環境変数を使用する必要があります。
/src/main/java/io/hologres/flink/example/HologresSinkExample.java` を以下のように更新します。

......
    public static void main(String[] args) throws Exception {
        /*Options options = new Options();
        options.addOption("e", "endpoint", true, "Hologres endpoint");
        options.addOption("u", "username", true, "Username");
        options.addOption("p", "password", true, "Password");
        options.addOption("d", "database", true, "Database");
        options.addOption("t", "tablename", true, "Table name");

        CommandLineParser parser = new DefaultParser();
        CommandLine commandLine = parser.parse(options, args);
        String endPoint = commandLine.getOptionValue("endpoint");
        String userName = commandLine.getOptionValue("username");
        String password = commandLine.getOptionValue("password");
        String database = commandLine.getOptionValue("database");
        String tableName = commandLine.getOptionValue("tablename");*/

        String database = System.getenv("HOLO_TEST_DB");
        String userName = System.getenv("HOLO_ACCESS_ID");
        String password = System.getenv("HOLO_ACCESS_KEY");
        String endPoint = System.getenv("HOLO_ENDPOINT");
        String tableName = System.getenv("HOLO_TABLE_NAME");
......

f:id:sbc_ohara:20210910095656p:plain

ビルドプロセス中にエラーメッセージが表示された場合は、pom.xml内のholo-clientのバージョンを更新します。正しいバージョンは、maven repository から入手できます。

mvnrepository.com

ビルドが完了すると、<project root>/target/hologress-flink-examples-1.0.0-jar-with-dependencies が入手出来ますので、これを利用します。

Hologresテーブルの作成

ソースコードによれば、生成されるテストデータは以下のスキーマ構成になっています。そのため、送信データを保存するには、同じスキーマ配下に新しいHologresテーブルを作成する必要があります。

......
        TableSchema schema = TableSchema.builder()
            .field("user_id", DataTypes.BIGINT())
            .field("user_name", DataTypes.STRING())
            .field("item_id", DataTypes.BIGINT())
            .field("item_name", DataTypes.STRING())
            .field("price", DataTypes.DECIMAL(38, 2))
            .field("province", DataTypes.STRING())
            .field("city", DataTypes.STRING())
            .field("longitude", DataTypes.STRING())
            .field("latitude", DataTypes.STRING())
            .field("ip", DataTypes.STRING())
            .field("sale_timestamp", Types.SQL_TIMESTAMP).build();
......

f:id:sbc_ohara:20210910095711p:plain

Hologres接続情報のための新しいSecretsの作成

更新されたソースコードによると、スクリプトは環境変数からHologres接続情報を取得します。そのため、新しいSecretsを作成し、Flinkクラスタに設定する必要があります。

f:id:sbc_ohara:20210910095725p:plain f:id:sbc_ohara:20210910095735p:plain

Hologres 接続情報のための有効な環境変数

job manager、task managerともに編集ボタンをクリックし、作成したSecretsに基づいて環境変数を追加します。Variable Keyが更新されたソースコードと一致することを確認します。

f:id:sbc_ohara:20210910095752p:plain f:id:sbc_ohara:20210910095801p:plain

job manager、task managerの両方を再デプロイして、更新された環境変数をアクティベーションします。

f:id:sbc_ohara:20210910095815p:plain

UIサービスからのタスク投入

外部のエンドポイントからjob managerのUIサービスにアクセスし、ビルドしたJARを使ってタスクを投入します。

f:id:sbc_ohara:20210910095828p:plain

新しいタスクが作成され、生成されたデータが対象のHologresテーブルに送信されます(runningステータスになります)

f:id:sbc_ohara:20210910095841p:plain f:id:sbc_ohara:20210910095850p:plain f:id:sbc_ohara:20210910095858p:plain f:id:sbc_ohara:20210910095906p:plain

実行中のタスクを停止したい場合は、「cancel job」リンクをクリックします。

f:id:sbc_ohara:20210910095920p:plain f:id:sbc_ohara:20210910095929p:plain f:id:sbc_ohara:20210910095937p:plain

最後に

ここまで、Apache FlinkからHologresへリアルタイムデータ連携する方法を紹介しました。
この方法を生かすことで、例えばAWSやGCPなどのKubernetes(k8s)を使ったサービス基盤からApache FlinkでリアルタイムでHologresへ連携、Hologresでリアルタイム可視化を実現することができます。

f:id:sbc_ohara:20210910104701p:plain