【Logservice連載】OSS、AWS S3からデータを収集するLogService

Hi, データエンジニアの大原です。 今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、およびOSS、AWS S3からLogServiceへデータを収集する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。

【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService ←本記事
【Logservice連載】SDKでTwitterデータを収集するLogService
【Logservice連載】Logstashでnetflowデータを収集するLogService
【Logservice連載】Google Apps Script(GAS)で株価データを収集し、LogServiceの機械学習で株価予測・異常検知・監視をする
【Logservice連載】NYC-Taxi on Logservice

f:id:sbc_ohara:20201118125247p:plain

前書き

LogServiceの可能性をこのblogにて記載しました。

LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。

少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www2.slideshare.net

今回はOSS、AWS S3を使ってAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20201229131536p:plain


プロジェクト作成(LogService全体で共通事項)

まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。

f:id:sbc_ohara:20201124101928p:plain


Project Nameをここでは「techblog」にし、プロジェクトを作成します。 f:id:sbc_ohara:20201124102655p:plain

その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。 Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。 f:id:sbc_ohara:20201124102805p:plain

LogStore Nameをここでは「oss_logstore」と入力し、LogStoreを作成します。

その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。 ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。 f:id:sbc_ohara:20201124103134p:plain


データ格納について

(再掲)このシナリオでは 以下の図のように、OSS、AWS S3を使ってcsvファイルを登録します。
【Logservice連載】LogtailでCSVデータを収集するLogService のOSS版を目指します。


STEP1: OSSにてcsvファイルのアップロード

まずはOSS格納から行います。この作業自体シンプルで5分もしないうちに済みます。(著者的には多分csvファイルをLogServiceへ格納する作業の中で最も一番楽と思います)

f:id:sbc_ohara:20201229132754p:plain

OSSにて、csvファイルをアップロードします。以下、シンプルなcsvファイルです。 ※LogServiceと同リージョンであることが望ましい

f:id:sbc_ohara:20201229133431p:plain


STEP2: OSSからLogServiceへインポート

LogServiceの LogStore コンソールにて、インポート諸元を [OSS - Object Storage Service]へクリックします。

f:id:sbc_ohara:20201229134405p:plain

OSSにてcsvファイルをアップロードしたので、そのOSSのBucket、Prefixを入力します。FormatもCSVへプルタウン選択します。
必要情報入力が出来たら、無事ロード出来てるか確認として、「Preview」をクリックします。これでcsvファイルの先頭数行の読み取り結果が表示されたらOKで、次のステップへ進めます。

f:id:sbc_ohara:20201229133943p:plain

続いて、OSSにあるcsvファイルの[START_DT]フィールドをLogServiceのsystem timeとして登録したいので、[Use system time]のスイッチを切り替えます。
すると [Time Field] [time format] などが表示されますので、合わせて入力します。
今回の[START_DT]フィールドは yyyy/mm/dd hh:mmフォーマット形式なので、yyyy/mm/dd HH:mmとして登録します。Time zomeも日本時間に合わせてGMT+09:00にします。

csvなどのファイルに含まれている日付 - 時間のフォーマットをLog Serviceへ合わせる場合は、こちらを参考にしてフォーマットを指定する必要があります。

Time formats - Data Collection| Alibaba Cloud Documentation Center

このあとは「Test」をクリックし、csvファイルの先頭数行の読み取り結果およびLog Time(Unixtime)が無事表示されたらOKです。次のステップへ進めます。

f:id:sbc_ohara:20201229134701p:plain

Import Intervalで5分おきに実施にします。これは何分おきに新規ファイルがあればインポート処理をするかといった設定です。

f:id:sbc_ohara:20201229144844p:plain

Import処理に入ります。ここは待たなくても、処理中でも結果が表示されるので、「Next」をクリックし、「Log Query」の「Try now」で即可視化します。

f:id:sbc_ohara:20201229135324p:plain

f:id:sbc_ohara:20201229135519p:plain

以上で表示されます。(タイムラグもあるのでURLを更新し、ページを再リロードしてください)

f:id:sbc_ohara:20201229152650p:plain


STEP3: AWS S3にcsvファイルのアップロード

今度はAWS S3格納から行います。これはAWSでのsyslogやcsvファイルなどのデータをLogServiceで可視化したいときに役立ちます。

f:id:sbc_ohara:20201229132935p:plain

AWSにて、csvファイルをアップロードします。

f:id:sbc_ohara:20201229140533p:plain


STEP4: AWS lambdaでの設定

AWS Lambdaを使って、AWS S3からAlibaba Cloud OSSへデータを自動転送するようにします。流れとしては、上記のAWS S3 bucketに新規でcsvファイルがアップロードされたら、STEP1で作成したOSSのbucketに転送し、あとはSTEP2の設定によりLogServiceへ自動格納します。

デザイナーおよび関数コードは以下の通りです。Python 3.7です。

import json
import urllib.parse
import boto3
import uuid

print('Loading function')

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        oss = boto3.client('s3', aws_access_key_id='<your aws_access_key_id>',aws_secret_access_key='<your aws_secret_access_key>',
                                                region_name='cn-hangzhou', endpoint_url='http://datalake-ohara-202010.oss-cn-hangzhou.aliyuncs.com')
        tmp_file_name = uuid.uuid1()
        s3.Bucket(bucket).download_file(key,'/csv/{0}'.format(tmp_file_name))
        response = oss.upload_file('/csv/{0}'.format(tmp_file_name), 'lamdatooss', key)
        
        return response
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

f:id:sbc_ohara:20201229143749p:plain

リソース概要は以下の通りです。

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "lambda-a2b5ac8d-923a-4b8a-911a-3147fb6c1dea",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:ap-northeast-1:<your aws_access_id>:function:s3oss_demo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceAccount": "<your aws account>"
        },
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:s3:::democode2020"
        }
      }
    }
  ]
}

f:id:sbc_ohara:20201229141735p:plain

あとは「デプロイ」で、AWS S3にあったcsvファイルがAlibaba Cloud OSSにコピーという形で格納されます。
以降、AWS S3の上記bucketに新規csvファイルが格納されたら、上記lambdaによって自動でコピーしOSSへ格納されます。

f:id:sbc_ohara:20201229142509p:plain

その後はSTEP2 で設定したImportの処理により、LogService側もAWS S3からのデータが無事反映されてることがわかります。

f:id:sbc_ohara:20201229151636p:plain


完了

OSS Importを使って、csvファイルをLogServiceへ格納、可視化する方法を簡単に説明しました。
LogServiceはシンプルかつスピーディに構築することができます。この構築も、可視化まで1時間もかからないです。5分あれば見れます。

LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。

最後までお読みいただきありがとうございました。