Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、およびSDKでTwitterデータをLogServiceへ収集する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。
【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService
【Logservice連載】SDKでTwitterデータを収集するLogService←本記事
【Logservice連載】Logstashでnetflowデータを収集するLogService
【Logservice連載】LogServiceの機械学習で株価予測・異常検知・監視をする
【Logservice連載】NYC-Taxi on Logservice
前書き
LogServiceの可能性をこのblogにて記載しました。
LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。
少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
www2.slideshare.net
今回はPython SDKを使ってTwitterデータをAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。
プロジェクト作成(LogService全体で共通事項)
まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。
![]() |
Project Nameをここでは「techblog」にし、プロジェクトを作成します。
その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。
Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。
LogStore Nameをここでは「twitter_logstore」と入力し、LogStoreを作成します。
その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。
ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。
データ格納について
STEP1: Twitter APIの準備
Twitter APIを利用するために、Twitter Developer Platformサービスから、Twitter API利用申請をします。
(ここでの説明は省略します)
申請後、Twitter APIとしてCONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET を取得します。
STEP2: LoggService 環境の準備
SDKでTwitterデータを収集する前に、LogService側でいろいろ準備する必要があります。
ENDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値を用意しなければなりません。
ENDPOINTはプロダクトサービスが利用している国を指します。
今回は日本リージョンを選定しているので、 ap-northeast-1.log.aliyuncs.com
を選定します。
ACCESSKEYID 、ACCESSKEY はAlibaba Cloudユーザーの認証情報です。これがあれば、様々なサービス・リソースへのプログラムによるアクセスを許可します。
これはコンソールから確認できます。RAMユーザー(子アカウント)の場合は、RAMユーザとしてのACCESSKEYID 、ACCESSKEYを入手する必要があります。
PROJECT はLogServiceのProjectのことを指します。上記「techblog」と作成したので、Projectはtechblog
にします。
LOGSTORE は 「techblog」らProject配下に作成したLogStoreを指定する必要があります。上記「twitter_logstore」を作成したので、ここはtwitter_logstore
にします。
TOKEN 、topic 、source は任意です。これはLogServiceでデータを出力するときに参照として残るパラメータ値です。
STEP3: ECSにて以下Pythonファイルを作成
上記のTwitter API CONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET、
およびNDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値の準備ができたら、
ECSなどにて以下Pythonコードを記載します。 著者の Python version は Python 3.6.8
です。
#!/usr/bin/env python # -*- coding: utf-8 -*- # pip3 install -U aliyun-log-python-sdk # pip3 install tweepy from tweepy import StreamListener from tweepy import Stream import tweepy, json, time from datetime import datetime from aliyun.log.logitem import LogItem from aliyun.log.logclient import LogClient from aliyun.log.putlogsrequest import PutLogsRequest CONSUMER_KEY = "<your CONSUMER_KEY>" CONSUMER_SECRET ="<your CONSUMER_SECRET >" ACCESS_KEY = "<your ACCESS_KEY >" ACCESS_SECRET = "<your ACCESS_SECRET >" ENDPOINT = 'ap-northeast-1.log.aliyuncs.com' ACCESSKEYID = "<your ACCESSKEYID >" ACCESSKEY ="<your ACCESSKEY >" PROJECT = 'techblog' LOGSTORE = 'twitter_logstore' TOKEN = "" topic = 'twitter_log_demo' source = 'twitter' auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) auth.set_access_token(ACCESS_KEY, ACCESS_SECRET) api = tweepy.API(auth) client = LogClient(ENDPOINT, ACCESSKEYID, ACCESSKEY, TOKEN) class StdOutListener(StreamListener): def on_data(self, data): # print(data) tweet = json.loads(data) if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない new_datetime = datetime.strftime(datetime.strptime(tweet["created_at"],"%a %b %d %H:%M:%S +0000 %Y"), "%Y-%m-%d %H:%M:%S") utc_datetime = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' if 'Web' in tweet['source']: isWeb = 1 isAndroid = 0 isiPhone = 0 elif 'Android' in tweet['source']: isWeb = 0 isAndroid = 1 isiPhone = 0 elif 'iPhone' in tweet['source']: isWeb = 0 isAndroid = 0 isiPhone = 1 else: isWeb = 0 isAndroid = 0 isiPhone = 0 contents = [ ( 'created_at', new_datetime ), ( 'created_at_utc', utc_datetime ), ( 'id_str', tweet['id_str'] ), ( 'text', tweet['text'] ), ( 'source', tweet['source'] ), ( 'Web', str(isWeb) ), ( 'Android', str(isAndroid) ), ( 'iPhone', str(isiPhone) ), ( 'tweet_size', str(len( tweet['text'] )) ) ] print(contents) # put LogService logitemList = [] logItem = LogItem() logItem.set_time(int(time.time())) logItem.set_contents(contents) logitemList.append(logItem) request = PutLogsRequest(PROJECT, LOGSTORE, topic, source, logitemList) response = client.put_logs(request) response.log_print() def on_error(self, status): print(status) if __name__ == '__main__': query = "#鬼滅の刃" # 取得したい特定のキーワードやハッシュタグ listener = StdOutListener() twitterStream = Stream(auth, listener) twitterStream.filter(track = [query])
注意として、このコードはそのままでは実行できないので、以下コマンドでTweepy、logservice-sdkをインストール実行する必要があります。
pip3 install -U aliyun-log-python-sdk pip3 install tweepy
STEP4: 実行
ECSが閉じても恒久的に動くよう、nohup コマンドで 実行します。
[root@sts ~]# nohup python3 tweet.py &
これで以上です。取得したデータはこのような感じになります。
__source__: twitter __tag__:__client_ip__: 47.74.18.54 __tag__:__receive_time__: 1607694838 __topic__: twitter_log_demo created_at: 2020-12-1 13:53:53 created_at_utc: 2020-12-1T13:53:58.171Z id_str: 1337395179389657091 iPhone: 0 Android: 1 Web: 0 source: <a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a> text: これぞ至高の領域に近き剣士… #グラブル #鬼滅の刃 #日輪刀 https://t.co/IcW3ndTI4m tweet_size: 30
完了
LogService Python SDKを使って、Twitterデータを収集する方法を簡単に説明しました。
この作業自体、5分もかからないです(Twitter API申請は除く、、)
ちなみにSDKは単にデータを収集するだけでなく、整形、ETL、格納処理もあります。
aliyun-log-python-sdk.readthedocs.io
LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。
最後までお読みいただきありがとうございました。