KafkaとSpark Streamingの統合について

こんにちは Kouです。

Webアクセス解析や、ログのリアルタイムモニタリングと不正検知、ソーシャルメディア分析などの時に、オープンソースの分散ストリーミングプラットフォームと呼ばれるApache KafkaとSparkにストリームデータを処理するSpark Streamingを組み合わせたストリームデータ処理システムはよく利用されると考えられます。今回の記事はTwitterのメッセージ分析を例として、Alibaba CloudのE-MapReduce上で、KafkaとSpark Streamingの統合方法について、皆さんにご紹介させて頂きたいと思います。

検証環境について

Spark Streaming
  • EMR-3.20.0
  • クラスタータイプは Hadoop
  • ハードウェア構成(Header)はecs.sn2.largeを1台
  • ハードウェア構成(Worker)はecs.sn2.largeを2台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# hadoop version
Hadoop 2.8.5
Subversion Unknown -r f6ae72b25f6dbb9925f0850ac865274d31a28e30
Compiled by root on 2019-04-19T06:38Z
Compiled with protoc 2.5.0
From source with checksum 9624fc19bc23f1bbeacb1ae4bee88e7
This command was run using /opt/apps/ecm/service/hadoop/2.8.5-1.3.0/package/hadoop-2.8.5-1.3.0/share/hadoop/common/hadoop-common-2.8.5.jar
# spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_151
Branch branch-2.4.2
Compiled by user root on 2019-04-26T06:32:50Z
Revision ef3e6ff4b20dc86ac81291664bb0ed3cf2641fea
Url /root/git-repos/emr-spark.git/
Type --help for more information.
Kafka
  • EMR-3.20.0
  • Zookeeper 3.4.13
  • Kakfa 1.1.1
  • クラスタータイプは Kafka
  • ハードウェア構成(Header)はecs.sn2.largeを1台
  • ハードウェア構成(Worker)はecs.sn2.largeを2台
[root@emr-header-1 bin]# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client

Hosebird Clientというのは、Kafka Producerと連携する時に、TwitterのStreaming APIをコールするJava Http Clientです。 詳しく知りたい方は下記のgithubにご参考頂ければと思います。 github.com

全体構成図

指定したキーワードがTwitterでどれだけ話題になったかをリアルタイムのツイート数で表現させたいという分析の目的から、 まずはじめに、構築するAlibaba Cloudの構成図は以下となります。

f:id:sbc_kou:20190716161135p:plain

Kafka Producer

準備が整ったので、ローカルのJava開発環境で早速コードを書いていきましょう! まずは、以下のサンプルコードを参考に、Kafka Producerを作成し、jarファイルを生成します。

Tweetsキーワード

"bitcoin"、"Blockchain"、"IoT"、"5G"、4つのキーワードを指定しました。

Kakfa BootstrapServers

Kafkaクラスターの任意の一台のIPアドレスで構いません。

Twitter Streaming API認証情報

Twitter Streaming APIを利用する為に、consumerKey、consumerSecret、token、secretをそれぞれ事前に取得して入力します。

public class ProducerTest {

    Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());

 /** ---------------------- Twitter Streaming API情報 ---------------------- */
    String consumerKey = "xxxxxxxxxxxxxxxx";
    String consumerSecret = "xxxxxxxxxxxxxxxx";
    String token = "xxxxxxxxxxxxxxxx";
    String secret = "xxxxxxxxxxxxxxxx";
    String mytopic = "tweets_poc";

 /** ---------------------- Tweetsキーワードを指定 ---------------------- */
    List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");


    public ProducerTest(){}

    public static void main(String[] args) {
        new ProducerTest().run();
    }

    public void run(){

        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = createTwitterClient(msgQueue);
        client.connect();

        KafkaProducer<String, String> producer = createKafkaProducer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }

            if (msg != null){
                logger.info(msg);
                if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
                    producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
                    producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
                    producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"5G")) {
                    producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
                }
                else{
                    producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
            }
        }
        logger.info("End of application");
    }
   
 /** ---------------------- Hosebird Clientを作成 ---------------------- */
    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")                              
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

  /** ---------------------- kakfa producerを作成 ---------------------- */
    public KafkaProducer<String, String> createKafkaProducer(){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }

}

Spark Streaming Consumer

Spark Streamingは、数秒から数分ほどの短い間隔で繰り返しバッチ処理というマイクロバッチ方式によるストリームデータ処理機能を提供します。E-MapreduceのKafka Brokerのバージョンは0.10.0以上である為、kafkaレシーバーを使わず、各Partitionから直接メッセージを読み込むというダイレクトのアプローチでKafkaと連携できています。今回は使った間隔データは下記のようになります。

DStreamのバッチ間隔 1秒
スライディング間隔 1秒
ウィンドウサイズ 300秒

ローカルのJava開発環境で、以下のサンプルコードを参考に、Spark Streaming Consumerを作成し、jarファイルを生成します。

public class SparkStreamingPoC {

    private static JsonParser jsonParser = new JsonParser();

    public static void main(String[] args) throws InterruptedException {

        SparkConf sparkConf = new SparkConf().setAppName("TweetsApp");

        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(1));

         /** ---------------------- kafkaパラメータ設定 ---------------------- */
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxxxxxxxxxxxxxxx");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "tweets_group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("tweets_poc");

        
        /** ---------------------- Spark Stream作成 ---------------------- */
       JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

        JavaPairDStream<String,Integer> s1 = stream.mapToPair(record -> new Tuple2<>(record.key(),1));
        JavaPairDStream<String,Integer> s2 = s1.reduceByKeyAndWindow((a,b) -> a + b,Durations.seconds(300),Durations.seconds(1));
        JavaPairDStream<Integer,String> s3 = s2.mapToPair(Tuple2::swap);
        JavaPairDStream<Integer,String> s4 = s3.transformToPair(rdd -> rdd.sortByKey(false));
        s4.print();

        streamingContext.start();
        streamingContext.awaitTermination();

    }
}

Sparkアプリを実行する

まず、作成したKafka ProducerとSpark Streaming ConsumerのJarファイルを一旦Alibaba Cloud OSSにアップロードしておきます。そして、ECSにsshログインして、ossutilなどのツールでKafka Producerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をECSにダウンロードしていきます。ダウンロードできたら、下記のコマンドでKafka Producerを起動して、Twitterに投稿したメッセージを収集させます。

java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar

E-MapReduceコンソールから、Kafkaに書き込んでいるTopicのPartitionやOffset、ISRなどの情報もリアルタイムで確認することもできます。

f:id:sbc_kou:20190703171503p:plain

f:id:sbc_kou:20190703171507p:plain

master引数

E-MapReduceはYarnモードを使用しますので、下図赤枠のCLIのアーギュメント(master)をYarnに指定する。

OSSプロトコル

ossref:OSSからjarファイルをダウンロードして実行する。

f:id:sbc_kou:20190703164615p:plain

下図のように、ETL処理の結果も確認することができました。

f:id:sbc_kou:20190703172435p:plain

最後

いかかでしたでしょうか

普通ならば、sshなどでmaster nodeや踏み台サーバーにログインし、コマンドラインからJobを実行するとの手順ですが、今回のように、E-MapReduceコンソールのデータプラットフォームより、Jobの実行はGUIベースで操作できるようになりました。さらに、Jobの結果はすぐに反映できる点はJobのトラブルシューティングにも役に立てると思います。

Spark Streamingとkafkaを組み合わせたストリームデータ処理システムをE-Mapreduce上に構築し、その検証結果を紹介しました。実は、汎用のリアルタイムデータ処理プラットフォームとして、今回ご紹介したApache Spark以外、Apache Flinkを利用することもできます。Flinkに慣れ親しんだ開発者は、Alibaba CloudのマネージドサービスRealtime Computeをご参考頂ければと思います。 また、Twitterメッセージ以外にも、日々多くの各種トラフィックログ、メールのデータなどをリアルタイムな分析やモニタリングを実現したい場合にも活用できるかと思いますので、是非使ってみてください!