Athena を使用した Apache Hudi データセットのクエリ - Amazon Athena

Athena を使用した Apache Hudi データセットのクエリ

Apache Hudi は、増分データ処理を簡素化するオープンソースのデータ管理フレームワークです。レコードレベルの挿入、更新、upsert、および削除アクションは、はるかに細かく処理され、オーバーヘッドを軽減します。Upsert は、レコードがまだ存在しない場合は既存のデータセットに挿入し、存在する場合はレコードを更新する機能のことです。

Hudi は、分析のパフォーマンス問題を引き起こす可能性のある小さなファイルを多数作成することなく、データの挿入と更新イベントを処理します。Apache Hudi は自動的に変更を追跡し、ファイルをマージして、最適なサイズを維持します。これにより、多数の小さなファイルをモニタリングし、より少ない数の大きなファイルに書き換えるカスタムソリューションを構築する必要がなくなります。

Hudi データセットは、次のユースケースに適しています。

Hudi によって管理されるデータセットは、オープンストレージ形式を使用して Amazon S3 に保存されます。現在、Athena は短縮された Hudi データセットを読み取ることができますが、Hudi データを書き込むことはできません。Athena は、Athena エンジンバージョン 2 で Hudi バージョン 0.8.0 まで、Athena エンジンバージョン 3 では Hudi バージョン 0.14.0 までをサポートします。これは変更される可能性があります。Athena は、Hudi のそれ以降のバージョンで作成されたテーブルとの読み取り互換性は保証できません。Athena エンジンのバージョニングについては、「Athena エンジンのバージョニング」を参照してください。Hudi の機能とバージョニングの詳細については、Apache ウェブサイトの「Hudi ドキュメント」を参照してください。

Hudi データセットテーブルタイプ

Hudi データセットは、次のタイプのいずれかになります。

  • 書き込み時コピー (CoW) – データは列形式 (Parquet) で保存され、更新ごとに書き込み中にファイルの新しいバージョンが作成されます。

  • 読み取り時マージ (MoR) – データは、列形式 (Parquet) 形式と行形式 (Avro) の組み合わせを使用して保存されます。更新は、行形式の delta ファイルに記録され、必要に応じて圧縮されて、新しいバージョンの列形式のファイルが作成されます。

CoW データセットでは、レコードが更新されるたびに、そのレコードを含むファイルが更新された値で書き換えられます。MoR データセットでは、更新があるたびに、Hudi によって変更されたレコードの行のみが書き込まれます。MoR は、読み取りが少なく書き込みまたは変更が多いワークロードに適しています。CoW は、頻繁に変更されないデータの読み取りが多いワークロードに適しています。

Hudi にはデータにアクセスするためのクエリタイプが 3 種類用意されています。

  • スナップショットクエリ – 指定されたコミットまたは圧縮アクションの時点で最新のテーブルスナップショットを参照するクエリです。MoR テーブルの場合、スナップショットクエリは、クエリ時に最新のファイルスライスのベースファイルとデルタファイルをマージして、テーブルの最新の状態を提示します。

  • 増分クエリ – クエリは、指定されたコミット/圧縮以降にテーブルに書き込まれた新しいデータのみを参照します。これにより、増分データパイプラインを有効にするための変更ストリームが効果的に提供されます。

  • 読み取り最適化クエリ – MoR テーブルの場合、クエリは圧縮された最新のデータを参照します。CoW テーブルの場合、クエリはコミットされた最新のデータを参照します。

次の表は、各テーブルタイプに対して実行できる Hudi クエリタイプを示しています。

テーブルタイプ 使用可能な Hudi クエリタイプ
書き込み時にコピー スナップショット、増分
読み取り時にマージ スナップショット、増分、読み取り最適化

現在、Athena はスナップショットクエリと読み取り最適化クエリをサポートしていますが、増分クエリはサポートしていません。MoR テーブルでは、すべての 読み取り最適化クエリに対して提示されるデータは圧縮されています。これにより、パフォーマンスは向上しますが、最新のデルタコミットは含まれていません。Snapshot クエリには、最も新しいデータが含まれています。しかし、いくつかの計算オーバーヘッドが生じ、これらのクエリのパフォーマンスが低下します。

テーブルとクエリタイプ間でのトレードオフの詳細については、Apache Hudi ドキュメントの「テーブルとクエリのタイプ」を参照してください。

Hudi 用語の変更: ビューはクエリに変更されました

リリースバージョン 0.5.1 以降、Apache Hudi は用語の一部を変更しました。以前、ビューと呼ばれていたものは、最近のリリースではクエリと呼ばれています。次の表は、新旧用語の変更をまとめたものです。

旧用語 新用語

CoW: 読み取り最適化ビュー

MoR: リアルタイムビュー

スナップショットクエリ

増分ビュー 増分クエリ
MoR 読み取り最適化ビュー 読み取り最適化クエリ

ブートストラップ操作からのテーブル

Apache Hudi バージョン 0.6.0 以降では、ブートストラップ操作機能によって、既存の Parquet データセットのパフォーマンスが向上します。ブートストラップ操作では、データセットを書き換える代わりに、メタデータのみを生成し、データセットはそのまま残すことができます。

Athena を使用して、Amazon S3 のデータに基づいて他のテーブルと同様に、ブートストラップ操作からテーブルをクエリできます。CREATE TABLE ステートメントで、LOCATION 句で Hudi テーブルのパスを指定します。

Amazon EMR で、ブートストラップオペレーションを使用して Hudi テーブルを作成する方法の詳細については、「AWS ビッグデータブログ」の記事「New features from Apache Hudi available in Amazon EMR」(Amazon EMR で利用可能な Apache Hudi の新機能) を参照してください。

Hudi メタデータのリスト

Apache Hudi には、ファイルの一覧表示、列統計を使用したデータのスキップ、ブルームフィルターベースのインデックスなど、パフォーマンスを改善するためのインデックス機能を含むメタデータテーブルがあります。

これらの機能のうち、Athena は現在、ファイルリストインデックスのみをサポートしています。ファイルリストインデックスは、ファイルに対するパーティションのマッピングを維持するインデックスから情報を取得することにより、「ファイルのリスト」などのファイルシステム呼び出しを排除します。これにより、ファイルシステムのビューを取得するために、テーブルパスの下のすべてのパーティションを再帰的にリストする必要がなくなります。大規模なデータセットを扱う場合、このインデックス作成により、書き込みやクエリ中にファイルのリストを取得するときに発生するレイテンシーが大幅に低減されます。また、Amazon S3 LIST 呼び出しでのリクエスト制限のスロットリングなどのボトルネックも回避できます。

注記

現時点では、Athena はデータスキップやブルームフィルターインデックス作成をサポートしていません。

Hudi メタデータテーブルの有効化

メタデータテーブルベースのファイルリストはデフォルトでは無効になっています。Hudi メタデータテーブルと関連ファイルのリスト機能を有効にするには、hudi.metadata-listing-enabled テーブルプロパティを TRUE に設定します。

次のALTER TABLE SET TBLPROPERTIES の例では、サンプル partition_cow テーブルでメタデータテーブルを有効にします。

ALTER TABLE partition_cow SET TBLPROPERTIES('hudi.metadata-listing-enabled'='TRUE')

考慮事項と制約事項

  • Athena は増分クエリをサポートしていません。

  • Athena は、Hudi データに対する CTAS または INSERT INTO をサポートしません。Athena による Hudi データセットへの書き込みのサポートをご希望の場合は、 までフィードバックをお送りください。

    Hudi データの記述の詳細については、次のリソースを参照してください。

  • Athena での Hudi テーブルに対する MSCK REPAIR TABLE の使用はサポートされていません。AWS Glue 以外で作成された Hudi テーブルをロードする必要がある場合は、ALTER TABLE ADD PARTITION を使用してください。

  • S3 Glacier オブジェクトのスキップはサポートされていません — Apache Hudi テーブル内のオブジェクトが Amazon S3 Glacier ストレージクラスにある場合、read_restored_glacier_objects テーブルプロパティを false に設定しても効果はありません。

    例えば、次のコマンドを実行したとします。

    ALTER TABLE table_name SET TBLPROPERTIES ('read_restored_glacier_objects' = 'false')

    Iceberg および Delta Lake テーブルでは、コマンドは「サポートされていないテーブルプロパティキー: read_restored_glacier_objects」というエラーを生成します。Hudi テーブルでは、ALTER TABLE コマンドはエラーを発生しませんが、Amazon S3 Glacier オブジェクトはまだスキップされません。ALTER TABLE コマンドの後に SELECT クエリを実行すると、引き続きすべてのオブジェクトが返されます。

追加リソース

Athena での Apache Hudi の使用に関するその他のリソースについては、以下のリソースを参照してください。

動画

次の動画では、Amazon Athena を使用して、Amazon S3 ベースのデータレイクにある Apache Hudi データセットの読み取り最適化クエリをする方法が紹介されています。

ブログ記事

次の AWS Big Data Blog 記事では、Athena で Apache Hudi を使用する方法を解説しています。

Hudi テーブルの作成

このセクションでは、Hudi データのパーティション分割されたテーブルとパーティション分割されていないテーブルのための Athena の CREATE TABLE ステートメントの例を提供します。

AWS Glue で既に Hudi テーブルを作成している場合は、Athena でそれらを直接クエリできます。Athena でパーティション化された Hudi テーブルを作成するときは、クエリを実行する前に、ALTER TABLE ADD PARTITION を実行して Hudi データをロードする必要があります。

書き込み時コピー (CoW) テーブル作成の例

パーティション化されていない CoW テーブル

以下の例は、Athena でパーティション分割されていない CoW テーブルを作成します。

CREATE EXTERNAL TABLE `non_partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/non_partition_cow/'

パーティション化された CoW テーブル

以下の例は、Athena でパーティション分割された CoW テーブルを作成します。

CREATE EXTERNAL TABLE `partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/'

次の ALTER TABLE ADD PARTITION の例では、サンプル partition_cow テーブルに 2 つのパーティションを追加します。

ALTER TABLE partition_cow ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/two/'

読み取り時マージ (MOR) テーブル作成の例

Hudi は、MoR のメタストアに 2 つのテーブルを作成します。1 つはスナップショットクエリ用のテーブルで、もう 1 つは読み取り最適化クエリ用のテーブルです。両方のテーブルがクエリ可能です。0.5.1 より前の Hudi バージョンでは、読み取り最適化クエリのテーブルには、テーブルの作成時に指定した名前がありました。Hudi バージョン 0.5.1 以降、デフォルトでテーブル名の末尾に _ro が付きます。スナップショットクエリのテーブルの名前は、指定した名前に _rt が付きます。

パーティション化されていない読み取り時マージ (MOR) テーブル

次の例では、読み取り最適化クエリのために、Athena でパーティション分割されていない MoR テーブルを作成します。読み取り最適化クエリでは、入力形式 HoodieParquetInputFormat を使用することに注意してください。

CREATE EXTERNAL TABLE `nonpartition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'

次の例では、スナップショットクエリのために、Athena でパーティション分割されていない MoR テーブルを作成します。スナップショットクエリの場合は、入力形式 HoodieParquetRealtimeInputFormat を使用します。

CREATE EXTERNAL TABLE `nonpartition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'

パーティション化された読み取り時マージ (MOR) テーブル

次の例では、読み取り最適化クエリのために、Athena でパーティション分割された MoR テーブルを作成します。

CREATE EXTERNAL TABLE `partition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'

次の ALTER TABLE ADD PARTITION の例では、サンプル partition_mor テーブルに 2 つのパーティションを追加します。

ALTER TABLE partition_mor ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'

次の例では、スナップショットクエリのために、Athena でパーティション分割された MoR テーブルを作成します。

CREATE EXTERNAL TABLE `partition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'

同様に、次の ALTER TABLE ADD PARTITION の例では、サンプル partition_mor_rt テーブルに 2 つのパーティションを追加します。

ALTER TABLE partition_mor_rt ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'

追加リソース