Apache FlumeとSpark Streamingの統合について

まいど、Kouです。

Web Serverのアクセスログをリアルタイムストリーミング分析するというのはよくある利用シーンだと想定されます。今回の記事は、Apache Flumeと呼ばれるログ収集基盤とE-MapReduceクラスターのSpark Streaming分析基盤の統合手法について、ご説明させて頂きたいと思います。

  • 前提
  • EMR-3.16.0
  • クラスタータイプは Hadoop
  • ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
  • ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Wed Nov 28 11:09:28 CST 2018
From source with checksum 63b5d03c9afd862ff786f7826ffe55d0
# hadoop version
Hadoop 2.7.2
Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc
Compiled by root on 2018-11-30T09:31Z
Compiled with protoc 2.5.0
From source with checksum 4447ed9f24dcd981df7daaadd5bafc0
This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar
  • Flumeの設定

Flumeの使い方について、こちらの説明は割愛しますが、興味お持ちの方が公式ドキュメントをご参考頂ければと思います。Flumeの設定ファイルは以下となります。ソースとシンクはそれぞれSpooldirとAvroクライアントを設定します。

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/spool
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9906

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • Spark Streamingについて

Spark Streamingは、数秒から数分ほどの短い間隔で繰り返しバッチ処理というマイクロバッチ方式によるストリームデータ処理機能を提供します。今回は使った間隔データは下記のようになります。

DStreamのバッチ間隔 1秒
スライディング間隔 1秒
ウィンドウサイズ 300 秒
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

parts = [
    r'(?P<host>\S+)',                   
    r'\S+',                            
    r'(?P<user>\S+)',                  
    r'\[(?P<time>.+)\]',               
    r'"(?P<request>.+)"',               
    r'(?P<status>[0-9]+)',              
    r'(?P<size>\S+)',                   
    r'"(?P<referer>.*)"',               
    r'"(?P<agent>.*)"', 
]
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')

def extractURLRequest(line):
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
           requestFields = request.split()
           if (len(requestFields) > 1):
                return requestFields[1]

if __name__ == "__main__":

    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9906)
    lines = flumeStream.map(lambda x: x[1])
    urls = lines.map(extractURLRequest)   
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y : x - y, 300, 1)

    sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()
    ssc.checkpoint("/root/checkpoint")
    ssc.start()
    ssc.awaitTermination()
  • 実行する

分析用のアクセスログをSpoolDir(/root/spool)に導入し、下記のコマンドでFlume NGを起動します。その後、別プロセスで上記のSparkStreamingアプリを起動します。

# bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1 -Dflume.root.logger=INFO,console

# spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 SparkFlume.py
# tail  access_log.txt 
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:57 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:59 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"

下記のように、1秒間ごとにリアルタイム処理することができるようになりました。アクセス先URLごとのアクセス数(Top10)が一覧で出力されました。

-------------------------------------------
Time: 2019-03-15 14:02:19
-------------------------------------------
(u'/xmlrpc.php', 8509)
(u'/wp-login.php', 1798)
(u'/', 119)
(u'/robots.txt', 44)
(u'/blog/', 36)
(u'/page-sitemap.xml', 29)
(u'/post-sitemap.xml', 29)
(u'/category-sitemap.xml', 29)
(u'/sitemap_index.xml', 29)
(u'http://51.254.206.142/httptest.php', 26)
...

-------------------------------------------
Time: 2019-03-15 14:02:20
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...

-------------------------------------------
Time: 2019-03-15 14:02:21
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...
  • 最後

皆さん、いかがでしたでしょうか、Spark StreamingとApache Flumeを組み合わせたストリームデータ処理システムを構築し、その検証結果を紹介しました。実はWebアクセス以外にも、日々多くの各種業務ログやトラフィックログ、メールのデータなどをリアルタイムな分析やモニタリングを実現したい場合にも活用できるかと思いますので、ぜひご検討ください。