【Logservice連載】Logstashでnetflowデータを収集するLogService

Hi, データエンジニアの大原です。 今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、およびLogstashでnetflowデータをLogServiceへ収集する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。

【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService
【Logservice連載】SDKでTwitterデータを収集するLogService
【Logservice連載】Logstashでnetflowデータを収集するLogService←本記事
【Logservice連載】Google Apps Script(GAS)で株価データを収集し、LogServiceの機械学習で株価予測・異常検知・監視をする
【Logservice連載】NYC-Taxi on Logservice

f:id:sbc_ohara:20201118125247p:plain

前書き

LogServiceの可能性をこのblogにて記載しました。

LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。

少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www2.slideshare.net

今回はLogstashを使ってNWのトラフィックデータをAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20201223143231p:plain


プロジェクト作成(LogService全体で共通事項)

まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。

f:id:sbc_ohara:20201124101928p:plain


Project Nameをここでは「techblog」にし、プロジェクトを作成します。 f:id:sbc_ohara:20201124102655p:plain

その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。 Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。 f:id:sbc_ohara:20201124102805p:plain

LogStore Nameをここでは「netflow_logstore」と入力し、LogStoreを作成します。

その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。 ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。 f:id:sbc_ohara:20201124103134p:plain


データ格納について

今回、netflowを使って、データを収集します。 netflowを収集するNW機器は色々ありますが、手近なものであれば、2000円で購入できるswitch hubでも良いし、Raspberry Piでも良いです。

参考:

qiita.com


STEP1: NW機器のデータ収集

著者の家にCisco Catalyst 6500があるので、それを使ってトラフィックを可視化します。
(ciscoフリーク世代で組織事情によりCCIEを挫折した人です。年齢バレるか。。)

ここは職場のNWを想定して、社内NWの監視をイメージします。著者のCisco Catalyst 6500がコアスイッチ(L3SW)の位置づけとして、NetFlowをアクティベーションします。このblogのために使い捨て使用なので、DNSはfreeのOpenDNSを使用しています。

f:id:sbc_ohara:20201217113218p:plain

NetFlowの基本的な操作方法はこちらになります。

https://www.cisco.com/c/dam/global/ja_jp/td/docs/security/stealthwatch/netflow/Cisco_NetFlow_Configuration.pdf

また、チェックおよび収集したい各フィールド・パラメータはこちらを参考にいただければ幸いです。

www.cisco.com

bas-6500-vg01 # config t
Enter configuration commands, one per line. End with CNTL-Z.
bas-6500-vg01(config) # flow record CHECKrecord                             # CHECKrecord という名前でnetflowレコードを作成
bas-6500-vg01(config-flow-record) # match ipv4 source address              # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match ipv4 destination address          # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match ipv4 protocol                     # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match ipv4 transport source-port        # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match ipv4 transport destination-port   # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match ipv4 tos                          # チェックしたいフィールド(key)をmatchで定義
bas-6500-vg01(config-flow-record) # match interface input                   # チェックしたいフィールド(key)をmatchで定義       
bas-6500-vg01(config-flow-record) # collect interface output                # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect counter bytes                   # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect counter packets                 # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect timestamp sys-uptime first      # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect timestamp sys-uptime last       # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect application name                # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect interface output                # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect routing source as               # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # collect routing destination as          # 収集したいフィールド(Non-key)をcollectで定義
bas-6500-vg01(config-flow-record) # flow exporter CHECKExport               # チェック、収集したいデータらキャッシュを CHECKExport という名前でエクスポートを定義
bas-6500-vg01(config-flow-exporter) # destination xx.xx.xx.xx         # ECSのIPアドレスを指定
bas-6500-vg01(config-flow-exporter) # source gigabitEthernet 0/1            # Export方法を指定
bas-6500-vg01(config-flow-exporter) # transport UDP 9995                     # Export方法を指定
bas-6500-vg01(config-flow-exporter) # export-protocol netflow-v9            # Export方法を指定
bas-6500-vg01(config-flow-exporter) # template data timeout 60             # Export方法を指定
bas-6500-vg01(config-flow-exporter) # option application-table timeout 60         # Export方法を指定
bas-6500-vg01(config-flow-exporter) # option application-attributes timeout 300   # Export方法を指定
bas-6500-vg01(config-flow-exporter) # flow monitor CHECKMonitor             # netflowのレコードおよびエクスポートに合わせて CHECKMonitor という名前でモニターを定義
bas-6500-vg01(config-flow-monitor) # record CHECKrecord                     # CHECKMonitor のレコード CHECKrecord を指定
bas-6500-vg01(config-flow-monitor) # exporter CHECKExport                   # CHECKMonitor のエクスポート CHECKExport を指定
bas-6500-vg01(config-flow-monitor) # cache timeout active 60               # モニターの方法を指定
bas-6500-vg01(config-flow-monitor) # cache timeout inactive 15             # モニターの方法を指定
bas-6500-vg01(config-flow-monitor) # exit
bas-6500-vg01(config) # interface gigabitEthernet 0/1                       # 上記の設定をインターフェースに反映
bas-6500-vg01(config-if) # ip flow monitor CHECKMonitor input               # 上記の設定をインターフェースに反映        
bas-6500-vg01(config-if) # ip flow monitor CHECKMonitor output              # 上記の設定をインターフェースに反映
bas-6500-vg01(config-if) # exit
bas-6500-vg01 # wr mem
Building configuration...

[OK]
bas-6500-vg01 # 

この設定ができれば、ECS側でUDP受信が出来てることを確認できます。


STEP2: Logstashの導入、設定

Logstashは、Elastic 社が開発しているオープンソースのデータ処理パイプラインです。
Logstashを使えば、様々なデータソースをStash(格納)することができます。

f:id:sbc_ohara:20201217155834p:plain

このhelpに従って、ECS側でLogstashを導入します。

www.alibabacloud.com

ちなみに著者のJavaはOpenJDK1.8.0です。

[root@retail-indoor-demo ~]# java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
[root@retail-indoor-demo ~]#

Logstashは様々なテンプレートらプラグインがあるので、今回はnetflowをハンドリングするNetflow codec pluginを参考にconfファイルを作成します。

www.elastic.co

input {
    udp {
       port => 9995 
       codec => netflow {
          versions => [9]
        }
       type => netflow
    }
}
filter {
    mutate {
        convert => {"[netflow][ipv4_src_addr]" => "string"
        }
    }
    geoip {
        source => "[netflow][ipv4_src_addr]"
    }
}
output {
   logservice {
        codec => "json"
        endpoint => "ap-northeast-1.log.aliyuncs.com"
        project => "techblog"
        logstore => "netflow_logstore"
        topic => ""
        source => "netflow"
        access_key_id => "<your ACCESSKEYID >"
        access_key_secret => "<your ACCESSKEY >"
        max_send_retry => 10
    }
}

上記、 input はデータ入力に関する設定、filter はデータ変換に関する設定、outputはデータ出力に関する設定を行います。
特にLogServiceへOutputするときのパラメータは上記通りですが、その他詳しくはリファレンスを参照したほうが早いと思います。

www.alibabacloud.com

www.elastic.co

www.elastic.co


STEP3: LogstashからLogServiceへ

コマンドでLogstashを実行します。

bin/logstash netflow.conf

あとはLogServiceでNetFlowらNWトラフィックが見えたら完了です。

f:id:sbc_ohara:20201217164635p:plain


完了

Logstashを使って、netflowデータを収集する方法を簡単に説明しました。
これにより、誰が、どこをどうみてるか、を明確に可視化することが出来ます。
もちろん、NWトラフィックに基づいてLogService付帯のアラート機能を使うことや、ユーザのパターンから、異常検知や、予測を行うことも可能です。
予測機能は、視聴時間と視聴回数から依存症を特定することなどが挙げられます。

ちなみにLogstashで収集するのであれば、データ送信先はLogServiceだけでなく、DataHubを通じてMaxCompute、もしくはElasticsearchへ送信することもできます。ここはいずれ記事にて紹介したいと思います。

aliyun-log-python-sdk.readthedocs.io

(懲りずテンプレな記載で恐縮ですが)
LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。

最後までお読みいただきありがとうございました。

参考になったサイト

www.cisco.com

Step-by-Step Setup of ELK for NetFlow Analytics - Cisco Blogs