E-MapReduce(Kafka)とElasticSearchの連携を試しました。

こんにちは Kouです。

前回の記事では、Alibaba Cloud E-MapReduceを利用したApache KafkaとApache Spark Streamingの統合方法を紹介しました。今回の記事は、引き続きTwitterメッセージの例を用いて、多くのエンタープライズ企業で導入されたKafkaとElasticsearchの構成について、Alibaba Cloud上での統合方法を説明してみたいと思います。

検証環境について

Kafka
  • EMR-3.20.0
  • Zookeeper 3.4.13
  • Kakfa 1.1.1
  • クラスタータイプは Kafka
  • ハードウェア構成(Header)はecs.sn2.largeを1台
  • ハードウェア構成(Worker)はecs.sn2.largeを2台

※ クラスターの作成手順は公式ドキュメントにご参照いただけますと幸いです。

# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client

Hosebird Clientというのは、Kafka Producerと連携する時に、TwitterのStreaming APIをコールするJava Http Clientです。 詳しく知りたい方は下記のgithubにご参考頂ければと思います。 github.com

全体構成図

まずはじめに、Alibaba Cloudの構成図は以下となります。Elasticsearch環境を素早く構築する為、Alibaba Cloudのホスト型Elasticsearchを使いました。現時点はAlibaba Cloud Elasticsearchが Elasticsearch 5.5.3 with Commercial Feature、Elasticsearch 6.3.2 with Commercial Feature、Elasticsearch 6.7.0 with Commercial Feature3つのバージョンをサポートしており、エンタープライズレベルのアクセス制御、セキュリティモニタリング、アラーム、可視化レポート、機械学習などのX-Packプラグインも含まれております。

f:id:sbc_kou:20190807105656p:plain

Kafka Producer

準備が整ったので、ローカルのJava開発環境で早速コードを書いていきましょう! まずは、以下のサンプルコードでKafka Producerを作成し、jarファイルを生成します。

Kakfa BootstrapServers

Kafkaクラスターの任意の一台のIPアドレスで構いません。

Twitter Streaming API認証情報

Twitter Streaming APIを利用する為に、consumerKey、consumerSecret、token、secretをそれぞれ事前に取得して入力します。

public class ProducerTest {

    Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());

 /** ---------------------- Twitter Streaming API情報 ---------------------- */
    String consumerKey = "xxxxxxxxxxxxxxxx";
    String consumerSecret = "xxxxxxxxxxxxxxxx";
    String token = "xxxxxxxxxxxxxxxx";
    String secret = "xxxxxxxxxxxxxxxx";
    String mytopic = "tweets_poc";

 /** ---------------------- Tweetsキーワードを指定 ---------------------- */
    List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");


    public ProducerTest(){}

    public static void main(String[] args) {
        new ProducerTest().run();
    }

    public void run(){

        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = createTwitterClient(msgQueue);
        client.connect();

        KafkaProducer<String, String> producer = createKafkaProducer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }

            if (msg != null){
                logger.info(msg);
                if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
                    producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
                    producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
                    producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"5G")) {
                    producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
                }
                else{
                    producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
            }
        }
        logger.info("End of application");
    }
   
 /** ---------------------- Hosebird Clientを作成 ---------------------- */
    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")                              
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

  /** ---------------------- kakfa producerを作成 ---------------------- */
    public KafkaProducer<String, String> createKafkaProducer(){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }

}

Kafka Consumer

ローカルのJava開発環境で、Consumerを作成しましょう。

Elasticsearch認証情報

ElasticSearchサービスを利用する為に、 elasticsearch側のusername、password、アクセスエンドポイントをそれぞれ事前に取得しておきます。

Elasticsearchの事前準備

kibanaコンソールから、ElaticsearchのIndex(twitter)とType(tweets)を事前に作成しておきます。

public class ElasticSearchConsumer {

    public static RestHighLevelClient createClient(){

        /** ---------------------- ElasticSearch認証情報 ---------------------- */
        String hostname = "xxxxxxxxxxxxxxxx"; 
        String username = "xxxxxxxxxxxxxxxx"; 
        String password = "xxxxxxxxxxxxxxxx"; 

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }

    public static KafkaConsumer<String, String> createConsumer(String topic){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";
        String groupId = "kafka-demo-elasticsearch";

         /** ---------------------- consumer パラメータ設定 ---------------------- */
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

        /** ---------------------- consumer 作成 ---------------------- */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        return consumer;

    }

    private static JsonParser jsonParser = new JsonParser();

    private static String extractIdFromTweet(String tweetJson){
        
        return jsonParser.parse(tweetJson)
                .getAsJsonObject()
                .get("id_str")
                .getAsString();
    }

    public static void main(String[] args) throws IOException {

        Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
        RestHighLevelClient client = createClient();

        KafkaConsumer<String, String> consumer = createConsumer("tweets_poc");

        while(true){
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100)); 

            Integer recordCount = records.count();
            logger.info("Received " + recordCount + " records");    

            for (ConsumerRecord<String, String> record : records){

                try {
                    String id = extractIdFromTweet(record.value());

                    /** ---------------------- データをElasticSearchに挿入 ---------------------- */
                    IndexRequest indexRequest = new IndexRequest(
                            "twitter",
                            "tweets",
                            id 
                    ).source(record.value(), XContentType.JSON);

                    
                    IndexResponse indexResponse = client.index(indexRequest,RequestOptions.DEFAULT);
                    logger.info(indexResponse.getId());

                } catch (NullPointerException e){
                    logger.warn("skipping bad data: " + record.value());
                }

            }

            if(recordCount > 0){

                logger.info("Committing offsets...");
                consumer.commitSync();
                logger.info("Offsets have been committed");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }
}

実行

まず、作成したKafka ProducerとKafka ConsumerのJarファイルを一旦Alibaba Cloud OSSにアップロードしておきます。そして、ECS(Kafka Producer)にsshログインして、ossutilなどのツールでKafka Producerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をECSにダウンロードしていきます。ダウンロードできたら、下記のコマンドでKafka Producerを起動して、Twitterに投稿したメッセージを収集させます。

java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar

同様に、ECS(Kafka Consumer)にsshログインして、Kafka Consumerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をダウンロードしていきます。ダウンロードできたら、また下記のコマンドでKafka Consumerを起動して、KafkaのPartitionからメッセージを読み出します。

java -jar kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar

上記のコマンドを実行したら、下図のように、kafkaから読み出したTwitterメッセージのIDをターミナル画面上で確認することができます。 f:id:sbc_kou:20190807110631p:plain

そこで、上記の赤枠のTwitterIDを例として、ElasticSearchのkibanaで検索して確認してみます。まず、Alibaba Cloudコンソールからkibanaへアクセスします。 f:id:sbc_kou:20190820143751p:plain 赤枠のIDをkibanaコンソールで検索すると、該当のTwitterメッセージが出てきました! f:id:sbc_kou:20190807110421p:plain

最後

いかがでしたでしょうか

Apache KafkaとElasticsearchは、これまで見てきた数多くの企業で実際に利用されており、KafkaはElasticsearchを利用する前に、データのストリーム処理に重要な役割を担っている一方、Elasticsearch は元のデータをそのまま保存するのではなく、高速検索に適用されます。また、今回ご紹介したKafka以外、クラウドマネージドサービスを望むようでしたら、Alibaba CloudのLogServiceを使っても同じ役割を果たせるので、ご興味をお持ちの方、ぜひご参考してみてください!