こんにちは Kouです。
Webアクセス解析や、ログのリアルタイムモニタリングと不正検知、ソーシャルメディア分析などの時に、オープンソースの分散ストリーミングプラットフォームと呼ばれるApache KafkaとSparkにストリームデータを処理するSpark Streamingを組み合わせたストリームデータ処理システムはよく利用されると考えられます。今回の記事はTwitterのメッセージ分析を例として、Alibaba CloudのE-MapReduce上で、KafkaとSpark Streamingの統合方法について、皆さんにご紹介させて頂きたいと思います。
検証環境について
Spark Streaming
- EMR-3.20.0
- クラスタータイプは Hadoop
- ハードウェア構成(Header)はecs.sn2.largeを1台
- ハードウェア構成(Worker)はecs.sn2.largeを2台
# cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) # uname -r 3.10.0-693.2.2.el7.x86_64 # hadoop version Hadoop 2.8.5 Subversion Unknown -r f6ae72b25f6dbb9925f0850ac865274d31a28e30 Compiled by root on 2019-04-19T06:38Z Compiled with protoc 2.5.0 From source with checksum 9624fc19bc23f1bbeacb1ae4bee88e7 This command was run using /opt/apps/ecm/service/hadoop/2.8.5-1.3.0/package/hadoop-2.8.5-1.3.0/share/hadoop/common/hadoop-common-2.8.5.jar # spark-submit --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.2 /_/ Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_151 Branch branch-2.4.2 Compiled by user root on 2019-04-26T06:32:50Z Revision ef3e6ff4b20dc86ac81291664bb0ed3cf2641fea Url /root/git-repos/emr-spark.git/ Type --help for more information.
Kafka
- EMR-3.20.0
- Zookeeper 3.4.13
- Kakfa 1.1.1
- クラスタータイプは Kafka
- ハードウェア構成(Header)はecs.sn2.largeを1台
- ハードウェア構成(Worker)はecs.sn2.largeを2台
[root@emr-header-1 bin]# echo envi | nc localhost 2181 Environment: zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT host.name=emr-header-1.cluster-43709 java.version=1.8.0_151 java.vendor=Oracle Corporation java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib java.io.tmpdir=/tmp java.compiler=<NA> os.name=Linux os.arch=amd64 os.version=3.10.0-693.2.2.el7.x86_64 user.name=hadoop user.home=/home/hadoop user.dir=/home/hadoop
Hosebird Client
Hosebird Clientというのは、Kafka Producerと連携する時に、TwitterのStreaming APIをコールするJava Http Clientです。 詳しく知りたい方は下記のgithubにご参考頂ければと思います。 github.com
全体構成図
指定したキーワードがTwitterでどれだけ話題になったかをリアルタイムのツイート数で表現させたいという分析の目的から、 まずはじめに、構築するAlibaba Cloudの構成図は以下となります。
Kafka Producer
準備が整ったので、ローカルのJava開発環境で早速コードを書いていきましょう! まずは、以下のサンプルコードを参考に、Kafka Producerを作成し、jarファイルを生成します。
Tweetsキーワード
"bitcoin"、"Blockchain"、"IoT"、"5G"、4つのキーワードを指定しました。
Kakfa BootstrapServers
Kafkaクラスターの任意の一台のIPアドレスで構いません。
Twitter Streaming API認証情報
Twitter Streaming APIを利用する為に、consumerKey、consumerSecret、token、secretをそれぞれ事前に取得して入力します。
public class ProducerTest { Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName()); /** ---------------------- Twitter Streaming API情報 ---------------------- */ String consumerKey = "xxxxxxxxxxxxxxxx"; String consumerSecret = "xxxxxxxxxxxxxxxx"; String token = "xxxxxxxxxxxxxxxx"; String secret = "xxxxxxxxxxxxxxxx"; String mytopic = "tweets_poc"; /** ---------------------- Tweetsキーワードを指定 ---------------------- */ List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G"); public ProducerTest(){} public static void main(String[] args) { new ProducerTest().run(); } public void run(){ logger.info("Setup"); BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000); Client client = createTwitterClient(msgQueue); client.connect(); KafkaProducer<String, String> producer = createKafkaProducer(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info("stopping application..."); logger.info("shutting down client from twitter..."); client.stop(); logger.info("closing producer..."); producer.close(); logger.info("done!"); })); while (!client.isDone()) { String msg = null; try { msg = msgQueue.poll(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); client.stop(); } if (msg != null){ logger.info(msg); if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){ producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error("Something bad happened", e); } } }); } else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) { producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error("Something bad happened", e); } } }); } else if (StringUtils.containsIgnoreCase(msg,"IoT")) { producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error("Something bad happened", e); } } }); } else if (StringUtils.containsIgnoreCase(msg,"5G")) { producer.send(new ProducerRecord<>(mytopic,5,"5G", msg)); } else{ producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { logger.error("Something bad happened", e); } } }); } } } logger.info("End of application"); } /** ---------------------- Hosebird Clientを作成 ---------------------- */ public Client createTwitterClient(BlockingQueue<String> msgQueue){ Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); hosebirdEndpoint.trackTerms(terms); Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret); ClientBuilder builder = new ClientBuilder() .name("Hosebird-Client-01") .hosts(hosebirdHosts) .authentication(hosebirdAuth) .endpoint(hosebirdEndpoint) .processor(new StringDelimitedProcessor(msgQueue)); Client hosebirdClient = builder.build(); return hosebirdClient; } /** ---------------------- kakfa producerを作成 ---------------------- */ public KafkaProducer<String, String> createKafkaProducer(){ String bootstrapServers = "xxxxxxxxxxxxxxxx"; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); return producer; } }
Spark Streaming Consumer
Spark Streamingは、数秒から数分ほどの短い間隔で繰り返しバッチ処理というマイクロバッチ方式によるストリームデータ処理機能を提供します。E-MapreduceのKafka Brokerのバージョンは0.10.0以上である為、kafkaレシーバーを使わず、各Partitionから直接メッセージを読み込むというダイレクトのアプローチでKafkaと連携できています。今回は使った間隔データは下記のようになります。
DStreamのバッチ間隔 1秒
スライディング間隔 1秒
ウィンドウサイズ 300秒
ローカルのJava開発環境で、以下のサンプルコードを参考に、Spark Streaming Consumerを作成し、jarファイルを生成します。
public class SparkStreamingPoC { private static JsonParser jsonParser = new JsonParser(); public static void main(String[] args) throws InterruptedException { SparkConf sparkConf = new SparkConf().setAppName("TweetsApp"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1)); /** ---------------------- kafkaパラメータ設定 ---------------------- */ Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "xxxxxxxxxxxxxxxx"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "tweets_group"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection<String> topics = Arrays.asList("tweets_poc"); /** ---------------------- Spark Stream作成 ---------------------- */ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); JavaPairDStream<String,Integer> s1 = stream.mapToPair(record -> new Tuple2<>(record.key(),1)); JavaPairDStream<String,Integer> s2 = s1.reduceByKeyAndWindow((a,b) -> a + b,Durations.seconds(300),Durations.seconds(1)); JavaPairDStream<Integer,String> s3 = s2.mapToPair(Tuple2::swap); JavaPairDStream<Integer,String> s4 = s3.transformToPair(rdd -> rdd.sortByKey(false)); s4.print(); streamingContext.start(); streamingContext.awaitTermination(); } }
Sparkアプリを実行する
まず、作成したKafka ProducerとSpark Streaming ConsumerのJarファイルを一旦Alibaba Cloud OSSにアップロードしておきます。そして、ECSにsshログインして、ossutilなどのツールでKafka Producerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をECSにダウンロードしていきます。ダウンロードできたら、下記のコマンドでKafka Producerを起動して、Twitterに投稿したメッセージを収集させます。
java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar
E-MapReduceコンソールから、Kafkaに書き込んでいるTopicのPartitionやOffset、ISRなどの情報もリアルタイムで確認することもできます。
master引数
E-MapReduceはYarnモードを使用しますので、下図赤枠のCLIのアーギュメント(master)をYarnに指定する。
OSSプロトコル
ossref:OSSからjarファイルをダウンロードして実行する。
下図のように、ETL処理の結果も確認することができました。
最後
いかかでしたでしょうか
普通ならば、sshなどでmaster nodeや踏み台サーバーにログインし、コマンドラインからJobを実行するとの手順ですが、今回のように、E-MapReduceコンソールのデータプラットフォームより、Jobの実行はGUIベースで操作できるようになりました。さらに、Jobの結果はすぐに反映できる点はJobのトラブルシューティングにも役に立てると思います。
Spark Streamingとkafkaを組み合わせたストリームデータ処理システムをE-Mapreduce上に構築し、その検証結果を紹介しました。実は、汎用のリアルタイムデータ処理プラットフォームとして、今回ご紹介したApache Spark以外、Apache Flinkを利用することもできます。Flinkに慣れ親しんだ開発者は、Alibaba CloudのマネージドサービスRealtime Computeをご参考頂ければと思います。 また、Twitterメッセージ以外にも、日々多くの各種トラフィックログ、メールのデータなどをリアルタイムな分析やモニタリングを実現したい場合にも活用できるかと思いますので、是非使ってみてください!