Using Athena to Query Apache Hudi Datasets - Amazon Athena

Using Athena to Query Apache Hudi Datasets

Apache Hudi is an open-source data management framework that simplifies incremental data processing. Record-level insert, update, upsert, and delete actions are processed much more granularly, reducing overhead. Upsert refers to the ability to insert records into an existing dataset if they do not already exist or to update them if they do.

Hudi handles data insertion and update events without creating many small files that can cause performance issues for analytics. Apache Hudi automatically tracks changes and merges files so that they remain optimally sized. This avoids the need to build custom solutions that monitor and re-write many small files into fewer large files.

Hudi datasets are suitable for the following use cases:

Data sets managed by Hudi are stored in S3 using open storage formats. Currently, Athena can read compacted Hudi datasets but not write Hudi data. Athena uses Apache Hudi version 0.5.2-incubating, subject to change. For more information about this Hudi version, see apache/hudi release-0.5.2 on GitHub.com.

Hudi Dataset Storage Types

A Hudi dataset can be one of the following types:

  • Copy on Write (CoW) – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write.

  • Merge on Read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.

With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that change less frequently.

Hudi provides three logical views for accessing the data:

  • Read-optimized view – Provides the latest committed dataset from CoW tables and the latest compacted dataset from MoR tables.

  • Incremental view – Provides a change stream between two actions out of a CoW dataset to feed downstream jobs and extract, transform, load (ETL) workflows.

  • Real-time view – Provides the latest committed data from a MoR table by merging the columnar and row-based files inline.

Currently, Athena supports only the first of these: the read-optimized view. Queries on a read-optimized view return all compacted data, which provides good performance but does not include the latest delta commits. For more information about the tradeoffs between storage types, see Storage Types & Views in the Apache Hudi documentation.

Considerations and Limitations

  • Athena supports reading of the compacted view of Hudi data only.

    • For Copy on Write (CoW), Athena supports snapshot queries.

    • For Merge on Read (MoR), Athena supports read optimized queries.

  • Athena does not support CTAS or INSERT INTO on Hudi data. If you would like Athena support for writing Hudi datasets, send feedback to .

    For more information about writing Hudi data, see the following resources:

  • Using MSCK REPAIR TABLE on Hudi tables in Athena is not supported. If you need to load a Hudi table not created in AWS Glue, use ALTER TABLE ADD PARTITION.

Creating Hudi Tables

This section provides examples of CREATE TABLE statements in Athena for partitioned and nonpartitioned tables of Hudi data.

If you have Hudi tables already created in AWS Glue, you can query them directly in Athena. When you create Hudi tables in Athena, you must run ALTER TABLE ADD PARTITION to load the Hudi data before you can query it.

Copy on Write (CoW) Create Table Examples

Nonpartitioned CoW Table

The following example creates a nonpartitioned CoW table in Athena.

CREATE EXTERNAL TABLE `non_partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/non_partition_cow'

Partitioned CoW Table

The following example creates a partitioned CoW table in Athena.

CREATE EXTERNAL TABLE `partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/partition_cow'

The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_cow table.

ALTER TABLE partition_cow ADD PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_cow/one/' PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_cow/two/'

Merge on Read (MoR) Create Table Examples

Hudi creates two tables in the Hive metastore for MoR: a table with the name that you specified, which is a read-optimized view, and a table with the same name appended with _rt, which is a real-time view. However, when you create MoR tables in Athena, you can query only the read-optimized view.

Nonpartitioned Merge on Read (MoR) Table

The following example creates a nonpartitioned MoR table in Athena.

CREATE EXTERNAL TABLE `nonpartition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/nonpartition_mor'

Partitioned Merge on Read (MoR) Table

The following example creates a partitioned MoR table in Athena.

CREATE EXTERNAL TABLE `partition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket/folder/partition_mor'

The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_mor table.

ALTER TABLE partition_mor ADD PARTITION (event_type = 'one') LOCATION 's3://bucket/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://bucket/folder/partition_mor/two/'