【ClickHouse連載】Apache FlinkからClickHouseへデータをリアルタイム格納してみる

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している ClickHouse を使って、Apache FlinkからClickHouseへデータ連携する方法をご紹介します。

f:id:sbc_ohara:20210714233458p:plain

ClickHouseとは

ClickHouseは非集計データを含む大量のデータを安定的かつ継続しながら集計といったリアルタイム分析を支える列指向の分散型データベースサービスです。 トラフィック分析、広告およびマーケティング分析、行動分析、リアルタイム監視などのビジネスシナリオで幅広く 使用されています。
ApsaraDB for ClickHouseの概要として、詳しいことはこの記事にてまとめています。

www.sbcloud.co.jp

clickhouse.tech

Apache Flinkとは

オープンソースの分散ストリーム処理プラットフォームです。

flink.apache.org

本記事では、Apache FlinkからClickHouseへデータをリアルタイム格納してみます。構成図で次の通りです。

f:id:sbc_ohara:20210824161648p:plain

1.ClickHouseClientの準備

1-1.ClickHouseインスタンスを準備します

この手法は過去の記事でも記載していますが、再掲として記載します。

www.sbcloud.co.jp

1)まずはApsaraDB for ClickHouseインスタンスを作成します。
①VPCを作成

f:id:sbc_ohara:20210716155036p:plain

f:id:sbc_ohara:20210716155113p:plain

②ClickHouseインスタンスを作成
著者は以下のインスタンススペックでインスタンスを作成しています。

ClickHouse version:20.8.7.15
Edition:Single-replica Edition

f:id:sbc_ohara:20210716155212p:plain

f:id:sbc_ohara:20210716155220p:plain

f:id:sbc_ohara:20210716155231p:plain

2)ClickHouseの登録アカウントを作成 インスタンスをクリックし、左側にアカウント管理画面で、アカウントを作成します

f:id:sbc_ohara:20210716155256p:plain

3)ClickHouseクラスターにDMSで接続 ①ClickHouseのインスタンスをクリックし、トップメニューの「Log On to Database」をクリックします

f:id:sbc_ohara:20210716155315p:plain

② DBアカウントとパスワードを入力し、ClickHouseへログイン

f:id:sbc_ohara:20210716155334p:plain

③DMS画面でClickHouseのインスタンスが表示されます

f:id:sbc_ohara:20210716155401p:plain

2. Apache Flinkプロジェクトを作成

2-1.IntelliJ IDEAをインストールします。(具体的な説明は本記事では省略)

1)IntelliJ IDEAを起動します

f:id:sbc_ohara:20210824161929p:plain

1)フォルダを作成します

f:id:sbc_ohara:20210824161953p:plain

2)Intellij ideaでプロジェクトを作成します
前提条件として、Java1.8がインストールされていることが必須です

①File‐New‐Projectをクリックします

f:id:sbc_ohara:20210824162208p:plain

②Mavenを選択します

f:id:sbc_ohara:20210824195607p:plain

③フォルダを選択します

f:id:sbc_ohara:20210824195621p:plain

④Finishをクリックします

f:id:sbc_ohara:20210824195633p:plain

⑤Projectが作成されたことを確認します

f:id:sbc_ohara:20210824195648p:plain

3)プロジェクトを編集します
ディレクトリ構造

./pom.xml
./src/main/java/com/nancy/flink/ck/FlinkCkSinkSample.java
./src/main/resources/access_log_csv.txt

①pom.xmlを編集します

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.12.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.12.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.12.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>ru.ivi.opensource</groupId>
            <artifactId>flink-clickhouse-sink</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.nancy.flink.ck.FlinkCkSinkSample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

f:id:sbc_ohara:20210824195725p:plain

② FlinkCkSinkSample.javaを編集します

package com.nancy.flink.ck;

import org.apache.commons.io.IOUtils;
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
        import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
        import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;

        import java.io.InputStream;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
public class FlinkCkSinkSample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildClickHouseGlobalSinkParams(env);
        InputStream is = FlinkWithCkSinkSample.class.getClassLoader().getResourceAsStream("access_log_csv.txt");
        List<String> userRecList = IOUtils.readLines(is, "UTF-8");
        DataStream<String> userDataStream = env.fromCollection(userRecList);
        Properties props = new Properties();
        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "flink_table_local");
        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
        userDataStream.map(FlinkWithCkSinkSample::toClickHouseInsertFormatFields)
                .name("convert raw record to ClickHouse table format")
                .addSink(new ClickHouseSink(props))
                .name("flink_table_distributed");
        env.execute();
    }

    // CK: d_sink_table(`id` Int32, `user_name` String, `age` Int32, `city` String, `access_url` String)
    // RAW Record: 190|30|M|administrator|95938
    public static String toClickHouseInsertFormatFields(String userRowRecord) {
        String[] fields = userRowRecord.split("\\|");
        System.out.println(fields.length);
        String rec = String.format("(%s, '%s', %s, '%s', '%s')", fields[0], fields[1], fields[2], fields[3], fields[4]);
        System.out.println(rec);
        return rec;
    }

    private static void buildClickHouseGlobalSinkParams(StreamExecutionEnvironment environment) {
        Map<String, String> globalParameters = new HashMap<>();

        // ClickHouse cluster properties
        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "cc-0iw4v4hezq9lw9333o.ads.aliyuncs.com:8123");
        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, "sbtest");
        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, "Test1234");

        // sink common
        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "5");

        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, System.getProperty("user.home"));
        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "3");
        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "5");
        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "1024");
        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");

        // set global paramaters
        ParameterTool parameters = ParameterTool.fromMap(globalParameters);
        environment.getConfig().setGlobalJobParameters(parameters);
    }
}

packageを作成します

f:id:sbc_ohara:20210824195753p:plain

FlinkCkSinkSample.javaを作成します

f:id:sbc_ohara:20210824195808p:plain

f:id:sbc_ohara:20210824195816p:plain

f:id:sbc_ohara:20210824195825p:plain

③データソースを用意します

f:id:sbc_ohara:20210824195838p:plain

4)プロジェクトをコンパイルし、Jarをパッケージ化します
①プロジェクトを選択し、右クリックメニューからビルドします

f:id:sbc_ohara:20210824195901p:plain

f:id:sbc_ohara:20210824195910p:plain

②下記コマンドでパッケージをします

mvn clean compile package

f:id:sbc_ohara:20210824195922p:plain

f:id:sbc_ohara:20210824195931p:plain

パッケージを確認します

f:id:sbc_ohara:20210824195946p:plain

3. ECS上のLinuxでApache Flink環境の準備

3-1.Java1.8をインストールします

1)jdk1.8を用意します
①jdk-8u144-linux-x64.tar.gzをダウンロードします

(下記のリンクからでもjdkがダウンロードすることができます)

www.oracle.com

f:id:sbc_ohara:20210824200042p:plain

②jdk-8u144-linux-x64.tar.gzをjavaフォルダに解凍します

# mkdir /usr/local/java/
# tar -zxvf jdk-8u144-linux-x64.tar.gz -C /usr/local/java/

2)Java環境を設定します
①Java環境パスを設定します

# vim /etc/profile
    export JAVA_HOME=/usr/local/java/jdk1.8.0_144
    export PATH=${JAVA_HOME}/bin:$PATH
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export JRE_HOME=$JAVA_HOME/jre

f:id:sbc_ohara:20210824200132p:plain

②Java環境パスを有効します

# source /etc/profile
# ln -s /usr/local/java/jdk1.8.0_144/bin/java /usr/bin/java

f:id:sbc_ohara:20210824200148p:plain

③Javaバージョンを確認します

java -version

f:id:sbc_ohara:20210824200203p:plain

3-2. Apache Flink1.12をインストールします

①下記コマンドでApache Flink1.12をダウンロードします

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz

f:id:sbc_ohara:20210824200228p:plain

②Apache Flink1.12を解凍します

f:id:sbc_ohara:20210824200242p:plain

③下記コマンドでApache Flink1.12を起動します

# cd flink-1.12.5
# ./bin/start-cluster.sh

f:id:sbc_ohara:20210824200307p:plain

4.Linux - Apache FlinkでtxtファイルデータをClickHouseへ格納

4-1.ClickHouseでターゲットテーブルを作成します

1)DMSでClickhouseを接続します

f:id:sbc_ohara:20210824200341p:plain

f:id:sbc_ohara:20210824200349p:plain

f:id:sbc_ohara:20210824200359p:plain

2)ClickHouseでデフォルトDBにローカルテーブルを作成します

create table flink_table_local on cluster default(
   id Int32,
   user_name String,
   age Int32,
   city String,
   access_url String
)
engine = MergeTree()
order by id;

f:id:sbc_ohara:20210824200438p:plain

3)ClickHouseでデフォルトDBに分散テーブルを作成します

create table flink_table_distributed on cluster default(
   id Int32,
   user_name String,
   age Int32,
   city String,
   access_url String
)
engine = Distributed(default, default, flink_table_local, id);

f:id:sbc_ohara:20210824200454p:plain

4-2.Linux - Apache FlinkでtxtファイルデータをClickHouseへ格納します

1)rzコマンドでJarパッケージをECSにアップロードします
rzコマンドがインストールされていない場合、下記コマンドでインストールします

yum -y install lrzsz

f:id:sbc_ohara:20210824200531p:plain

無事アップロードされたことを確認します

f:id:sbc_ohara:20210824200601p:plain

2)下記コマンドでJarを実行します

# ./bin/flink run flink-clickhouse-sink-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 2de615f64f920ccd47151f3839b4384e
Program execution finished
Job with JobID 2de615f64f920ccd47151f3839b4384e has finished.
Job Runtime: 1274 ms

f:id:sbc_ohara:20210824200611p:plain

4-3.DMSでClickHouseのデータを確認します

1)DMSでtxtデータをClickHouseにインポートすることを確認します
①分散テーブルを検索します

SELECT * FROM flink_table_distributed;
SELECT COUNT(*) FROM flink_table_distributed;

f:id:sbc_ohara:20210824200626p:plain

f:id:sbc_ohara:20210824200635p:plain

②ローカルテーブルを検索します

SELECT * FROM flink_table_local;
SELECT COUNT(*)  FROM flink_table_local;

f:id:sbc_ohara:20210824200651p:plain

f:id:sbc_ohara:20210824200659p:plain


最後に

ここまで、Apache Flink - ApsaraDB for ClickHouseへデータ連携する方法を紹介しました。
ApsaraDB for ClickHouseはApache Flinkとスムーズに連携できるので、例えば、Apache FlinkやRealtime ComputeなどからApsaraDB for ClickHouseへリアルタイムデータ連携しつつ、ClickHouseでリアルタイム可視化、といったソリューションを構築することもできます。

Special Thanks, Nancy