Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、および株価をLogServiceで予測・異常検知・検知する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。
【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService
【Logservice連載】SDKでTwitterデータを収集するLogService
【Logservice連載】Logstashでnetflowデータを収集するLogService
【Logservice連載】Google Apps Script(GAS)で株価データを収集し、LogServiceの機械学習で株価予測・異常検知・監視をする ←本記事
【Logservice連載】NYC-Taxi on Logservice
前書き
LogServiceの可能性をこのblogにて記載しました。
LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。
少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
www2.slideshare.net
今回はGoogle Apps Script(GAS)を使ってAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。
プロジェクト作成(LogService全体で共通事項)
まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。
![]() |
Project Nameをここでは「techblog」にし、プロジェクトを作成します。
その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。
Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。
LogStore Nameをここでは「stock_logstore」と入力し、LogStoreを作成します。
その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。
ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。
データ格納について
STEP1: Google Apps Script(GAS)の設定、Webスクレイピングでデータ格納
今回はGoogle Apps Script(GAS)を使って、LogServiceへデータを格納します。 サンプルがあるので、これを参考に作成します。 (中国語なので気合を入れてGoogle翻訳しながら読んでみました)
流れとしては、①GASで新規プロジェクトを作成 ②外部ライブラリであるaliyun-sdk
をGASで使えるようにする になります。
ここではclaspというnpmパッケージをローカル環境にグローバルインストールし、ローカルで開発(webpackをbuild)したものをGASにpushします。(gas-clasp-starterでも良いです。)
まずはwebブラウザ側にてGASで新規プロジェクトを作成、および以下のソースコードを記載します(必要なライブラリが入ってないため、まだ実行できません)
function run_stocks_put() { var ENDPOINT = '<your endpoint>' var ACCESSKEYID = '<your ACCESSKEYID>' var ACCESSKEY = '<your ACCESSKEY>' var PROJECT = 'tecblog' var LOGSTORE = 'stock_logstore' var TOKEN = "" var topic = '9984_stock' var source = '127.0.0.1' var sls = new ALY.SLS({ "accessKeyId": ACCESSKEYID, "secretAccessKey": ACCESSKEY, "securityToken" :"tokens", endpoint: ENDPOINT, apiVersion: '2015-06-01' }); var contents = { logs : [{ time: get_current_time(), contents: [{ key: 'stock', value: fetch_stocks_data() }] }], topic: topic, source: source }; sls.putLogs({ projectName: PROJECT, logStoreName: LOGSTORE, logGroup: contents }, function (err, data) { if (err) { console.log('error:', err); return; } console.log('success:', data); }); } function fetch_stocks_data() { var url = "https://stocks.finance.yahoo.co.jp/stocks/detail/?code=9984.T"; var m, strStocks, reg = /<td class="stoksPrice">.*?<\/td>/g const content = UrlFetchApp.fetch(url).getContentText('UTF-8'); while (m = reg.exec(content)) { Logger.log(m[0]); strStocks = m[0].replace(' <td class="stoksPrice">', ""); strStocks = strStocks.replace('</td>', ""); strStocks = strStocks.replace(',', ""); return strStocks; } } function get_current_time() { var now = new Date(); return "" + now.getFullYear() + "/" + padZero(now.getMonth() + 1) + "/" + padZero(now.getDate()) + " " + padZero(now.getHours()) + ":" + padZero(now.getMinutes()) + ":" + padZero(now.getSeconds()); }
続いて、ローカルでclaspを導入します。
PS C:\Users\1200358> npm i @google/clasp -g
ターミナルからGoogleにログインします。
PS C:\Users\1200358> clasp login
Google Apps Script APIを開いて、Google Apps Script APIを有効にします。
ターミナルにて、GASの既存プロジェクトをCloneします。そのために、.clasp.json を開いてScript IDを設定します。Script IDはGASを実行するIDのことで、URLに付帯されています。以下画像の赤枠がIDです。
.clasp.json は インストール時にPathが表示されています。ここでは、/User/<your name>/.clasp.json
にあります。
{ "scriptId": "1_AsyvNcAcgs6aQQuytdLlYK-7Lh012bvhlWXo0PiNQfgf-8znF9Ytu9t", "rootDir": "dist" }
既存プロジェクトをCloneします。
PS C:\Users\1200358> clasp clone xxxSCRIPT-ID1_AsyvNcAcgs6aQQuytdLlYK-7Lh012bvhlWXo0PiNQfgf-8znF9Ytu9t"
ここで、入れたいモジュールをインストールします。このモジュールは、上記のPJにて、webpackとしてビルドしたものをclasp経由でGASでも使えるようにします。
PS C:\Users\1200358> npm install aliyun-sdk
ソースコードにindex.js
を入れます。
var ALY = require('../../index.js');
ローカル開発で作ったものをGAS側へ反映します。
PS C:\Users\1200358> clasp push
あとはGASのマイトリガーで株価が出てる時間帯に合わせて、10分おきにデータを取得します。
ちなみに東証で株式市場が開いているのは平日 9:00 - 15:00のみです。
(しばらくして、、)データが無事登録されてることを確認しました。
ちなみにWebスクレイピングはサイトによっては著作権ら法律があるので、事前に注意をチェックのうえご対処願います。
それでも大量のデータを連続取得したい場合は、(相手のサーバ負荷を考慮して)kabuというサイトなどで取得したほうが良いと思います。
STEP2: 文字型データを数値型データへ変換する(データクレンジング、ETL)
取得したデータは基本的に「文字型」として登録されます。なので、自らData Transformationでデータの変換処理をする必要があります。
この例の場合、「STOCK」フィールドが文字型なので、Data Transformationの以下の関数を使って「int_STOCK」という数値型フィールドを新規作成します。
※Queryのcast文により文字型→数値型変換ができるため、このケースでのData Transformation作業は基本的に不要ですが、他のデータ、例えば数値に+9,412
といった +
や,
がついてるものを除外したい、株価以外のデータから特定の文字を抽出or置換したい場合はこちらを参考にしてください。
e_set("int_STOCK", ct_int(v("STOCK")))
この数値型へ変換するルールを継続的に利用する場合は、ETLルールとして「Transformation Rule」を保存し登録、別のLogStoreへETLしアウトプットする必要があります。
※出力する「別のLogStore」は事前にて同じプロジェクト配下で新しく作成し、「Enable」ボタンを押してLogStoreをアクティベーションすると後がスムーズです。
ルール作成後、ETL処理が実行されます。ETL対象(日付、処理内容)によっては時間がかかります。進捗ステータスはこちらで確認できます。
ETL処理が終わったら、別のLogStoreである「stock_logstore_etl」にて処理結果が出力されます。
STEP3:Index機能の有効化、チャートを作ってみる
株価は折れ線チャートです。これはすぐできると思います。 先に、取得したデータの各フィールドを含め、Index機能を有効化します。Index化することで、LogStoreの中にあるデータを素早く検索できるようになり、結果としてチャートをリアルタイムに可視化することが出来ます。
[Index Attributes] > [Modify] でIndex登録の作業をします。
[Automatically Generate Index Attributes]をクリックし、フィールドタイプ(数値型、テキスト型、json型 etc)を選定しOKをクリックします。
その後はLogStoreに入ってる過去データ/これから発生するデータに反映するようにします。
以上で、各フィールドはIndex化が出来たと思います。
ここまでの作業が問題なければ、Queryバーにて以下Query文を入力し[Search &Analyze ]をクリックすると、以下画面左下の[Data Preview]にてデータが出力されます。
* | select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Date, cast(STOCK AS bigint) as Stock
上記SQL文についてを説明します。LogServiceのQueryは RDBなどで使われるT-SQLとは異なって全文検索をフォーカスとしたQueryとなります。もちろん読み取り専用(Select文のみ)です。詳しくは以下helpを参考にしてみてください。
* | select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Date, cast(STOCK AS bigint) as Stock
date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Date
は、「DATE_ID」フィールドを指定のタイムスタンプ形式で表示します。
以下のドキュメントの下の方に、Date/Timeのフォーマット設定方法がありますので、参考にしてみてください。
cast(STOCK AS bigint) as Stock
は 「STOCK」フィールドが文字型なので、数値型へ変換します。
上記のQuery文によりデータが見れるようになったので、[Properites] タブをクリックし、グラフとして表示したい [X Axis] と [Left Y Axis] などのフィールドを設定します。
これにより、チャートが見えるようになるはずです。上記は折れ線グラフですが、他のグラフもありますので色々試してみるといいでしょう。
作成したグラフをDashboardへ出力します。
Dashboardは上記のようなQuery結果をいつでも可視化できるメリットがあります。その他、複数ユーザによる閲覧や、リアルタイム可視化などいろいろありますが、ここは説明を省きます。
[Add to New Dashboard]をクリックし、任意のDashboardへ追加します。
Dashboardにて、先ほどのグラフが表示されました。以降のStepはこのDashboardを使って説明します。
STEP4:LogServiceで異常検知・予測する
やっと本題に入ります。(ここまで1万2千文字Over、見づらくなりすいません、、)
LogServiceは元々SIEMをメインとしたサービスですが、上記のようなチャートで可視化する箇所が多ければユーザーは追い付かなくなります。そのため、様々な時系列分析アルゴリズムによる異常検知・時系列の予測を行うことができます。
異常検知は以下のように色々なパターンがありますが、今回は株価なので、変化点検知をメインとした異常検知をします。この場合、変化点検出関数ts_cp_detect
を使います。
時系列分析アルゴリズムによる関数の一覧はこちらにてあります。
Dashboardにて、[edit]ボタンを押しながら[chart]をダブルクリックします。すると、以下のようなチャートのEdit画面らモーダルウィンドウが表示されます。
このStepではDashboardのモーダルウィンドウ内を使って説明します。
チャートで日付フィールドが長いためかなり見にくいと思いますので、以下のQuery文を使って[DATE_ID]フィールドから「年(year)」を削除します。[DATE_ID]フィールドのスリム化です。
* | select substr(DATE_ID,6,length(DATE_ID)-5) as dt, cast(STOCK AS bigint) as stock
これで見えやすくなったと思います。
続いて、変化点検出関数ts_cp_detect
を使って、株価の変化点を検知するようにします。
Query文は以下の通りです。 ts_cp_detect関数で、第一引数はunixtime、第二引数は変化を検知したい値、第三以降のパラメータは任意入力ですが 検知したい閾値やスパン、サンプル値などを入力します。詳しくは上記のhelpにて記載されています。
* | select ts_cp_detect(to_unixtime(dt), stock , 3 ) from ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock from log )
これで表示されました。srcは値で、probは変化検知のための変化値を示します。probが 1
であれば、変化あり(=変化・異常)と示します。0
なら変化なしです。
ここで一つ問題があります。ts_cp_detect関数の返却値として、timestampフィールドがそのまま返却されます。これでは時系列データとして見にくいので、これをyyyy/mm/dd hh:mm形式に変換したい。そういう場合は以下のQuery文を入力します。
* | select from_unixtime(res.timestamp) as date_dt_from_unixtime , res.src as src, res.prob as prob from ( select ts_cp_detect(to_unixtime(dt), stock , 3 ) as res from ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock from log ) )
チャートにて、異常検知の時の赤丸(=probフィールドが1
)を表示するために、以下設定します。
これで設定完了です。株価をリアルタイム可視化、異常検知することもできます。
今度は予測機能を入れてみます。ts_predicate_arma関数で、自己回帰移動(ARIMA)モデルを使用して時系列データをモデル化し、簡単な時系列予測と異常検出を実行します。
ts_predicate_arma関数で、第一引数はunixtime、第二引数は変化を検知したい値、第三以降は任意ですが 検知したい閾値やスパン、サンプル値などを入力します。詳しくは上記のhelpにて記載されています。
* | select ts_predicate_arma(to_unixtime(dt), stock,10, 2, 2, 1, 'max' ) as res from ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock from log )
これで表示されました。srcは値で、predictは予測値、upper/lowerは信頼区間、anomaly_probは異常検知のための変化値を示します。anomaly_probが1であれば、変化あり(=異常)と示します。
今は株価が上昇傾向なので問題ないのですが、低下時 緑色のラインが下へ向くので、そこから動向の予測ができます。
LogServiceの機械学習は以下のアルゴリズムが梱包されていますので、目的に応じて色々試すのもありと思います。
余談ですが、Alibaba CloudのほうでLogServiceによる異常検知のdemoのサイトがありますので、こちらも参考にいただければ幸いです。
STEP5: 監視・株価が急激に低下したときアラート発信する
helpにて、[Case study:トラフィックが大幅に低下したときに検知するクエリ] がありますので、これを参考にアラートを作成します。
Case study - Index and query| Alibaba Cloud Documentation Center
* | SELECT sum(stock) / (max(to_unixtime(date_parse(DATE_ID,'%Y/%m/%d %H:%i'))) - min(to_unixtime(date_parse(DATE_ID,'%Y/%m/%d %H:%i')))) as stock_per_minute, date_trunc('minute',date_parse(DATE_ID,'%Y/%m/%d %H:%i')) as minute group by minute
あとは不本意ですがメールが届かない(=株が暴落しない)ことを祈りましょう。
完了
Google Apps Script(GAS)で株価データを取得しLogServiceへ格納、およびLogServiceの機械学習機能を使って、株価の予測および異常検知を簡単に説明しました。
また、株価が低下したときはメールによるアラートを発信することが出来ます。
(今回、ターゲットとなる株価で暴落がなかったため、メール受信できず。なので暴落がきたときはメールのスクショ画像を含め、このblogを追記更新します)
これにより、いつでもどこでも可視化はもちろん、異常検知や予測、監視が出来ることから目視作業を大幅に減らすことが出来ます。
(懲りずテンプレな記載で恐縮ですが)
LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。
最後までお読みいただきありがとうございました。