【Logservice連載】SDKでTwitterデータを収集するLogService

Hi, データエンジニアの大原です。 今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、およびSDKでTwitterデータをLogServiceへ収集する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。

【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService
【Logservice連載】SDKでTwitterデータを収集するLogService←本記事
【Logservice連載】Logstashでnetflowデータを収集するLogService
【Logservice連載】LogServiceの機械学習で株価予測・異常検知・監視をする
【Logservice連載】NYC-Taxi on Logservice

f:id:sbc_ohara:20201118125247p:plain

前書き

LogServiceの可能性をこのblogにて記載しました。

LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。

少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www2.slideshare.net

今回はPython SDKを使ってTwitterデータをAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20201215140931p:plain


プロジェクト作成(LogService全体で共通事項)

まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。

f:id:sbc_ohara:20201124101928p:plain


Project Nameをここでは「techblog」にし、プロジェクトを作成します。 f:id:sbc_ohara:20201124102655p:plain

その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。 Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。 f:id:sbc_ohara:20201124102805p:plain

LogStore Nameをここでは「twitter_logstore」と入力し、LogStoreを作成します。

その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。 ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。 f:id:sbc_ohara:20201124103134p:plain


データ格納について

STEP1: Twitter APIの準備

Twitter APIを利用するために、Twitter Developer Platformサービスから、Twitter API利用申請をします。

developer.twitter.com

(ここでの説明は省略します)

申請後、Twitter APIとしてCONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET を取得します。


STEP2: LoggService 環境の準備

SDKでTwitterデータを収集する前に、LogService側でいろいろ準備する必要があります。
ENDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値を用意しなければなりません。

ENDPOINTはプロダクトサービスが利用している国を指します。
今回は日本リージョンを選定しているので、 ap-northeast-1.log.aliyuncs.com を選定します。

www.alibabacloud.com

ACCESSKEYID 、ACCESSKEY はAlibaba Cloudユーザーの認証情報です。これがあれば、様々なサービス・リソースへのプログラムによるアクセスを許可します。
これはコンソールから確認できます。RAMユーザー(子アカウント)の場合は、RAMユーザとしてのACCESSKEYID 、ACCESSKEYを入手する必要があります。

PROJECT はLogServiceのProjectのことを指します。上記「techblog」と作成したので、Projectはtechblogにします。

LOGSTORE は 「techblog」らProject配下に作成したLogStoreを指定する必要があります。上記「twitter_logstore」を作成したので、ここはtwitter_logstoreにします。

TOKEN 、topic 、source は任意です。これはLogServiceでデータを出力するときに参照として残るパラメータ値です。


STEP3: ECSにて以下Pythonファイルを作成

上記のTwitter API CONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET、
およびNDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値の準備ができたら、 ECSなどにて以下Pythonコードを記載します。 著者の Python version は Python 3.6.8 です。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# pip3 install -U aliyun-log-python-sdk
# pip3 install tweepy

from tweepy import StreamListener
from tweepy import Stream
import tweepy, json, time
from datetime import datetime

from aliyun.log.logitem import LogItem
from aliyun.log.logclient import LogClient
from aliyun.log.putlogsrequest import PutLogsRequest

CONSUMER_KEY = "<your CONSUMER_KEY>"
CONSUMER_SECRET ="<your CONSUMER_SECRET >"
ACCESS_KEY = "<your ACCESS_KEY >"
ACCESS_SECRET = "<your ACCESS_SECRET >"

ENDPOINT = 'ap-northeast-1.log.aliyuncs.com'
ACCESSKEYID = "<your ACCESSKEYID >"
ACCESSKEY ="<your ACCESSKEY >"
PROJECT = 'techblog'
LOGSTORE = 'twitter_logstore'
TOKEN = ""
topic = 'twitter_log_demo'
source = 'twitter'

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_KEY, ACCESS_SECRET)
api = tweepy.API(auth)

client = LogClient(ENDPOINT, ACCESSKEYID, ACCESSKEY, TOKEN)

class StdOutListener(StreamListener):
    def on_data(self, data):
        # print(data)
        tweet = json.loads(data)
        if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない
            new_datetime = datetime.strftime(datetime.strptime(tweet["created_at"],"%a %b %d %H:%M:%S +0000 %Y"), "%Y-%m-%d %H:%M:%S")
            utc_datetime = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' 

            if 'Web' in tweet['source']:
                isWeb = 1
                isAndroid = 0
                isiPhone = 0                              
            elif  'Android' in tweet['source']:
                isWeb = 0
                isAndroid = 1
                isiPhone = 0   
            elif  'iPhone' in tweet['source']:
                isWeb = 0
                isAndroid = 0
                isiPhone = 1   
            else:
                isWeb = 0
                isAndroid = 0
                isiPhone = 0   

            contents = [
                ( 'created_at', new_datetime ),
                ( 'created_at_utc', utc_datetime ), 
                ( 'id_str', tweet['id_str'] ),                
                ( 'text', tweet['text'] ),
                ( 'source', tweet['source'] ), 
                ( 'Web', str(isWeb) ),               
                ( 'Android', str(isAndroid) ), 
                ( 'iPhone', str(isiPhone) ), 
                ( 'tweet_size', str(len( tweet['text'] )) ) 
                ]
            print(contents)
            # put LogService
            logitemList = []
            logItem = LogItem()
            logItem.set_time(int(time.time()))
            logItem.set_contents(contents)
            logitemList.append(logItem)

            request = PutLogsRequest(PROJECT, LOGSTORE, topic, source, logitemList)
            response = client.put_logs(request)
            response.log_print()

    def on_error(self, status):
        print(status)

if __name__ == '__main__':
    query = "#鬼滅の刃" # 取得したい特定のキーワードやハッシュタグ
    listener = StdOutListener()
    twitterStream = Stream(auth, listener)
    twitterStream.filter(track = [query])

注意として、このコードはそのままでは実行できないので、以下コマンドでTweepy、logservice-sdkをインストール実行する必要があります。

pip3 install -U aliyun-log-python-sdk
pip3 install tweepy

STEP4: 実行

ECSが閉じても恒久的に動くよう、nohup コマンドで 実行します。

[root@sts ~]# nohup python3 tweet.py &

これで以上です。取得したデータはこのような感じになります。

__source__:  twitter
__tag__:__client_ip__:  47.74.18.54
__tag__:__receive_time__:  1607694838
__topic__:  twitter_log_demo
created_at:  2020-12-1 13:53:53
created_at_utc:  2020-12-1T13:53:58.171Z
id_str:  1337395179389657091
iPhone:  0
Android:  1
Web:  0
source:  <a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a>
text:  これぞ至高の領域に近き剣士…
 #グラブル  #鬼滅の刃  #日輪刀 https://t.co/IcW3ndTI4m
tweet_size:  30

f:id:sbc_ohara:20201217101728p:plain

完了

LogService Python SDKを使って、Twitterデータを収集する方法を簡単に説明しました。
この作業自体、5分もかからないです(Twitter API申請は除く、、)

ちなみにSDKは単にデータを収集するだけでなく、整形、ETL、格納処理もあります。

aliyun-log-python-sdk.readthedocs.io

LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。

最後までお読みいただきありがとうございました。