Screaming Loud

日々是精進

Apache Hudiを触ってみた

最近はデータレイクに保存しているデータに対し更新、削除ができるライブラリが増えてきました。

Kudu,deltalakeやHudiなどがありますが、今回はUberが作ったHudiを触ってみました。

セットアップ

Quick-Start Guide - Apache Hudi を参考にセットアップしていきましょう。

localで試すためにspark-shellをいれて実行します。

$ brew install spark-shell
$ spark-shell --package org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

データの書き込み

Quick Startに書いてあるとおり、立ち上がったspark-shell上でライブラリをimportします 以下spark-shell上で実行します。

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_ad_user"
val basePath = "file:///tmp/hudi_ad_user"
val schemaStr = """{"type": "record","name": "users","fields": [ 
        {"name": "ts","type": "double"},{"name": "uuid", "type": "string"},
        {"name":"clicked_url","type": "string"}]}"""
scala> val cfg = newBuilder().
        withPath(basePath).
        forTable(tableName).
        withSchema(schemaStr)

basePathはspark-shell上でwriteしたデータが吐かれる場所です 上記の設定だと /tmp/hudi_ad_user/ 以下にデータが作成されます

ではデータをいれていきます。

val df1 = Seq((10, "1234", "https://www.google.com", "day=1"), (11, "5678", "https://twitter.com","day=2"), (15, "9012", "http://github.com","day=1")).toDF("ts", "uuid", "clicked_url", "partitionpath")
df1.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts"). // primary keyが同じデータがあった際にこのカラムの大きい値を最新とする
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").  // primary keyの役割
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). // partitionにするキー
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

上記を実行することでhudi形式でparquetフォーマットのファイルが生成されます。

以下parquetファイルだけ絞って見てみます

$ ls -R  /tmp/hudi_ad_user
day=1 day=2

./day=1:
b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_1-117-144_20200613135956.parquet

./day=2:
6b1081c3-fd79-4a53-ab9d-f8815d5e0a2e-0_0-117-143_20200613135956.parquet

対象のパーティションディレクトリにparquetファイルが吐かれているのがわかります。

データ読み込み

では検索してみましょう。

spark.read.format("hudi").load(basePath + "/*").show()

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| ts|uuid|         clicked_url|partitionpath|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|     20200613135956| 20200613135956_0_23|              5678|                 day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com|        day=2|
|     20200613135956| 20200613135956_1_21|              9012|                 day=1|b25a75a8-8d3f-455...| 15|9012|   http://github.com|        day=1|
|     20200613135956| 20200613135956_1_22|              1234|                 day=1|b25a75a8-8d3f-455...| 10|1234|https://www.googl...|        day=1|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+

データが入っていますね。 _hoodie という接頭辞がついているカラムがhudiが管理しているメタデータです。 _hoodie_record_keyuuid が入っています

データ更新

データを更新してみます。 更新する場合はRecordKeyが同一のものに対してアップデートをかけます。 今回ではuuidがPrimaryKeyの役割を果たしているので、同一のuuidに更新がかかります。 そしてtsを比較することで大きい値を最新とみなして更新します。

val df2 = Seq((20, "1234", "https://facebook.com", "day=1")).toDF("ts", "uuid", "clicked_url", "partitionpath")
df2.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.read.format("hudi").load(basePath + "/*").show()

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| ts|uuid|         clicked_url|partitionpath|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|     20200613135956| 20200613135956_0_23|              5678|                 day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com|        day=2|
|     20200613135956| 20200613135956_1_21|              9012|                 day=1|b25a75a8-8d3f-455...| 15|9012|   http://github.com|        day=1|
|     20200613141311| 20200613141311_0_24|              1234|                 day=1|b25a75a8-8d3f-455...| 20|1234|https://facebook.com|        day=1|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+

_hoodie_record_keyuuid をキーにアップデートされているのがわかります。

実際のディレクトリを見てみると、新たにday=1のパーティションにparquetファイルが新たに追加されました。

$ ls -R
day=1 day=2

./day=1:
b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_0-153-13664_20200613141311.parquet b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_1-117-144_20200613135956.parquet

./day=2:
6b1081c3-fd79-4a53-ab9d-f8815d5e0a2e-0_0-117-143_20200613135956.parquet

特定の時点でのクエリ

アップデートしても、アップデート前の時点の状態に対してクエリをかけたい場合もあると思います。 BEGIN_TIMEの指定とEND_TIMEの指定を以下のようにすると、その _hoodie_commit_time 内のデータを参照します。 スタートに関しては "000" 指定必須感はありますがw

val df3 = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, "000").
  option(END_INSTANTTIME_OPT_KEY, "20200613135957").
  load(basePath)
df3.show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| ts|uuid|         clicked_url|partitionpath|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
|     20200613135956| 20200613135956_0_23|              5678|                 day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com|        day=2|
|     20200613135956| 20200613135956_1_21|              9012|                 day=1|b25a75a8-8d3f-455...| 15|9012|   http://github.com|        day=1|
|     20200613135956| 20200613135956_1_22|              1234|                 day=1|b25a75a8-8d3f-455...| 10|1234|https://www.googl...|        day=1|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+

まとめ

PrimaryKeyを指定してそれをベースにデータを上書きしていきます。 既存のデータをhudi形式に書き換える必要はあると思いますが、今まで貯めていたデータをアップデートできず洗い替えしていた場合にはいいソリューションなのではと思います。