まいど、Kouです。
Web Serverのアクセスログをリアルタイムストリーミング分析するというのはよくある利用シーンだと想定されます。今回の記事は、Apache Flumeと呼ばれるログ収集基盤とE-MapReduceクラスターのSpark Streaming分析基盤の統合手法について、ご説明させて頂きたいと思います。
- 前提
- EMR-3.16.0
- クラスタータイプは Hadoop
- ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
- ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
# cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) # uname -r 3.10.0-693.2.2.el7.x86_64 # flume-ng version Flume 1.8.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: Unknown Compiled by root on Wed Nov 28 11:09:28 CST 2018 From source with checksum 63b5d03c9afd862ff786f7826ffe55d0 # hadoop version Hadoop 2.7.2 Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc Compiled by root on 2018-11-30T09:31Z Compiled with protoc 2.5.0 From source with checksum 4447ed9f24dcd981df7daaadd5bafc0 This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar
- Flumeの設定
Flumeの使い方について、こちらの説明は割愛しますが、興味お持ちの方が公式ドキュメントをご参考頂ければと思います。Flumeの設定ファイルは以下となります。ソースとシンクはそれぞれSpooldirとAvroクライアントを設定します。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/spool a1.sources.r1.fileHeader = true a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 9906 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- Spark Streamingについて
Spark Streamingは、数秒から数分ほどの短い間隔で繰り返しバッチ処理というマイクロバッチ方式によるストリームデータ処理機能を提供します。今回は使った間隔データは下記のようになります。
DStreamのバッチ間隔 | 1秒 |
スライディング間隔 | 1秒 |
ウィンドウサイズ | 300 秒 |
import re from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.flume import FlumeUtils parts = [ r'(?P<host>\S+)', r'\S+', r'(?P<user>\S+)', r'\[(?P<time>.+)\]', r'"(?P<request>.+)"', r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<referer>.*)"', r'"(?P<agent>.*)"', ] pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z') def extractURLRequest(line): exp = pattern.match(line) if exp: request = exp.groupdict()["request"] if request: requestFields = request.split() if (len(requestFields) > 1): return requestFields[1] if __name__ == "__main__": sc = SparkContext(appName="StreamingFlumeLogAggregator") sc.setLogLevel("ERROR") ssc = StreamingContext(sc, 1) flumeStream = FlumeUtils.createStream(ssc, "localhost", 9906) lines = flumeStream.map(lambda x: x[1]) urls = lines.map(extractURLRequest) urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y : x - y, 300, 1) sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False)) sortedResults.pprint() ssc.checkpoint("/root/checkpoint") ssc.start() ssc.awaitTermination()
- 実行する
分析用のアクセスログをSpoolDir(/root/spool)に導入し、下記のコマンドでFlume NGを起動します。その後、別プロセスで上記のSparkStreamingアプリを起動します。
# bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1 -Dflume.root.logger=INFO,console # spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 SparkFlume.py
# tail access_log.txt 46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:57 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)" 46.166.139.20 - - [06/Dec/2015:03:14:59 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
下記のように、1秒間ごとにリアルタイム処理することができるようになりました。アクセス先URLごとのアクセス数(Top10)が一覧で出力されました。
------------------------------------------- Time: 2019-03-15 14:02:19 ------------------------------------------- (u'/xmlrpc.php', 8509) (u'/wp-login.php', 1798) (u'/', 119) (u'/robots.txt', 44) (u'/blog/', 36) (u'/page-sitemap.xml', 29) (u'/post-sitemap.xml', 29) (u'/category-sitemap.xml', 29) (u'/sitemap_index.xml', 29) (u'http://51.254.206.142/httptest.php', 26) ... ------------------------------------------- Time: 2019-03-15 14:02:20 ------------------------------------------- (u'/xmlrpc.php', 68415) (u'/wp-login.php', 1923) (u'/', 440) (u'/blog/', 138) (u'/robots.txt', 123) (u'/post-sitemap.xml', 118) (u'/sitemap_index.xml', 118) (u'/page-sitemap.xml', 117) (u'/category-sitemap.xml', 117) (u'/orlando-headlines/', 95) ... ------------------------------------------- Time: 2019-03-15 14:02:21 ------------------------------------------- (u'/xmlrpc.php', 68415) (u'/wp-login.php', 1923) (u'/', 440) (u'/blog/', 138) (u'/robots.txt', 123) (u'/post-sitemap.xml', 118) (u'/sitemap_index.xml', 118) (u'/page-sitemap.xml', 117) (u'/category-sitemap.xml', 117) (u'/orlando-headlines/', 95) ...
- 最後
皆さん、いかがでしたでしょうか、Spark StreamingとApache Flumeを組み合わせたストリームデータ処理システムを構築し、その検証結果を紹介しました。実はWebアクセス以外にも、日々多くの各種業務ログやトラフィックログ、メールのデータなどをリアルタイムな分析やモニタリングを実現したい場合にも活用できるかと思いますので、ぜひご検討ください。