Using Athena to query Apache Hudi datasets - Amazon Athena

Apache Hudi is an open-source data management framework that simplifies incremental data processing. Hudi processes record-level insert, update, upsert, and delete actions at a much finer granularity, which reduces overhead. Upsert refers to the ability to insert records into an existing dataset if they do not already exist or to update them if they do.

Hudi handles data insertion and update events without creating many small files that can cause performance issues for analytics. Apache Hudi automatically tracks changes and merges files so that they remain optimally sized. This avoids the need to build custom solutions that monitor and re-write many small files into fewer large files.

Hudi datasets are suitable for the following use cases:

Datasets managed by Hudi are stored in Amazon S3 using open storage formats. Currently, Athena can read compacted Hudi datasets but cannot write Hudi data. Athena supports up to Hudi version 0.8.0 with Athena engine version 2, and Hudi version 0.14.0 with Athena engine version 3. These version limits are subject to change. Athena cannot guarantee read compatibility with tables that are created with later versions of Hudi. For information about Athena engine versioning, see Athena engine versioning. For more information about Hudi features and versioning, see the Hudi documentation on the Apache website.

Hudi dataset table types

A Hudi dataset can be one of the following types:

  • Copy on Write (CoW) – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write.

  • Merge on Read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.

With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that changes less frequently.

Hudi provides three query types for accessing the data:

  • Snapshot queries – Queries that see the latest snapshot of the table as of a given commit or compaction action. For MoR tables, snapshot queries expose the most recent state of the table by merging the base and delta files of the latest file slice at the time of the query.

  • Incremental queries – Queries see only new data written to the table since a given commit or compaction. This effectively provides change streams to enable incremental data pipelines.

  • Read optimized queries – For MoR tables, queries see the latest data compacted. For CoW tables, queries see the latest data committed.

The following table shows the possible Hudi query types for each table type.

Table type       Possible Hudi query types
Copy On Write    snapshot, incremental
Merge On Read    snapshot, incremental, read optimized

Currently, Athena supports snapshot queries and read optimized queries, but not incremental queries. On MoR tables, all data exposed to read optimized queries are compacted. This provides good performance but does not include the latest delta commits. Snapshot queries contain the freshest data but incur some computational overhead, which makes these queries less performant.

For more information about the tradeoffs between table and query types, see Table & Query Types in the Apache Hudi documentation.

Hudi terminology change: Views are now queries

Starting in release version 0.5.1, Apache Hudi changed some of its terminology. What were formerly views are called queries in later releases. The following table summarizes the changes between the old and new terms.

Old term                                        New term
CoW: read optimized view; MoR: realtime view    Snapshot queries
Incremental view                                Incremental query
MoR read optimized view                         Read optimized query

Tables from bootstrap operation

Starting in Apache Hudi version 0.6.0, the bootstrap operation feature provides better performance with existing Parquet datasets. Instead of rewriting the dataset, a bootstrap operation can generate metadata only, leaving the dataset in place.

You can use Athena to query tables from a bootstrap operation just like other tables based on data in Amazon S3. In your CREATE TABLE statement, specify the Hudi table path in your LOCATION clause.
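As a minimal sketch of the LOCATION clause described above, the following statement registers a bootstrapped table in the same way as the CoW examples in this topic. The table name bootstrap_cow, the column list, and the S3 path are hypothetical placeholders. The SerDe and input format are the ones this topic uses for CoW tables; the sketch assumes that your bootstrapped table is a CoW table.

```sql
-- Hypothetical table name, columns, and path; replace them with your own.
-- Assumes the bootstrapped dataset is a Copy on Write (CoW) table.
CREATE EXTERNAL TABLE `bootstrap_cow`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_name` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-- LOCATION points at the Hudi table path, that is, the base path of the bootstrapped table.
LOCATION
  's3://bucket/folder/bootstrap_cow/'
```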

For more information about creating Hudi tables using the bootstrap operation in Amazon EMR, see the article New features from Apache Hudi available in Amazon EMR in the AWS Big Data Blog.

Hudi metadata listing

Apache Hudi has a metadata table that contains indexing features for improved performance, such as file listing, data skipping using column statistics, and a bloom filter based index.

Of these features, Athena currently supports only the file listing index. The file listing index eliminates file system calls like "list files" by fetching the information from an index that maintains a partition-to-files mapping. This removes the need to recursively list every partition under the table path to get a view of the file system. When you work with large datasets, this indexing drastically reduces the latency that would otherwise occur when getting the list of files during writes and queries. It also avoids bottlenecks such as request limit throttling on Amazon S3 LIST calls.

Note

Athena does not support data skipping or bloom filter indexing at this time.

Enabling the Hudi metadata table

Metadata table based file listing is disabled by default. To enable the Hudi metadata table and the related file listing functionality, set the hudi.metadata-listing-enabled table property to TRUE.

Example

The following ALTER TABLE SET TBLPROPERTIES example enables the metadata table on the example partition_cow table.

ALTER TABLE partition_cow SET TBLPROPERTIES('hudi.metadata-listing-enabled'='TRUE')
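To confirm that the property was applied, one option is to read it back with SHOW TBLPROPERTIES. The sketch below reuses the example partition_cow table. The second statement assumes that setting the property to FALSE restores the default behavior; this topic states only that metadata listing is disabled by default.

```sql
-- Read back the current value of the property on the example table.
SHOW TBLPROPERTIES partition_cow ('hudi.metadata-listing-enabled');

-- Assumption: setting the property to FALSE turns metadata-based listing off again.
ALTER TABLE partition_cow SET TBLPROPERTIES ('hudi.metadata-listing-enabled' = 'FALSE');
```

Run each statement as a separate query.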

Considerations and limitations

  • Athena does not support incremental queries.

  • Athena does not support CTAS or INSERT INTO on Hudi data. If you would like Athena support for writing Hudi datasets, send feedback to .

    For more information about writing Hudi data, see the following resources:

  • Using MSCK REPAIR TABLE on Hudi tables in Athena is not supported. If you need to load a Hudi table not created in AWS Glue, use ALTER TABLE ADD PARTITION.

  • Skipping S3 Glacier objects not supported – If objects in the Apache Hudi table are in an Amazon S3 Glacier storage class, setting the read_restored_glacier_objects table property to false has no effect.

    For example, suppose you issue the following command:

    ALTER TABLE table_name SET TBLPROPERTIES ('read_restored_glacier_objects' = 'false')

    For Iceberg and Delta Lake tables, the command produces the error Unsupported table property key: read_restored_glacier_objects. For Hudi tables, the ALTER TABLE command does not produce an error, but Amazon S3 Glacier objects are still not skipped. Running SELECT queries after the ALTER TABLE command continues to return all objects.

Creating Hudi tables

This section provides examples of CREATE TABLE statements in Athena for partitioned and nonpartitioned tables of Hudi data.

If you have Hudi tables already created in AWS Glue, you can query them directly in Athena. When you create partitioned Hudi tables in Athena, you must run ALTER TABLE ADD PARTITION to load the Hudi data before you can query it.

Copy on write (CoW) create table examples

Nonpartitioned CoW table

The following example creates a nonpartitioned CoW table in Athena.

CREATE EXTERNAL TABLE `non_partition_cow`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/non_partition_cow/'
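After the table is created, it can be queried like any other Athena table. The following sketch lists the most recently committed records in the example non_partition_cow table by sorting on the Hudi metadata column _hoodie_commit_time. The LIMIT value is arbitrary, and the query assumes that the commit time strings sort chronologically, which holds for timestamp-style instant values.

```sql
-- Show the ten most recently committed records, using Hudi's metadata columns.
SELECT
  _hoodie_commit_time,
  _hoodie_record_key,
  event_id,
  event_name,
  event_guests
FROM non_partition_cow
ORDER BY _hoodie_commit_time DESC
LIMIT 10;
```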

Partitioned CoW table

The following example creates a partitioned CoW table in Athena.

CREATE EXTERNAL TABLE `partition_cow`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/partition_cow/'

The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_cow table.

ALTER TABLE partition_cow ADD
  PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_cow/one/'
  PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_cow/two/'
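To check that the partitions were registered and that the data is readable, a sketch like the following can help. It reuses the example partition_cow table and the event_type value 'one' from the statement above; the selected columns and the LIMIT value are arbitrary choices.

```sql
-- List the partitions currently registered for the table.
SHOW PARTITIONS partition_cow;

-- Read a few rows from one partition. Filtering on the partition column
-- restricts the scan to that partition's location.
SELECT event_id, event_name, event_guests
FROM partition_cow
WHERE event_type = 'one'
LIMIT 10;
```

Run each statement as a separate query.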

Merge on read (MoR) create table examples

Hudi creates two tables in the metastore for MoR: a table for snapshot queries, and a table for read optimized queries. Both tables are queryable. In Hudi versions prior to 0.5.1, the table for read optimized queries had the name that you specified when you created the table. Starting in Hudi version 0.5.1, the table name is suffixed with _ro by default. The name of the table for snapshot queries is the name that you specified appended with _rt.

Nonpartitioned merge on read (MoR) table

The following example creates a nonpartitioned MoR table in Athena for read optimized queries. Note that read optimized queries use the input format HoodieParquetInputFormat.

CREATE EXTERNAL TABLE `nonpartition_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/nonpartition_mor/'

The following example creates a nonpartitioned MoR table in Athena for snapshot queries. For snapshot queries, use the input format HoodieParquetRealtimeInputFormat.

CREATE EXTERNAL TABLE `nonpartition_mor_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/nonpartition_mor/'
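Because both tables point to the same S3 location, comparing them is one way to see the difference between read optimized and snapshot queries. The following sketch reports the row count and the latest commit time visible through each example table. The query_type labels and column aliases are illustrative only. If delta commits have not yet been compacted, the snapshot table would be expected to show a later commit time than the read optimized table.

```sql
-- Compare what each query type can see on the same MoR dataset.
SELECT
  'read optimized' AS query_type,
  count(*) AS row_count,
  max(_hoodie_commit_time) AS latest_commit
FROM nonpartition_mor
UNION ALL
SELECT
  'snapshot' AS query_type,
  count(*) AS row_count,
  max(_hoodie_commit_time) AS latest_commit
FROM nonpartition_mor_rt;
```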

Partitioned merge on read (MoR) table

The following example creates a partitioned MoR table in Athena for read optimized queries.

CREATE EXTERNAL TABLE `partition_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/partition_mor/'

The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_mor table.

ALTER TABLE partition_mor ADD
  PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_mor/one/'
  PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_mor/two/'

The following example creates a partitioned MoR table in Athena for snapshot queries.

CREATE EXTERNAL TABLE `partition_mor_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://bucket/folder/partition_mor/'

Similarly, the following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_mor_rt table.

ALTER TABLE partition_mor_rt ADD
  PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_mor/one/'
  PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_mor/two/'

See also