【ClickHouse連載】Apache SparkからClickHouseへデータをリアルタイム格納してみる

Hi, データエンジニアの大原です。
今回はAlibaba Cloudの国際サイトで提供している ClickHouse を使って、Apache SparkからClickHouseへデータ連携する方法をご紹介します。

f:id:sbc_ohara:20210714233458p:plain

ClickHouseとは

ClickHouseは非集計データを含む大量のデータを安定的かつ継続しながら集計といったリアルタイム分析を支える列指向の分散型データベースサービスです。 トラフィック分析、広告およびマーケティング分析、行動分析、リアルタイム監視などのビジネスシナリオで幅広く 使用されています。
ApsaraDB for ClickHouseの概要として、詳しいことはこの記事にてまとめています。

www.sbcloud.co.jp

clickhouse.tech

Apache Sparkとは

オープンソースのビッグデータと機械学習のための非常に高速な分散処理フレームワークです。 Apache SparkはE-MapReduceやDataLake Analytics、MaxComputeなどにて付帯しています。

spark.apache.org

少し前になりますが、Apache Sparkを含む、E-MapReduceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。

www.slideshare.net

本記事では、Apache SparkからClickHouseへデータをリアルタイム格納してみます。構成図で次の通りです。

f:id:sbc_ohara:20210824160415p:plain

1.ClickHouseClientの準備

1-1.ClickHouseインスタンスを準備します

この手法は過去の記事でも記載していますが、再掲として記載します。

www.sbcloud.co.jp

1)まずはApsaraDB for ClickHouseインスタンスを作成します。
①VPCを作成

f:id:sbc_ohara:20210716155036p:plain

f:id:sbc_ohara:20210716155113p:plain

②ClickHouseインスタンスを作成
著者は以下のインスタンススペックでインスタンスを作成しています。

ClickHouse version:20.8.7.15
Edition:Single-replica Edition

f:id:sbc_ohara:20210716155212p:plain

f:id:sbc_ohara:20210716155220p:plain

f:id:sbc_ohara:20210716155231p:plain

2)ClickHouseの登録アカウントを作成 インスタンスをクリックし、左側にアカウント管理画面で、アカウントを作成します

f:id:sbc_ohara:20210716155256p:plain

3)ClickHouseクラスターにDMSで接続 ①ClickHouseのインスタンスをクリックし、トップメニューの「Log On to Database」をクリックします

f:id:sbc_ohara:20210716155315p:plain

② DBアカウントとパスワードを入力し、ClickHouseへログイン

f:id:sbc_ohara:20210716155334p:plain

③DMS画面でClickHouseのインスタンスが表示されます

f:id:sbc_ohara:20210716155401p:plain

2. Apache Spark環境の準備

2-1.IntelliJ IDEAをインストールします。(具体的な説明は本記事では省略)

1)IntelliJ IDEAを起動します

f:id:sbc_ohara:20210823200105p:plain

2-2.IntelliJ IDEAでSBTプラグインをインストールします

1)下記リンクからSBTプラグインをダウンロードします

plugins.jetbrains.com

f:id:sbc_ohara:20210823200143p:plain

2)ダウンロードされたSBTzipファイルをIntelliJ IDEAのプラグインフォルダに置き、SBTプラグインをインストールします
① メニューバー [File] > [Settings] を開きます

f:id:sbc_ohara:20210823200247p:plain

②Pluginsを選択し、Install Plugin from Disk設定メニューをクリックします

f:id:sbc_ohara:20210823200301p:plain

③プラグインを選択します

f:id:sbc_ohara:20210823204600p:plain

④SBTプラグインが表示されるのを確認します

f:id:sbc_ohara:20210823204623p:plain

⑤IDEを再起動します

f:id:sbc_ohara:20210823204644p:plain

⑥SBTプラグインがインストールされます

f:id:sbc_ohara:20210823204702p:plain

2-3.SBTインストール

SBTインストール をします。この手順はWindows環境での実行となります。
著者はMacを持っていないため、お手数ですがネットなどの情報を参考に構築いただければ幸いです。
1)下記リンクからsbt-1.3.8.zipをローカルにダウンロードします

www.scala-sbt.org

①sbt-1.3.8.zipをダウンロードします

f:id:sbc_ohara:20210823204755p:plain

f:id:sbc_ohara:20210823204805p:plain

②sbt-1.3.8.zipを解凍します

f:id:sbc_ohara:20210823204822p:plain

2)MyComputerプロパティから環境パスを設定
①SBT_HOMEを設定します

f:id:sbc_ohara:20210823204855p:plain

②SBT binパスを追加します

f:id:sbc_ohara:20210823204909p:plain

3)CMDを開き、下記コマンドでsbtを確認します

①CMDを開き、sbtを入力します

# sbt

f:id:sbc_ohara:20210823205309p:plain

f:id:sbc_ohara:20210823205318p:plain

②sbtバージョンを確認します

f:id:sbc_ohara:20210823205331p:plain

f:id:sbc_ohara:20210823205506p:plain

2-4.Intellij ideaでScalaプラグインをインストールします

1)下記リンクからscalaプラグインをインストールします
前提条件として、Intellij ideaが起動されていることが必須です

plugins.jetbrains.com

①Install to IntelliJ IDEA2020.1.1ボタンをクリックします

f:id:sbc_ohara:20210823205519p:plain

②Successと表示されます

f:id:sbc_ohara:20210823205532p:plain

③IDEAでscalaのインストール画面が表示されるのを確認します

f:id:sbc_ohara:20210823205542p:plain

④Scalaをインストールします

f:id:sbc_ohara:20210823205552p:plain

⑤インストール後、IDEAでFile‐New‐Projectプロジェクトの作成画面にScalaメニューが追加されます

f:id:sbc_ohara:20210823205625p:plain

3.Sparkプロジェクトの準備

3-1.Sparkプロジェクトを作成します

1)Sparkプロジェクトを作成します
① メニューバーで File > New > Project をクリックします

f:id:sbc_ohara:20210823205704p:plain

②Scalaを選択し、sbtを選択します

f:id:sbc_ohara:20210823205716p:plain

③プロジェクトフォルダを選択し、JDK、sbt、Scalaを設定します

JDK:1.8
sbt:1.3.8
Scala:2.12.0

f:id:sbc_ohara:20210823205735p:plain

④プロジェクトが作成されます

f:id:sbc_ohara:20210823205757p:plain

f:id:sbc_ohara:20210823205807p:plain

3-2.Sparkプロジェクトのディレクトリを準備します

1)ディレクトリ構造

./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scala
./build.sbt
./assembly.sbt
./project/plugins.sbt

2)./src/main/scala/com/spark/test/WriteToCk.scalaを作成します

f:id:sbc_ohara:20210823205836p:plain

f:id:sbc_ohara:20210823205845p:plain

f:id:sbc_ohara:20210823205854p:plain

f:id:sbc_ohara:20210823205904p:plain

3)WriteToCk.scalaを編集します
①WriteToCk.scalaサンプル を次の通りに入力します

package com.spark.test

import java.util
import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "<your-user-name>")
  properties.put("password", "<your-password>")
  properties.put("batchsize","100000")
  properties.put("socket_timeout","300000")
  properties.put("numPartitions","8")
  properties.put("rewriteBatchedStatements","true")

  val url = "jdbc:clickhouse://<you-url>:8123/default"
  val table = "<your-table-name>"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()

    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("</your/path/to/test/data/a.txt>")
      .selectExpr(
        "Year",
        "Quarter",
        "Month"
      )
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(s"read done")

    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    println(s"write done")

    df.unpersist(true)
  }
}

パラメータ説明:
・your-user-name:ターゲットClickHouseクラスターで作成されたデータベースアカウント名
・password:データベースアカウント名に対応するパスワード
・your-url:ターゲットClickHouseクラスターアドレス(VPCエンドポイントで設定することをお勧めする)
・/your/path/to/test/data/a.txt:インポートするデータファイルのパス(ファイルアドレスとファイル名を含む)
・your-table-name:ClickHouseクラスターのターゲットテーブル名

②WriteToCk.scalaを編集します

package com.spark.test
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "sbtest")
  properties.put("password", "Test1234")
  properties.put("batchsize","100000")
  properties.put("socket_timeout","300000")
  properties.put("numPartitions","8")
  properties.put("rewriteBatchedStatements","true")

  val url = "jdbc:clickhouse://cc-0iw4v4hezq9lw9333.ads.aliyuncs.com:8123/default"
  val table = "spark_table_distributed"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()

    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("oss://spark-clickhouse/data/access_log_csv.txt")
      .select("*")
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(s"read done")

    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    println(s"write done")

    df.unpersist(true)
  }
}

※今回はOSSに保存するtxtファイルをSparkで読み取ることを例とします
※予め、access_log_csv.txt をOSS ("oss://spark-clickhouse/data/access_log_csv.txt") にアップロードします

f:id:sbc_ohara:20210823210037p:plain

4)./build.sbt構成ファイルを編集して、依存関係を追加します
①build.sbtを下記のように編集します

lazy val sparkSettings = Seq(
  organization := "com.spark.test",
  version := "0.1",
  scalaVersion := "2.12.0",
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided",
    "org.apache.spark" %% "spark-core" % "3.0.0" % "provided",
    "com.alibaba" % "fastjson" % "1.2.4" % "provided",
    "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
  )
)

lazy val root = (project in file("."))
  .settings(
    sparkSettings,
    name := "sparkdemo",
    mainClass in assembly := Some("com.spark.test.WriteToCk"),
    assemblyJarName in assembly := "nancy-spark-test-WriteToCk.jar",
    assemblyMergeStrategy in assembly := {
      case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
      case PathList("javax", "inject", xs @ _*) => MergeStrategy.first
      case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
      case PathList("javax", "xml", xs @ _*) => MergeStrategy.first
      case PathList("org", "apache", xs @ _*) => MergeStrategy.first
      case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.first
      case PathList("org", "ow2", xs @ _*) => MergeStrategy.first
      case PathList("net", "jpountz", xs @ _*) => MergeStrategy.first
      case PathList("com", "google", xs @ _*) => MergeStrategy.first
      case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.first
      case PathList("com", "codahale", xs @ _*) => MergeStrategy.first
      case PathList("com", "yammer", xs @ _*) => MergeStrategy.first
      case PathList("com", "fasterxml", xs @ _*) => MergeStrategy.first
      case "about.html" => MergeStrategy.rename
      case "META-INF/mailcap" => MergeStrategy.first
      case "META-INF/mimetypes.default" => MergeStrategy.first
      case "plugin.properties" => MergeStrategy.first
      case "git.properties" => MergeStrategy.first
      case "log4j.properties" => MergeStrategy.first
      case "module-info.class" => MergeStrategy.discard
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }
  )

※assemblyMergeStrategyでJarパッケージが重複するエラーを解決します
※./assembly.sbt-> sbt assemblyでパッケージする方法ではru.yandex.clickhouseの3rdパーティーを引用することができます。sbt packageで作成したJarパッケージではclickhouseの3rdパーティーには含まれません。

f:id:sbc_ohara:20210823210233p:plain

5)./assembly.sbtを編集します
※assemblyプラグインのインストール方法はassembly関連のコンフィグファイルを正しく設定し、sbt updateコマンドを実行します

①assembly.sbtを下記のように編集します

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")
resolvers += Resolver.url("bintray-sbt-plugins", url("https://scala.jfrog.io/artifactory/sbt-plugin-releases/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

※sbtとassemblyのバージョンにはご注意ください

f:id:sbc_ohara:20210823210309p:plain

6)./project/plugins.sbtを編集します

logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")

f:id:sbc_ohara:20210823210328p:plain

7)sbt updateでsbt-assemblyをインストールします

# sbt update

f:id:sbc_ohara:20210823210357p:plain

8)プラグインを確認します
①sbt pluginsを入力します

# sbt plugins

f:id:sbc_ohara:20210823210414p:plain

②sbtassembly.AssemblyPluginがインストールされます

f:id:sbc_ohara:20210823210439p:plain

ここまでsbt-assemblyがインストール完了されたことを確認します

9)Spark依頼をインポートします

プロジェクトを正しくビルドするため、Spark依頼をインポートします
①下記リンクからspark-3.1.2-bin-hadoop3.2.tgzをダウンロードします

spark.apache.org

f:id:sbc_ohara:20210823211124p:plain

f:id:sbc_ohara:20210823211104p:plain

②spark-3.1.2-bin-hadoop3.2.tgzを解凍し、プロジェクトの右クリックメニューから依頼Jarを追加します

f:id:sbc_ohara:20210823211111p:plain

③Libraries‐NewProjectLibraryにJavaを選択します

f:id:sbc_ohara:20210823211146p:plain

④spark-3.1.2-bin-hadoop3.2のjarsフォルダを選択します

f:id:sbc_ohara:20210823211202p:plain

⑤「OK」をクリックします

f:id:sbc_ohara:20210823211221p:plain

⑥プロジェクトをBuildします

f:id:sbc_ohara:20210823211236p:plain

⑦プロジェクトが正しくビルドされたことを確認します

f:id:sbc_ohara:20210823211305p:plain

ここまで手順で問題なければプロジェクトのビルドが無事成功したと思います。

3-3.Sparkプロジェクトをパッケージします

1)下記コマンドでsbtクリアします

sbt clean

f:id:sbc_ohara:20210823211513p:plain

2)下記コマンドでパッケージを実行します

sbt assembly

f:id:sbc_ohara:20210823211556p:plain

3)nancy-spark-test-WriteToCk.jarパッケージが生成されるのを確認します

f:id:sbc_ohara:20210823211656p:plain

4. Apache SparkからtxtファイルデータをClickHouseへ格納

4-1.OSSにファイルをアップロードします

1)OSSバケットを作成します
①OSSを選択します

f:id:sbc_ohara:20210824153601p:plain

②Bucketメニューを選択します

f:id:sbc_ohara:20210824153618p:plain

③Bucket作成をクリックします

f:id:sbc_ohara:20210824153627p:plain

④デフォルト設定でBucketを作成します

f:id:sbc_ohara:20210824153640p:plain

2)Step2-3で生成されたnancy-spark-test-WriteToCk.jarファイルをOSSにアップロードします
①フォルダ作成ボタンをクリックし、jarフォルダを作成します

f:id:sbc_ohara:20210824153652p:plain

②jarsフォルダに遷移し、Uploadメニューをクリックし、ファイルをアップロードします

f:id:sbc_ohara:20210824153703p:plain

3)txtファイルをOSSにアップロードします
①サンプルファイルを準備します

"id","user_name","age","city","access_url"
1,tick,32,shanghai,http://xdbdsd.com/xgwgwe
2,wangl,22,beijing,http://ghwbw.com/xgwgwe
3,xiaoh,23,shenzhen,http://holko.com/xgwgwe
4,jess,45,hangzhou,http://jopjop.com/xgwgwe
5,jack,14,shanghai,http://wewsd.com/xgwgwe
6,tomy,25,hangzhou,http://sbedr.com/xgwgwe
7,lucy,45,shanghai,http://ghhwed.com/xgwgwe
8,tengyin,26,shanghai,http://hewhe.com/xgwgwe
9,cuos,27,shenzhen,http://yoiuj.com/xgwgwe
10,wangsh,37,shanghai,http://hhou.com/xgwgwe

f:id:sbc_ohara:20210824153714p:plain

② Create Folderをクリックし、dataフォルダを作成します

f:id:sbc_ohara:20210824153723p:plain

③ dataフォルダに遷移し、Uploadメニューをクリックし、ファイルをアップロードします
※data保存パスはCodeに書いてあるファイルパスと同じです

f:id:sbc_ohara:20210824153751p:plain

4-2.ClickHouseでテーブルを作成します

1)DMSでClickhouseを接続します

f:id:sbc_ohara:20210824153803p:plain

f:id:sbc_ohara:20210824153812p:plain

f:id:sbc_ohara:20210824153821p:plain

2)ClickHouseでデフォルトDBにローカルテーブルを作成します

create table spark_table_local on cluster default (
  id UInt8,
  user_name String,
  age UInt16,
  city String,
  access_url String
)
engine = MergeTree()
order by id;

f:id:sbc_ohara:20210824153837p:plain

3)ClickHouseでデフォルトDBに分散テーブルを作成します

create table spark_table_distributed on cluster default(
  id UInt8,
  user_name String,
  age UInt16,
  city String,
  access_url String
)
engine = Distributed(default, default, spark_table_local, id);

f:id:sbc_ohara:20210824153915p:plain

4-3.EMRのSparkタスクでデータをClickHouseにインポートします

4-3-1.EMRインスタンスを作成します

①コンソール画面でEMRを検索します

f:id:sbc_ohara:20210824153934p:plain

②日本リージョンを選択し、ClusterWizardをクリックします

f:id:sbc_ohara:20210824153951p:plain

③ClusterタイプでHadoopを選択します

f:id:sbc_ohara:20210824154004p:plain

④従量課金を選択し、ClickHouseと同じVPCを設定します

f:id:sbc_ohara:20210824154017p:plain

⑤Cluster基本情報を設定し、PublicIPをオンにします

f:id:sbc_ohara:20210824154028p:plain

⑥Cluster情報を確認します

f:id:sbc_ohara:20210824154039p:plain

⑦EMRClusterを作成完成します

f:id:sbc_ohara:20210824154051p:plain

4-3-2.EMRでプロジェクトを作成します

①EMRClusterを選択し、EMR情報画面を表示します

f:id:sbc_ohara:20210824154104p:plain

②「Data Platform」をクリックします

f:id:sbc_ohara:20210824154118p:plain

③「Create Project」をクリックし、プロジェクトを作成します

f:id:sbc_ohara:20210824154131p:plain

④Projectを設定します

f:id:sbc_ohara:20210824154142p:plain

⑤プロジェクトが作成されたことを確認します

f:id:sbc_ohara:20210824154204p:plain

4-3-2.EMRでSparkJobを作成します

①Projectをクリックし、Workflows画面を表示します

f:id:sbc_ohara:20210824154216p:plain

②EditJobをクリックし、Job作成メニューをクリックします

f:id:sbc_ohara:20210824154312p:plain

③sparkジョブが作成されました

f:id:sbc_ohara:20210824155748p:plain

4-3-4.EMRでSparkJobを実行します

1)jarファイルパスを自動的に入力します
①Enter an OSS pathメニューでOSSバケットを選択し、OSSREFを選択します

f:id:sbc_ohara:20210824155814p:plain

②nancy-spark-test-WriteToCk.jarを選択します

f:id:sbc_ohara:20210824155826p:plain

③コンソール画面でossref://spark-clickhouse/jars/nancy-spark-test-WriteToCk.jarを入力します

f:id:sbc_ohara:20210824155840p:plain

2)sparkジョブで下記コマンドを入力します

--class com.spark.test.WriteToCk --master yarn-client --driver-memory 7G --executor-memory 5G --executor-cores 1 --num-executors 32 ossref://spark-clickhouse/jars/nancy-spark-test-WriteToCk.jar

①「run」をクリックします

f:id:sbc_ohara:20210824155920p:plain

②ResourceGroupを設定します

f:id:sbc_ohara:20210824155933p:plain

③spark jobを実行します

f:id:sbc_ohara:20210824155944p:plain

④record画面でジョブ実行状態を確認します

f:id:sbc_ohara:20210824160014p:plain

4-3-5.DMSでClickHouseのデータを確認します

1)DMSでtxtデータをClickHouseにインポートすることを確認します
①分散テーブルを検索します

SELECT * FROM spark_table_distributed;
SELECT COUNT(*) FROM spark_table_distributed;

f:id:sbc_ohara:20210824160028p:plain

f:id:sbc_ohara:20210824160037p:plain

②ローカルテーブルを検索します

SELECT * FROM spark_table_local;
SELECT COUNT(*)  FROM spark_table_local;

f:id:sbc_ohara:20210824160053p:plain

f:id:sbc_ohara:20210824160102p:plain


最後に

ここまで、Apache Spark- ApsaraDB for ClickHouseへデータ連携する方法を紹介しました。
ApsaraDB for ClickHouseはApache Sparkとスムーズに連携できるので、例えば、Spark StreamingやDataLake Analytics Serverless SparkなどからApsaraDB for ClickHouseへリアルタイムデータ連携しつつ、ClickHouseでリアルタイム可視化、といったソリューションを構築することもできます。

Special Thanks, Nancy