SBクラウド株式会社logo

【Hologres連載】FunctionComputeでHologresのジョブスケジューラを設定する方法

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している Hologres で、FunctionComputeによるジョブスケジューラを設定する方法をご紹介します。

f:id:sbc_ohara:20210316010930p:plain

Hologresとは

Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。

www.sbcloud.co.jp

少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

Hologresのジョブスケジューラについて

Hologresのスケジュールジョブに関する公式ソリューションは、主にDataWorksをベースとしています。HoloStdio、というHologresをDataWorksでハンドリングする機能がありますが、国際サイトでは、HologresとDataWorks(HoloStdio)がまだリリースされていないため、他の方法で導入する必要があります。
Hologresは、分散コンピューティングノードに最適化された大規模なストレージと優れたクエリ機能を、低コスト、高性能、高可用性で提供するよう設計されています。リアルタイムのデータウェアハウスソリューションとリアルタイムのインタラクティブなクエリサービスを提供します。   
つまり、Hologresに関連するスクリプトやSQL文を定期的に実行するためのトリガーを構築することが今回のポイントとなります。     

前提条件

・Alibaba Cloudのアカウントを持っている
・HologresとFunction Computeを有効化している
・Hologresのインスタンスを所持している


FunctionCompute によるソリューション(Java版)

Alibaba Cloud Function Computeは、フルマネージドなイベントドリブンコンピューティングサービスです。サーバレスとして運用ですが、 time trigger を設定することで、指定した時間に自動的に関数を起動することができます。

www.alibabacloud.com

以下の手順では、Java関数に基づいて、10分ごとにHologresパーティションテーブルにデータを挿入するスケジュールジョブを構築します。

STEP1: Hologresにてテーブルを準備

Hologresでorder_schedule_partitionという名前のテーブルを、order_timeというカラムをパーティションキーとして作成します。

f:id:sbc_ohara:20210716211047p:plain

f:id:sbc_ohara:20210716211059p:plain

f:id:sbc_ohara:20210716211108p:plain

STEP2: FunctionComputeでサンプルコードを準備

FunctionComputeは多くのプログラミング言語 をサポートしています。ここでは「Java」を例として構築します。
Javaで構築する場合、「fc-java-core」パッケージを使って、「FunctionCompute」が提供する定義済みのハンドラを実装する必要があります。 より詳しい情報はFunctionComputeによる開発ガイドライン を参照してください。

www.alibabacloud.com

FC (Function Compute)ランタイムパッケージの他に、HologresデータベースにSQLステートメントを送信するためには、Hologres connectorが必要です。
HologresはPostgreSQL 11に対応しているので、一般的なPostgreSQL JDBCドライバ がサポートされていますが、Hologres専用に開発されたDriverを利用することもできます。

mvnrepository.com

新しいMavenプロジェクトを作成し、pom.xmlのdependencyセクションを以下のように更新してください。

......
    <dependencies>
        <dependency>
            <groupId>com.aliyun.fc.runtime</groupId>
            <artifactId>fc-java-core</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.fc.runtime</groupId>
            <artifactId>fc-java-common</artifactId>
            <version>2.2.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.hologres</groupId>
            <artifactId>postgresql-holo</artifactId>
            <version>42.2.18.4</version>
        </dependency>
    </dependencies>
......

ターゲットパッケージをビルドするには、maven-assembly-pluginも必要です。
以下は設定例です。詳しくはJava runtime environmentをご参照ください。

www.alibabacloud.com

......
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <appendAssemblyId>false</appendAssemblyId> <!-- this is used for not append id to the jar name -->
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
......

上記設定で問題なければ、上記ScriptからHologresデータベースに接続されると、必要に応じて2つのオペレーションが実行されます。

www.alibabacloud.com

www.alibabacloud.com

関連する操作を行うために、プロジェクト内に新しいJavaクラスを作成します。

public class App implements StreamRequestHandler {
    @Override
    public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        try {
            // Connection URL, please update based on your own settings
            String url = "jdbc:postgresql://{ENDPOINT}:{PORT}/{DBNAME}?user={ACCESS_ID}&password={ACCESS_KEY}";
            Connection conn = DriverManager.getConnection(url);
            SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
            String orderTime = df.format(new Date());
            String table_sql = "create table public.order_schedule_partition_" + orderTime + " partition of public.order_schedule_partition for values in('" + orderTime + "')";
            
            // Operation 1: Create partition table
            Statement st = conn.createStatement();
            int update_row = st.executeUpdate(table_sql);
            context.getLogger().info("Create partition table done! " + update_row);
            
            // Operation 2: Insert random data
            String data_sql = "insert into order_schedule_partition_" + orderTime + " (order_time, order_id, book_id, book_name, price) VALUES" +
                    "(?, ?, ?, ?, ?)";
            PreparedStatement pst = conn.prepareStatement(data_sql);
            Random random = new Random();
            for (int i = 0; i < 10; ++i) {
                int j = 1;
                pst.setString(j++, orderTime);
                pst.setString(j++, UUID.randomUUID().toString());
                pst.setInt(j++, random.nextInt(100));
                pst.setString(j++, UUID.randomUUID().toString());
                pst.setInt(j++, random.nextInt(50));
                int related_row = pst.executeUpdate();
                context.getLogger().info("Insert target row! times: " + i);
            }
            /*conn.commit();*/ // Auto Commit by default
            st.close();
            pst.close();
            conn.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }
}

プロジェクトのルートで mvn clean package コマンドを実行すると、targetフォルダにビルドされたJARパッケージができます。

D:\Development\java\workspace\fc-hologres-schedule>mvn clean package
[INFO] Scanning for projects...
......
[INFO]
[INFO] ------------------< org.example:fc-hologres-schedule >------------------
[INFO] Building fc-hologres-schedule 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
......
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  7.587 s
[INFO] Finished at: 2021-07-13T14:39:46+08:00
[INFO] ------------------------------------------------------------------------

STEP3: FunctionComputeにて、処理内容を設定

Alibaba CloudでFunctionComputeサービスが有効になっていることを確認してください。
初めてFunction Computeにアクセスする場合は、以下のポップアップウィンドウでクラウドリソースの認証を行ってください。

f:id:sbc_ohara:20210716212106p:plain

f:id:sbc_ohara:20210716212116p:plain

FunctionComputeにてイベント処理したい内容としてFunctionを作成します。

f:id:sbc_ohara:20210716212713p:plain

f:id:sbc_ohara:20210716212722p:plain

f:id:sbc_ohara:20210716212730p:plain

上記で作成したJARパッケージを用いて関数を作成しますが、その際、Function Handlerの設定に注意する必要があります。

Handlerは「example.HelloFC::handleRequest」という形で定義されます。Handler "example.HelloFC::handleRequest "は、exampleパッケージのHelloFC.javaに "handlerRequest "というメソッドがあることを意味します。

f:id:sbc_ohara:20210716212739p:plain

f:id:sbc_ohara:20210716212802p:plain

invoke ボタンをクリックすると、上記作成したScriptを手動でテストすることができます。

f:id:sbc_ohara:20210716212943p:plain

f:id:sbc_ohara:20210716212952p:plain

f:id:sbc_ohara:20210716213001p:plain

STEP4: time triggerの作成

FunctionCompute の Time Trigger を使えば、定期的にターゲットスクリプトを実行することができます。
Time Trigger は、interval minutescron expression に基づいて作成することができます。タイマー設定の詳細については、time expressionsを参照してください。

www.alibabacloud.com

www.alibabacloud.com

f:id:sbc_ohara:20210716213042p:plain

f:id:sbc_ohara:20210716213051p:plain

The script will be run based on timer settings after the time trigger is created successfully. In the example, cron expression 0 0/10 * * * ? will call the script every 10 minutes.

Time Trigger の作成が成功すると、タイマー設定に基づいてスクリプトが実行されます。
この例では、cron式の 0 0/10 * * ? が10分ごとにスクリプトを呼び出します。

f:id:sbc_ohara:20210716213110p:plain

f:id:sbc_ohara:20210716213120p:plain

スケジュールが使用できなくなった場合、タイムトリガーを無効にすることができます。
そうすれば、スクリプトが自動的に呼び出されることはありません。

f:id:sbc_ohara:20210716213132p:plain


FunctionCompute によるソリューション(Python 版)

上記は FunctionCompute を Java版 として構築しました。今度はPythonを使って、同じ目的を達成するための構築方法を紹介します。

STEP1: Hologresにてテーブルを準備

Hologresでorder_schedule_partitionという名前のテーブルを、order_timeというカラムをパーティションキーとして作成します。
上記Java版で既に作成済なら、それを使いまわすこともできます。

f:id:sbc_ohara:20210716211047p:plain

f:id:sbc_ohara:20210716211059p:plain

f:id:sbc_ohara:20210716211108p:plain

STEP2: FunctionComputeでサンプルコードを準備

Psycopgは、Pythonプログラミング言語用のPostgreSQLデータベースアダプターです。HologresはPostgreSQL 11に対応しているので、Psycopgを使ってHologresに接続する ことができます。

www.alibabacloud.com

www.psycopg.org

以下サンプルコードを作成し、index.pyとして保存します。

import psycopg2
import uuid
import time
import random


def handler(event, context):
    # Connect to the database, please update based on your own settings
    connection = psycopg2.connect(host=<ENDPOINT>, port=<PORT>, dbname=<DBNAME>, user=<ACCESS_ID>, password=<ACCESS_KEY>)
    order_time = time.strftime('%Y%m%d%H%M', time.localtime(time.time()))
    cur = connection.cursor()

    # Operation 1: Create partition table
    cur.execute(
        "create table public.order_schedule_partition_{0} partition of public.order_schedule_partition for values in('{0}');".format(
            order_time))
    connection.commit() # commit manually

    # Operation 2: Insert random data
    for i in range(5):
        cur.execute("""INSERT INTO order_schedule_partition_{0} (order_time, order_id, book_id, book_name, price) VALUES
                    ('{0}', '{1}', {2}, '{3}', {4});""".format(order_time, uuid.uuid1(), random.randint(1, 100), uuid.uuid1(),
                                                         random.randint(1, 100)))
    connection.commit() # commit manually
    cur.close()
    connection.close()
    return 'Success'

STEP3: FunctionComputeにて、処理内容を設定

上記URLリンクでも紹介していますが、 psycopg2は FunctionCompute のPython3 実行環境の組み込みモジュールの1つではないです。そのため、カスタムモジュールを使用しながら、カスタムモジュールとコードをパッケージ化して FunctionCompute へアップロード する必要があります。

www.alibabacloud.com

これには2つの対処方法があります。

PIPらパッケージマネージャーを使って依存関係を管理
fun install コマンドを使って、依存関係をインストール

今回はECS上のサーバー(CentOS)で Funcraft によって、psycopg2を含む実行環境を構成する方法を説明します。

www.alibabacloud.com

最新のFuncraftパッケージを release page から入手して、ECSサーバーにダウンロードします。

curl -o fun-linux.zip https://funcruft-release.oss-accelerate.aliyuncs.com/fun/fun-v3.6.23-linux.zip

Funcraftをインストールし、バージョン情報を確認します。

www.alibabacloud.com

f:id:sbc_ohara:20210716213926p:plain

「Hologres」という名前のディレクトリを作成し、fun install init コマンドを実行して FunctionCompute のためのFunctionルートディレクトリを初期化します。これにより、Funfile という新しいファイルが生成されます。

f:id:sbc_ohara:20210716213940p:plain

f:id:sbc_ohara:20210716213950p:plain

以下のように Funfile の構成を更新し、カスタムモジュールpsycopg2を設定します。

RUNTIME python3
RUN fun-install pip install psycopg2

ルートディレクトリに template.yml という名前のファイルを作成し、対象となるサービスや機能の情報を定義します。
詳しくはtemplate.yml introductionをご覧ください。

github.com

以下の内容は、現在のFunctionが bob_demo というサービスの下に hologres_schedule_py という名前でデプロイされることを示しています。

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
  bob_demo:  
    Type: 'Aliyun::Serverless::Service' 
    hologres_schedule_py:    
      Type: 'Aliyun::Serverless::Function'   
      Properties:     
        Handler: index.handler     
        Runtime: python3     
        CodeUri: './'

.env という名前のファイルを新たに作成し、Alibaba Cloudのアカウント情報をconfig Funcraftに追加します。

www.alibabacloud.com

ACCOUNT_ID=xxxxxxxx
REGION=ap-northeast-1
ACCESS_KEY_ID=xxxxxxxxxxxx
ACCESS_KEY_SECRET=xxxxxxxxxx
FC_ENDPOINT=https://{accountid}.{region}.fc.aliyuncs.com
TIMEOUT=10
RETRIES=3

f:id:sbc_ohara:20210716214009p:plain

function のルートディレクトリで fun install コマンドを実行して、依存関係をインストールします。
docker serviceがインストールされます。インストール後、これが正常に動作していることを確認してください。
なお、インストールにはマルチステージビルドを使用しているため、dockerのバージョンは17.05以降である必要があります。

f:id:sbc_ohara:20210716214020p:plain

f:id:sbc_ohara:20210716214028p:plain

f:id:sbc_ohara:20210716214036p:plain

FunCraftを使って、fun deploy -yというコマンドで function をデプロイします。
途中で「Both project and logstore are required for enabling instance metrics. ( インスタンスメトリクスを有効にするには、プロジェクトとログストアの両方が必要です) 」 というエラーメッセージが表示された場合は、サービスの設定で「Request-level Metrics」と「Instance Metrics」を無効にする必要があります。

f:id:sbc_ohara:20210716214054p:plain

f:id:sbc_ohara:20210716214103p:plain

f:id:sbc_ohara:20210716214115p:plain

f:id:sbc_ohara:20210716214125p:plain

あとは手動でコンソールのFunctionを起動し、関連するログや結果を確認します。

f:id:sbc_ohara:20210716214151p:plain

f:id:sbc_ohara:20210716214204p:plain

f:id:sbc_ohara:20210716214213p:plain

STEP4: time triggerの作成

上記、Java版でも説明していますが、 Time Triggerを作成して、スケジュールジョブの設定を行います。

f:id:sbc_ohara:20210716214226p:plain

f:id:sbc_ohara:20210716214236p:plain

The script will be run every 10 minutes based on the created time trigger. It could be disable as well.

この Script は、作成されたタイムトリガーに基づいて10分ごとに実行されます。無効にすることもできます。

f:id:sbc_ohara:20210716214249p:plain

f:id:sbc_ohara:20210716214258p:plain


最後に

ここまで、FunctionComputeでJavaもしくはPythonを使ってHologresのジョブスケジューラを設定する方法を紹介しました。
PostgreSQL 11と高い互換性があるHologresと、FunctionComputeを使えば、DataWorks無しでもジョブスケジューラなどを色々構築、運用することができます。

Special Thanks, Bob