【ClickHouse連載】Apache kafka(Message Queue for Apache Kafka)からClickHouseへデータをリアルタイム格納してみる

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している ClickHouse を使って、Apache kafka(Message Queue for Apache Kafka)からClickHouseへデータを連携、格納する方法をご紹介します。

f:id:sbc_ohara:20210714233458p:plain

ClickHouseとは

ClickHouseは非集計データを含む大量のデータを安定的かつ継続しながら集計といったリアルタイム分析を支える列指向の分散型データベースサービスです。 トラフィック分析、広告およびマーケティング分析、行動分析、リアルタイム監視などのビジネスシナリオで幅広く 使用されています。
ApsaraDB for ClickHouseの概要として、詳しいことはこの記事にてまとめています。

www.sbcloud.co.jp

clickhouse.tech

Apache kafka(Message Queue for Apache Kafka)とは

Apache kafkaはスケーラビリティに優れた分散メッセージングシステムです。
これは元々Apacheオープンソースとして展開されていましたが、これをAlibabaによりフルマネージド型サービスとして登場したのがMessage Queue for Apache Kafkaです。

www.alibabacloud.com

1.ClickHouseの準備

1-1.ClickHouseインスタンスを準備します

この手法は過去の記事でも記載していますが、再掲として記載します。

www.sbcloud.co.jp

1)まずはApsaraDB for ClickHouseインスタンスを作成します。
①VPCを作成

f:id:sbc_ohara:20210716155036p:plain

f:id:sbc_ohara:20210716155113p:plain

②ClickHouseインスタンスを作成
著者は以下のインスタンススペックでインスタンスを作成しています。

ClickHouse version:20.8.7.15
Edition:Single-replica Edition

f:id:sbc_ohara:20210716155212p:plain

f:id:sbc_ohara:20210716155220p:plain

f:id:sbc_ohara:20210716155231p:plain

2)ClickHouseの登録アカウントを作成 インスタンスをクリックし、左側にアカウント管理画面で、アカウントを作成します

f:id:sbc_ohara:20210716155256p:plain

3)ClickHouseクラスターにDMSで接続 ①ClickHouseのインスタンスをクリックし、トップメニューの「Log On to Database」をクリックします

f:id:sbc_ohara:20210716155315p:plain

② DBアカウントとパスワードを入力し、ClickHouseへログイン

f:id:sbc_ohara:20210716155334p:plain

③DMS画面でClickHouseのインスタンスが表示されます

f:id:sbc_ohara:20210716155401p:plain

2.Apache kafka(Message Queue for Apache Kafka)の準備

2-1.Kafkaインスタンスを作成します

1)AlibabaCloudのサイトをログインし、Message Queue for Apache Kafka を選択します

f:id:sbc_ohara:20210817012125p:plain

f:id:sbc_ohara:20210817012134p:plain

2)Kafkaインスタンスを作成します

f:id:sbc_ohara:20210817012149p:plain

f:id:sbc_ohara:20210817012158p:plain

f:id:sbc_ohara:20210817012205p:plain

f:id:sbc_ohara:20210817012214p:plain

3)Kafkaをデプロイします

f:id:sbc_ohara:20210817012229p:plain

f:id:sbc_ohara:20210817012237p:plain

f:id:sbc_ohara:20210817012246p:plain

f:id:sbc_ohara:20210817012254p:plain

4)Topicを作成します
①Kafka詳細画面またはTopic画面に、Topic作成をクリックします

f:id:sbc_ohara:20210817012312p:plain

②Topic情報を入力し、Topicを作成します

f:id:sbc_ohara:20210817012324p:plain

③Topicを作成しました

f:id:sbc_ohara:20210817012340p:plain

5)Consumer Groupを作成します
①Consumer Group画面にConsumer Group作成をクリックします

f:id:sbc_ohara:20210817012419p:plain

②Consumer Group情報を入力し、Consumer Groupを作成します

f:id:sbc_ohara:20210817012431p:plain

③Consumer Groupを作成しました

f:id:sbc_ohara:20210817012454p:plain

2-2.IntelliJ IDEAを使ってJava SDKでKafkaデータを作成します

1)JavaProjectを作成します

①IntelliJ IDEAを開き、ファイルメニューからプロジェクトをクリックします

f:id:sbc_ohara:20210817012514p:plain

②Mavenを選択します

f:id:sbc_ohara:20210817012525p:plain

③プロジェクト名を設定します

f:id:sbc_ohara:20210817012536p:plain

④プロジェクト名とパスを設定します(事前にフォルダを作成する必要があります)

f:id:sbc_ohara:20210817012548p:plain

⑤プロジェクトを作成します

f:id:sbc_ohara:20210817012619p:plain

⑥Project Encodingを設定します

f:id:sbc_ohara:20210817012636p:plain

f:id:sbc_ohara:20210817012644p:plain

2)Kafkaデータを生成します
①下記Java依存関係ライブラリをpom.xmlに追加します

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.3</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>

f:id:sbc_ohara:20210817012700p:plain

②log4j.propertiesコンフィグファイルを作成します

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, STDOUT

log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n

f:id:sbc_ohara:20210817012717p:plain

③kafka.propertiesを作成します

## アクセスポイントを設定することで、コンソール画面の詳細画面にデフォルトアクセスポイントが表示されます    
bootstrap.servers=172.16.0.84:9092,172.16.0.83:9092,172.16.0.82:9092

## Topicを設定することで、コンソール画面にTopicが作成されます   
topic=topic_ck_new

## Consumer Groupを設定することで、コンソール画面にConsumer Groupが作成されます     
group.id=group_ck

f:id:sbc_ohara:20210817012737p:plain

④JavaKafkaConfigurer.javaを作成します

import java.util.Properties;

public class JavaKafkaConfigurer {
    private static Properties properties;
    public synchronized static Properties getKafkaProperties() {
        if (null != properties) {
            return properties;
        }
        //kafka.propertiesの内容を取得
        Properties kafkaProperties = new Properties();
        try {
            kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            //ファイルをロード失敗しました、Quite可能
            e.printStackTrace();
        }
        properties = kafkaProperties;
        return kafkaProperties;
    }
}

f:id:sbc_ohara:20210817012912p:plain

⑤KafkaProducerDemo.javaを作成します

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerDemo {

    public static void main(String args[]) {
        //kafka.propertiesをロードします     
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //アクセスポイントを設定、コンソール画面に該当Topicのアクセスポイントを取得します     
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //Kafkaメッセージのシリアル化方法。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //リクエストの最大待機時間。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //内部クライアントの再試行回数を設定します。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //クライアントの内部再試行間隔を設定します。
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        //Producerオブジェクトを作成します,このオブジェクトはスレッドセーフであることに注意してください。一般的に、プロセス内の1つのProducerオブジェクトで十分です。
        //パフォーマンスを向上させたい場合は、さらにいくつかのオブジェクトを作成できますが、多すぎないようにします。できれば5つ以下にします。
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //Kafkaメッセージを作成する。
        String topic = kafkaProperties.getProperty("topic"); //メッセージのTopic、コンソール画面で作成後、入力します     
        String value = "this is the message's value"; //メッセージ内容。

        try {
            //Futureオブジェクトをバッチ取得することでスピードアップできる。バッチ量は大きく設定しないことをご注意ください
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i =0; i < 100; i++) {
                //メッセージを発信、Futureオブジェクトを取得します     
                String msgBody = "{'index':" + i + ", 'content': '" + value + ": " + i + "'}"; //jsonastring
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, msgBody);//jasonasstring
                //ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                //Futureオブジェクトの結果を同時に取得します     
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //クライアント内部リトライ後、再度発信失敗、このエラーを対応する必要があります     
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}

f:id:sbc_ohara:20210817013000p:plain

f:id:sbc_ohara:20210817013009p:plain

⑥プロジェクトをコンパイルし、KafkaConsumerDemo.javaを実行します

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaConsumerDemo {

    public static void main(String args[]) {
        //kafka.propertiesをロードします     
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //アクセスポイントを設定、コンソール画面に該当Topicのアクセスポイントを取得します     
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //2つのポーリング間の最大許容間隔。
        //コンシューマーがこの値を超えるとハートビートを返さない、サーバーはコンシューマーが非ライブ状態であると判断し、サーバーはコンシューマーをコンシューマーグループから削除してリバランスをトリガーします。デフォルトは30秒です。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //毎回ポーリングの最大数。
        //この値は大きく設定しないように、ポーリングのデータが多すぎると次のポーリングの前に消費できない場合、SLBがトリガーされ、フリーズが発生します。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //メッセージを逆シリアル化する方法
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //コンシューマーインスタンスが属するコンシューマーグループは、コンソールアで作成後入力します。
        //同じグループに属するコンシューマーインスタンスは、コンシューマーメッセージをロードします
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        //メッセージオブジェクトを構築します。つまり、消費インスタンスを生成します
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        //コンシューマーグループがサブスクライブするトピックを設定する。複数のトピックをサブスクライブできます。
        //GROUP_ID_CONFIGが同じである場合は、サブスクライブされたトピックを同じに設定することをお勧めします。
        List<String> subscribedTopics =  new ArrayList<String>();
        //複数のトピックをサブスクライブする必要がある場合は、ここに追加してください。
        //各トピックは、予めコンソールで作成する必要があります
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic: topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        //リサイクルでメッセージを消費します
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                //次のポーリングの前にこのデータが消費される必要があり、合計時間はSESSION_TIMEOUT_MS_CONFIGを超えてはなりません。
                //メッセージを消費するために単独なスレッドプールを開くことをお勧めします。結果を非同期で返すこと
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                e.printStackTrace();
            }
        }
    }
}

f:id:sbc_ohara:20210817013051p:plain

⑦プロジェクトをコンパイルし、KafkaProducerDemo.javaを実行します

f:id:sbc_ohara:20210817013103p:plain

⑧プロジェクトをビルドします

f:id:sbc_ohara:20210817013115p:plain

f:id:sbc_ohara:20210817013124p:plain

⑧下記コマンドでJarパッケージを作成します

mvn package

f:id:sbc_ohara:20210817013141p:plain

f:id:sbc_ohara:20210817013151p:plain

2-3.Linuxでkafkaデータを生成します
①linuxをログインし、jarファイルをLinuxにアップロードします

f:id:sbc_ohara:20210817013206p:plain

② jarを実行し、Kafkaデータを生成します

# java -jar kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar

f:id:sbc_ohara:20210817013229p:plain

f:id:sbc_ohara:20210817013237p:plain

③Kafkaコンソール画面でデータを確認します

f:id:sbc_ohara:20210817013249p:plain

④JarパッケージでKafkaデータを生成します

# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaProducerDemo

f:id:sbc_ohara:20210817013302p:plain

f:id:sbc_ohara:20210817013313p:plain

⑤JarパッケージでKafkaデータを消費します

# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaConsumerDemo

f:id:sbc_ohara:20210817013335p:plain

f:id:sbc_ohara:20210817013344p:plain

3.KafkaデータをClickHouseにインポート

3-1.ClickHouseでテーブルを作成します

1)DMSでClickhouseへ接続します

f:id:sbc_ohara:20210817013414p:plain

f:id:sbc_ohara:20210817013424p:plain

f:id:sbc_ohara:20210817013432p:plain

2)データベースを作成します

create database if not exists kafka_clickhouse_demo ON CLUSTER default;  

f:id:sbc_ohara:20210817013446p:plain

3)ClickHouseでKafkaコンシューマーテーブルを作成します。ここではkafka_clickhouse_demoデータベースを使用します。

create table kafka_src_table(
`message` String
)  
ENGINE = Kafka()
SETTINGS kafka_broker_list = '172.16.0.84:9092,172.16.0.83:9092,172.16.0.82:9092',
         kafka_topic_list = 'topic_ck_new',
         kafka_group_name = 'group_ck', 
         kafka_format = 'JSONAsString';

f:id:sbc_ohara:20210817013544p:plain

※kafka_formatおよびパラメータがKafkaのデータソースと一致する必要があります
※Kafkaコンシューマーテーブル消費テーブルを結果テーブルとして直接使用することはできません。Kafka消費テーブルは、Kafkaデータを消費するためにのみ使用されます。そのため、kafka側で実際にすべてのデータを保存しているわけではありません。

4)ClickHouseでローカルテーブルを作成します

CREATE TABLE kafka_table_local(
    `message` String
)
ENGINE = MergeTree()
ORDER BY message;

f:id:sbc_ohara:20210817013639p:plain

5)Clickhouseの分散テーブルを作成します

CREATE TABLE kafka_table_distributed ON CLUSTER default AS kafka_clickhouse_demo.kafka_table_local
ENGINE = Distributed(default, kafka_clickhouse_demo,kafka_table_local, rand());

f:id:sbc_ohara:20210817013700p:plain

6)ClickHouseでMATERIALIZED VIEWテーブルを作成します

CREATE MATERIALIZED VIEW source_mv TO kafka_table_distributed AS
SELECT 
    `message` 
FROM kafka_src_table;

f:id:sbc_ohara:20210817013717p:plain

③テーブルを表示します

show tables;

f:id:sbc_ohara:20210817013734p:plain

3-2.kafkaでデータを生成します

1)下記コマンドでKafkaデータを生成します

# java -jar kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar

または

# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaProducerDemo

f:id:sbc_ohara:20210817013756p:plain

f:id:sbc_ohara:20210817013805p:plain

2)Kafkaコンソール画面でデータを確認します

f:id:sbc_ohara:20210817013820p:plain

3-3.Clickhouseでデータをインポートします

1)kafka_src_tableを確認します

SELECT * FROM `kafka_src_table` LIMIT 20;
SELECT COUNT( *) FROM kafka_src_table;

f:id:sbc_ohara:20210817013836p:plain

f:id:sbc_ohara:20210817013845p:plain

2)kafka_table_localを確認します

SELECT * FROM `kafka_table_local` LIMIT 20;
SELECT COUNT( *) FROM kafka_table_local;

f:id:sbc_ohara:20210817013903p:plain

f:id:sbc_ohara:20210817013911p:plain

3)kafka_table_distributedを確認します

SELECT * FROM `kafka_table_distributed` LIMIT 20;
SELECT COUNT( *) FROM kafka_table_distributed;

f:id:sbc_ohara:20210817013924p:plain

f:id:sbc_ohara:20210817013932p:plain

4)source_mvを確認します

SELECT * FROM `source_mv` LIMIT 20;
SELECT COUNT( *) FROM source_mv;

f:id:sbc_ohara:20210817013946p:plain

f:id:sbc_ohara:20210817013954p:plain


最後に

ここまで、Apache kafka(Message Queue for Apache Kafka)- ClickHouseのデータ連携方法を紹介しました。
ApsaraDB for ClickHouseは Apache kafka とスムーズに連携できるので、Apache kafka(Message Queue for Apache Kafka)もしくはApache kafka - ClickHouse といったリアルタイムデータ分析ソリューションとして仕上げることもできます。

Special Thanks, Nancy