【Logservice連載】Google Apps Script(GAS)で株価データを収集し、LogServiceの機械学習で株価予測・異常検知・監視をする

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している LogService のご紹介、および株価をLogServiceで予測・異常検知・検知する方法を記載します。
また、SBC Engineers' Blogの性格上、集中連載記事になります。

【Logservice連載】オフラインデータを含めた、様々なデータソースをシームレスに収集するLogServiceのご紹介
【Logservice連載】LogtailでCSVデータを収集するLogService
【Logservice連載】SDKでExcelデータを収集するLogService
【Logservice連載】OSS、AWS S3からCSVデータを収集するLogService
【Logservice連載】SDKでTwitterデータを収集するLogService
【Logservice連載】Logstashでnetflowデータを収集するLogService
【Logservice連載】Google Apps Script(GAS)で株価データを収集し、LogServiceの機械学習で株価予測・異常検知・監視をする ←本記事
【Logservice連載】NYC-Taxi on Logservice

f:id:sbc_ohara:20201118125247p:plain

前書き

LogServiceの可能性をこのblogにて記載しました。

LogService は、リアルタイムデータロギングサービスです。
ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。

少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www2.slideshare.net

今回はGoogle Apps Script(GAS)を使ってAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。

f:id:sbc_ohara:20201226124040p:plain


プロジェクト作成(LogService全体で共通事項)

まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。

f:id:sbc_ohara:20201124101928p:plain


Project Nameをここでは「techblog」にし、プロジェクトを作成します。 f:id:sbc_ohara:20201124102655p:plain

その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。 Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。 f:id:sbc_ohara:20201124102805p:plain

LogStore Nameをここでは「stock_logstore」と入力し、LogStoreを作成します。 f:id:sbc_ohara:20201124103013p:plain

その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。 ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。 f:id:sbc_ohara:20201124103134p:plain


データ格納について

STEP1: Google Apps Script(GAS)の設定、Webスクレイピングでデータ格納

今回はGoogle Apps Script(GAS)を使って、LogServiceへデータを格納します。 サンプルがあるので、これを参考に作成します。 (中国語なので気合を入れてGoogle翻訳しながら読んでみました)

github.com

流れとしては、①GASで新規プロジェクトを作成 ②外部ライブラリであるaliyun-sdkをGASで使えるようにする になります。
ここではclaspというnpmパッケージをローカル環境にグローバルインストールし、ローカルで開発(webpackをbuild)したものをGASにpushします。(gas-clasp-starterでも良いです。)

developers.google.com

github.com

まずはwebブラウザ側にてGASで新規プロジェクトを作成、および以下のソースコードを記載します(必要なライブラリが入ってないため、まだ実行できません)

function run_stocks_put() {
  var ENDPOINT = '<your endpoint>'
  var ACCESSKEYID = '<your ACCESSKEYID>'
  var ACCESSKEY = '<your ACCESSKEY>'  
  var PROJECT = 'tecblog'
  var LOGSTORE = 'stock_logstore'
  var TOKEN = ""
  var topic = '9984_stock'
  var source = '127.0.0.1'

  var sls = new ALY.SLS({
    "accessKeyId": ACCESSKEYID,
    "secretAccessKey": ACCESSKEY,
    "securityToken" :"tokens",
    endpoint: ENDPOINT,
    apiVersion: '2015-06-01'
  });

  var contents = {
    logs : [{
      time:  get_current_time(),
       contents: [{
            key: 'stock',
            value: fetch_stocks_data()
        }]
    }],
    topic: topic,
    source: source
  };
  
  sls.putLogs({
    projectName: PROJECT,
    logStoreName: LOGSTORE,
    logGroup: contents
  }, function (err, data) {
    if (err) {
      console.log('error:', err);
      return;
    }
    console.log('success:', data);
  });
}


function fetch_stocks_data() {  
  var url = "https://stocks.finance.yahoo.co.jp/stocks/detail/?code=9984.T";
  var m, strStocks, reg = /<td class="stoksPrice">.*?<\/td>/g
  const content = UrlFetchApp.fetch(url).getContentText('UTF-8');            
  
  while (m = reg.exec(content)) {
    Logger.log(m[0]);
    strStocks = m[0].replace('  <td class="stoksPrice">', ""); 
    strStocks = strStocks.replace('</td>', ""); 
    strStocks = strStocks.replace(',', ""); 
    return strStocks;
  }  
}

function get_current_time() {
    var now = new Date();
    return  "" + now.getFullYear() + "/" + padZero(now.getMonth() + 1) + 
        "/" + padZero(now.getDate()) + " " + padZero(now.getHours()) + ":" + 
        padZero(now.getMinutes()) + ":" + padZero(now.getSeconds());
}

f:id:sbc_ohara:20201226130832p:plain

続いて、ローカルでclaspを導入します。

PS C:\Users\1200358> npm i @google/clasp -g

f:id:sbc_ohara:20201228160450p:plain

ターミナルからGoogleにログインします。

PS C:\Users\1200358> clasp login

f:id:sbc_ohara:20201228202940p:plain

Google Apps Script APIを開いて、Google Apps Script APIを有効にします。

script.google.com

f:id:sbc_ohara:20201228153907p:plain

ターミナルにて、GASの既存プロジェクトをCloneします。そのために、.clasp.json を開いてScript IDを設定します。Script IDはGASを実行するIDのことで、URLに付帯されています。以下画像の赤枠がIDです。
.clasp.json は インストール時にPathが表示されています。ここでは、/User/<your name>/.clasp.jsonにあります。

f:id:sbc_ohara:20201226133138p:plain

{
  "scriptId": "1_AsyvNcAcgs6aQQuytdLlYK-7Lh012bvhlWXo0PiNQfgf-8znF9Ytu9t",
  "rootDir": "dist"
}

既存プロジェクトをCloneします。

PS C:\Users\1200358> clasp clone xxxSCRIPT-ID1_AsyvNcAcgs6aQQuytdLlYK-7Lh012bvhlWXo0PiNQfgf-8znF9Ytu9t"

ここで、入れたいモジュールをインストールします。このモジュールは、上記のPJにて、webpackとしてビルドしたものをclasp経由でGASでも使えるようにします。

PS C:\Users\1200358> npm install aliyun-sdk

ソースコードにindex.jsを入れます。

var ALY = require('../../index.js');

ローカル開発で作ったものをGAS側へ反映します。

PS C:\Users\1200358> clasp push

あとはGASのマイトリガーで株価が出てる時間帯に合わせて、10分おきにデータを取得します。
ちなみに東証で株式市場が開いているのは平日 9:00 - 15:00のみです。

(しばらくして、、)データが無事登録されてることを確認しました。

f:id:sbc_ohara:20201229095407p:plain

ちなみにWebスクレイピングはサイトによっては著作権ら法律があるので、事前に注意をチェックのうえご対処願います。
それでも大量のデータを連続取得したい場合は、(相手のサーバ負荷を考慮して)kabuというサイトなどで取得したほうが良いと思います。

kabu.plus


STEP2: 文字型データを数値型データへ変換する(データクレンジング、ETL)

f:id:sbc_ohara:20201229101022p:plain

取得したデータは基本的に「文字型」として登録されます。なので、自らData Transformationでデータの変換処理をする必要があります。
この例の場合、「STOCK」フィールドが文字型なので、Data Transformationの以下の関数を使って「int_STOCK」という数値型フィールドを新規作成します。
※Queryのcast文により文字型→数値型変換ができるため、このケースでのData Transformation作業は基本的に不要ですが、他のデータ、例えば数値に+9,412 といった +, がついてるものを除外したい、株価以外のデータから特定の文字を抽出or置換したい場合はこちらを参考にしてください。

e_set("int_STOCK", ct_int(v("STOCK")))

f:id:sbc_ohara:20201229101627p:plain

この数値型へ変換するルールを継続的に利用する場合は、ETLルールとして「Transformation Rule」を保存し登録、別のLogStoreへETLしアウトプットする必要があります。
※出力する「別のLogStore」は事前にて同じプロジェクト配下で新しく作成し、「Enable」ボタンを押してLogStoreをアクティベーションすると後がスムーズです。

f:id:sbc_ohara:20201228205552p:plain

ルール作成後、ETL処理が実行されます。ETL対象(日付、処理内容)によっては時間がかかります。進捗ステータスはこちらで確認できます。

f:id:sbc_ohara:20201228210533p:plain

ETL処理が終わったら、別のLogStoreである「stock_logstore_etl」にて処理結果が出力されます。

f:id:sbc_ohara:20201229102015p:plain


STEP3:Index機能の有効化、チャートを作ってみる

株価は折れ線チャートです。これはすぐできると思います。 先に、取得したデータの各フィールドを含め、Index機能を有効化します。Index化することで、LogStoreの中にあるデータを素早く検索できるようになり、結果としてチャートをリアルタイムに可視化することが出来ます。

www.alibabacloud.com

[Index Attributes] > [Modify] でIndex登録の作業をします。

f:id:sbc_ohara:20201228224136p:plain

[Automatically Generate Index Attributes]をクリックし、フィールドタイプ(数値型、テキスト型、json型 etc)を選定しOKをクリックします。
その後はLogStoreに入ってる過去データ/これから発生するデータに反映するようにします。

f:id:sbc_ohara:20201228224859p:plain

以上で、各フィールドはIndex化が出来たと思います。
ここまでの作業が問題なければ、Queryバーにて以下Query文を入力し[Search &Analyze ]をクリックすると、以下画面左下の[Data Preview]にてデータが出力されます。

* | select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Date,  cast(STOCK AS bigint) as Stock

f:id:sbc_ohara:20201229104350p:plain

上記SQL文についてを説明します。LogServiceのQueryは RDBなどで使われるT-SQLとは異なって全文検索をフォーカスとしたQueryとなります。もちろん読み取り専用(Select文のみ)です。詳しくは以下helpを参考にしてみてください。

www.alibabacloud.com

* | select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Date,  cast(STOCK AS bigint) as Stock

date_parse(DATE_ID,'%Y/%m/%d %H:%i') as Dateは、「DATE_ID」フィールドを指定のタイムスタンプ形式で表示します。
以下のドキュメントの下の方に、Date/Timeのフォーマット設定方法がありますので、参考にしてみてください。

www.alibabacloud.com

cast(STOCK AS bigint) as Stock は 「STOCK」フィールドが文字型なので、数値型へ変換します。

www.alibabacloud.com

上記のQuery文によりデータが見れるようになったので、[Properites] タブをクリックし、グラフとして表示したい [X Axis] と [Left Y Axis] などのフィールドを設定します。
これにより、チャートが見えるようになるはずです。上記は折れ線グラフですが、他のグラフもありますので色々試してみるといいでしょう。

f:id:sbc_ohara:20201228225633p:plain

作成したグラフをDashboardへ出力します。
Dashboardは上記のようなQuery結果をいつでも可視化できるメリットがあります。その他、複数ユーザによる閲覧や、リアルタイム可視化などいろいろありますが、ここは説明を省きます。

www.alibabacloud.com

[Add to New Dashboard]をクリックし、任意のDashboardへ追加します。

f:id:sbc_ohara:20201228225738p:plain

Dashboardにて、先ほどのグラフが表示されました。以降のStepはこのDashboardを使って説明します。

f:id:sbc_ohara:20201228225842p:plain


STEP4:LogServiceで異常検知・予測する

やっと本題に入ります。(ここまで1万2千文字Over、見づらくなりすいません、、)

LogServiceは元々SIEMをメインとしたサービスですが、上記のようなチャートで可視化する箇所が多ければユーザーは追い付かなくなります。そのため、様々な時系列分析アルゴリズムによる異常検知・時系列の予測を行うことができます。

異常検知は以下のように色々なパターンがありますが、今回は株価なので、変化点検知をメインとした異常検知をします。この場合、変化点検出関数ts_cp_detectを使います。

f:id:sbc_ohara:20201229104930p:plain

時系列分析アルゴリズムによる関数の一覧はこちらにてあります。

www.alibabacloud.com

Dashboardにて、[edit]ボタンを押しながら[chart]をダブルクリックします。すると、以下のようなチャートのEdit画面らモーダルウィンドウが表示されます。
このStepではDashboardのモーダルウィンドウ内を使って説明します。

f:id:sbc_ohara:20201228231210p:plain

チャートで日付フィールドが長いためかなり見にくいと思いますので、以下のQuery文を使って[DATE_ID]フィールドから「年(year)」を削除します。[DATE_ID]フィールドのスリム化です。

* | select substr(DATE_ID,6,length(DATE_ID)-5) as dt, cast(STOCK AS bigint) as stock

f:id:sbc_ohara:20201229003918p:plain

これで見えやすくなったと思います。
続いて、変化点検出関数ts_cp_detectを使って、株価の変化点を検知するようにします。   

www.alibabacloud.com

Query文は以下の通りです。 ts_cp_detect関数で、第一引数はunixtime、第二引数は変化を検知したい値、第三以降のパラメータは任意入力ですが 検知したい閾値やスパン、サンプル値などを入力します。詳しくは上記のhelpにて記載されています。

* | select ts_cp_detect(to_unixtime(dt), stock , 3 ) from
 ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock 
from log )

f:id:sbc_ohara:20201229013908p:plain

これで表示されました。srcは値で、probは変化検知のための変化値を示します。probが 1 であれば、変化あり(=変化・異常)と示します。0なら変化なしです。
ここで一つ問題があります。ts_cp_detect関数の返却値として、timestampフィールドがそのまま返却されます。これでは時系列データとして見にくいので、これをyyyy/mm/dd hh:mm形式に変換したい。そういう場合は以下のQuery文を入力します。

* | select from_unixtime(res.timestamp) as date_dt_from_unixtime , res.src as src, res.prob as prob
 from ( 
select ts_cp_detect(to_unixtime(dt), stock , 3 ) as res from
 ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock 
from log )
)

チャートにて、異常検知の時の赤丸(=probフィールドが1 )を表示するために、以下設定します。

f:id:sbc_ohara:20201229023831p:plain

これで設定完了です。株価をリアルタイム可視化、異常検知することもできます。

f:id:sbc_ohara:20201229023649p:plain

今度は予測機能を入れてみます。ts_predicate_arma関数で、自己回帰移動(ARIMA)モデルを使用して時系列データをモデル化し、簡単な時系列予測と異常検出を実行します。

www.alibabacloud.com

ts_predicate_arma関数で、第一引数はunixtime、第二引数は変化を検知したい値、第三以降は任意ですが 検知したい閾値やスパン、サンプル値などを入力します。詳しくは上記のhelpにて記載されています。

* | select ts_predicate_arma(to_unixtime(dt), stock,10, 2, 2, 1, 'max' ) as res from
 ( select date_parse(DATE_ID,'%Y/%m/%d %H:%i') as dt, cast(STOCK AS bigint) as stock 
from log )

これで表示されました。srcは値で、predictは予測値、upper/lowerは信頼区間、anomaly_probは異常検知のための変化値を示します。anomaly_probが1であれば、変化あり(=異常)と示します。
今は株価が上昇傾向なので問題ないのですが、低下時 緑色のラインが下へ向くので、そこから動向の予測ができます。

f:id:sbc_ohara:20201229031639p:plain

LogServiceの機械学習は以下のアルゴリズムが梱包されていますので、目的に応じて色々試すのもありと思います。

f:id:sbc_ohara:20201229110223p:plain

余談ですが、Alibaba CloudのほうでLogServiceによる異常検知のdemoのサイトがありますので、こちらも参考にいただければ幸いです。

http://47.96.36.117/redirect.php?spm=5176.11777732.1159318.3.21e17a34549Qdq&type=26&amp;amp;amp;amp;amp;amp;amp;amp;redirect=true

f:id:sbc_ohara:20201226123505p:plain


STEP5: 監視・株価が急激に低下したときアラート発信する

helpにて、[Case study:トラフィックが大幅に低下したときに検知するクエリ] がありますので、これを参考にアラートを作成します。

Case study - Index and query| Alibaba Cloud Documentation Center

* | SELECT sum(stock) / (max(to_unixtime(date_parse(DATE_ID,'%Y/%m/%d %H:%i'))) - min(to_unixtime(date_parse(DATE_ID,'%Y/%m/%d %H:%i')))) as stock_per_minute, date_trunc('minute',date_parse(DATE_ID,'%Y/%m/%d %H:%i')) as minute group by minute

f:id:sbc_ohara:20201229111738p:plain

f:id:sbc_ohara:20201229111511p:plain

あとは不本意ですがメールが届かない(=株が暴落しない)ことを祈りましょう。

f:id:sbc_ohara:20201229034342p:plain


完了

Google Apps Script(GAS)で株価データを取得しLogServiceへ格納、およびLogServiceの機械学習機能を使って、株価の予測および異常検知を簡単に説明しました。
また、株価が低下したときはメールによるアラートを発信することが出来ます。
(今回、ターゲットとなる株価で暴落がなかったため、メール受信できず。なので暴落がきたときはメールのスクショ画像を含め、このblogを追記更新します)
これにより、いつでもどこでも可視化はもちろん、異常検知や予測、監視が出来ることから目視作業を大幅に減らすことが出来ます。

(懲りずテンプレな記載で恐縮ですが)
LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。

最後までお読みいただきありがとうございました。