最近はデータレイクに保存しているデータに対し更新、削除ができるライブラリが増えてきました。
Kudu,deltalakeやHudiなどがありますが、今回はUberが作ったHudiを触ってみました。
セットアップ
Quick-Start Guide - Apache Hudi を参考にセットアップしていきましょう。
localで試すためにspark-shellをいれて実行します。
$ brew install spark-shell $ spark-shell --package org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
データの書き込み
Quick Startに書いてあるとおり、立ち上がったspark-shell上でライブラリをimportします 以下spark-shell上で実行します。
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ val tableName = "hudi_ad_user" val basePath = "file:///tmp/hudi_ad_user" val schemaStr = """{"type": "record","name": "users","fields": [ {"name": "ts","type": "double"},{"name": "uuid", "type": "string"}, {"name":"clicked_url","type": "string"}]}""" scala> val cfg = newBuilder(). withPath(basePath). forTable(tableName). withSchema(schemaStr)
basePathはspark-shell上でwriteしたデータが吐かれる場所です
上記の設定だと /tmp/hudi_ad_user/
以下にデータが作成されます
ではデータをいれていきます。
val df1 = Seq((10, "1234", "https://www.google.com", "day=1"), (11, "5678", "https://twitter.com","day=2"), (15, "9012", "http://github.com","day=1")).toDF("ts", "uuid", "clicked_url", "partitionpath") df1.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). // primary keyが同じデータがあった際にこのカラムの大きい値を最新とする option(RECORDKEY_FIELD_OPT_KEY, "uuid"). // primary keyの役割 option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). // partitionにするキー option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
上記を実行することでhudi形式でparquetフォーマットのファイルが生成されます。
以下parquetファイルだけ絞って見てみます
$ ls -R /tmp/hudi_ad_user day=1 day=2 ./day=1: b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_1-117-144_20200613135956.parquet ./day=2: 6b1081c3-fd79-4a53-ab9d-f8815d5e0a2e-0_0-117-143_20200613135956.parquet
対象のパーティションディレクトリにparquetファイルが吐かれているのがわかります。
データ読み込み
では検索してみましょう。
spark.read.format("hudi").load(basePath + "/*").show() +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| ts|uuid| clicked_url|partitionpath| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ | 20200613135956| 20200613135956_0_23| 5678| day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com| day=2| | 20200613135956| 20200613135956_1_21| 9012| day=1|b25a75a8-8d3f-455...| 15|9012| http://github.com| day=1| | 20200613135956| 20200613135956_1_22| 1234| day=1|b25a75a8-8d3f-455...| 10|1234|https://www.googl...| day=1| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
データが入っていますね。
_hoodie
という接頭辞がついているカラムがhudiが管理しているメタデータです。
_hoodie_record_key
に uuid
が入っています
データ更新
データを更新してみます。 更新する場合はRecordKeyが同一のものに対してアップデートをかけます。 今回ではuuidがPrimaryKeyの役割を果たしているので、同一のuuidに更新がかかります。 そしてtsを比較することで大きい値を最新とみなして更新します。
val df2 = Seq((20, "1234", "https://facebook.com", "day=1")).toDF("ts", "uuid", "clicked_url", "partitionpath") df2.write.format("hudi"). options(getQuickstartWriteConfigs). option(TABLE_NAME, tableName). mode(Append). save(basePath)
spark.read.format("hudi").load(basePath + "/*").show() +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| ts|uuid| clicked_url|partitionpath| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ | 20200613135956| 20200613135956_0_23| 5678| day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com| day=2| | 20200613135956| 20200613135956_1_21| 9012| day=1|b25a75a8-8d3f-455...| 15|9012| http://github.com| day=1| | 20200613141311| 20200613141311_0_24| 1234| day=1|b25a75a8-8d3f-455...| 20|1234|https://facebook.com| day=1| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
_hoodie_record_key
の uuid
をキーにアップデートされているのがわかります。
実際のディレクトリを見てみると、新たにday=1のパーティションにparquetファイルが新たに追加されました。
$ ls -R day=1 day=2 ./day=1: b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_0-153-13664_20200613141311.parquet b25a75a8-8d3f-455f-8c21-6cfd9efe1cfc-0_1-117-144_20200613135956.parquet ./day=2: 6b1081c3-fd79-4a53-ab9d-f8815d5e0a2e-0_0-117-143_20200613135956.parquet
特定の時点でのクエリ
アップデートしても、アップデート前の時点の状態に対してクエリをかけたい場合もあると思います。
BEGIN_TIMEの指定とEND_TIMEの指定を以下のようにすると、その _hoodie_commit_time
内のデータを参照します。
スタートに関しては "000" 指定必須感はありますがw
val df3 = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, "000"). option(END_INSTANTTIME_OPT_KEY, "20200613135957"). load(basePath) df3.show() +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| ts|uuid| clicked_url|partitionpath| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+ | 20200613135956| 20200613135956_0_23| 5678| day=2|6b1081c3-fd79-4a5...| 11|5678| https://twitter.com| day=2| | 20200613135956| 20200613135956_1_21| 9012| day=1|b25a75a8-8d3f-455...| 15|9012| http://github.com| day=1| | 20200613135956| 20200613135956_1_22| 1234| day=1|b25a75a8-8d3f-455...| 10|1234|https://www.googl...| day=1| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+--------------------+-------------+
まとめ
PrimaryKeyを指定してそれをベースにデータを上書きしていきます。 既存のデータをhudi形式に書き換える必要はあると思いますが、今まで貯めていたデータをアップデートできず洗い替えしていた場合にはいいソリューションなのではと思います。