Apache Hudi and Lake Formation
Amazon EMR releases 6.15.0 and higher include support for fine-grained access control based on AWS Lake Formation with Apache Hudi when you read and write data with Spark SQL. Amazon EMR supports table, row, column, and cell-level access control with Apache Hudi. With this feature, you can run snapshot queries on copy-on-write tables to query the latest snapshot of the table at a given commit or compaction instant.
Currently, a Lake Formation-enabled Amazon EMR cluster must retrieve Hudi's commit time column to perform incremental queries and time travel queries. It doesn't support Spark's `timestamp as of` syntax or the `spark.read()` function. The correct syntax is `select * from table where _hoodie_commit_time <= point_in_time`. For more information, see Point in time Time-Travel queries on Hudi table.
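As a minimal sketch, the commit-time filter described above can be generated from a timestamp. The helper names (`to_hudi_instant`, `time_travel_query`, `incremental_query`) are hypothetical. The code assumes that commit instants use the 17-digit `yyyyMMddHHmmssSSS` form, that they compare correctly as strings, and that the timestamp you pass is already in the table's timeline time zone. The incremental form (an exclusive lower bound and an inclusive upper bound) is also an assumption, so verify it against your table before relying on it.

```python
from datetime import datetime

# Assumption: Hudi commit instants look like yyyyMMddHHmmssSSS (17 digits).
HUDI_INSTANT_PREFIX_FORMAT = "%Y%m%d%H%M%S"


def to_hudi_instant(ts: datetime) -> str:
    """Format a datetime as a 17-digit instant string (no time zone conversion)."""
    millis = ts.microsecond // 1000
    return ts.strftime(HUDI_INSTANT_PREFIX_FORMAT) + f"{millis:03d}"


def time_travel_query(table: str, point_in_time: datetime) -> str:
    """Build a point-in-time query that filters on _hoodie_commit_time."""
    instant = to_hudi_instant(point_in_time)
    return f"SELECT * FROM {table} WHERE _hoodie_commit_time <= '{instant}'"


def incremental_query(table: str, begin: datetime, end: datetime) -> str:
    """Build a query for rows committed after `begin`, up to and including `end`."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE _hoodie_commit_time > '{to_hudi_instant(begin)}' "
        f"AND _hoodie_commit_time <= '{to_hudi_instant(end)}'"
    )


if __name__ == "__main__":
    # my_hudi_cow_table is the placeholder table name used in this topic.
    print(time_travel_query("my_hudi_cow_table", datetime(2024, 1, 15, 9, 30)))
```

You can paste the generated statement into the Spark SQL shell. The table name is interpolated without validation, so only pass trusted identifiers.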
The following support matrix lists some core features of Apache Hudi with Lake Formation:
| | Copy on Write | Merge on Read |
|---|---|---|
| Snapshot queries - Spark SQL | ✓ | ✓ |
| Read-optimized queries - Spark SQL | ✓ | ✓ |
| Incremental queries | ✓ | ✓ |
| Time travel queries | ✓ | ✓ |
| Metadata tables | ✓ | ✓ |
| DML | ✓ | ✓ |
| DDL commands | | |
| Spark datasource queries | | |
| Spark datasource writes | | |

Querying Hudi tables
This section shows how to run the supported queries described above on a Lake Formation-enabled cluster. The table must be a registered catalog table.
- To start the Spark SQL shell, use the following command.

      spark-sql --jars /usr/lib/hudi/hudi-spark-bundle.jar \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
      --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \
      --conf spark.sql.catalog.spark_catalog.lf.managed=true

  If you want Lake Formation to use the record server to manage your Spark catalog, set `spark.sql.catalog.<managed_catalog_name>.lf.managed` to true.

- To query the latest snapshot of copy-on-write tables, use the following commands.

      SELECT * FROM my_hudi_cow_table

      spark.read.table("my_hudi_cow_table")

- To query the latest compacted data of MOR tables, you can query the read-optimized table that is suffixed with `_ro`:

      SELECT * FROM my_hudi_mor_table_ro

      spark.read.table("my_hudi_mor_table_ro")
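If you launch the shell from a script, the command in the first step can be assembled programmatically. The following is a sketch only: the function names `lf_hudi_spark_conf` and `spark_sql_command` are hypothetical, and the sketch assumes that only the `lf.managed` key varies with the managed catalog name while the other settings stay exactly as shown in this topic.

```python
import shlex

# Path and settings copied from the launch command in this topic.
HUDI_BUNDLE_JAR = "/usr/lib/hudi/hudi-spark-bundle.jar"


def lf_hudi_spark_conf(managed_catalog: str = "spark_catalog") -> dict:
    """Return the Spark settings for a Lake Formation-enabled Hudi session."""
    return {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.extensions": (
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension,"
            "com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension"
        ),
        # Assumption: only this key depends on the managed catalog name.
        f"spark.sql.catalog.{managed_catalog}.lf.managed": "true",
    }


def spark_sql_command(managed_catalog: str = "spark_catalog") -> list:
    """Build the spark-sql argument list, one --conf flag per setting."""
    args = ["spark-sql", "--jars", HUDI_BUNDLE_JAR]
    for key, value in lf_hudi_spark_conf(managed_catalog).items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Print a shell-safe, single-line form of the command.
    print(shlex.join(spark_sql_command()))
```

Keeping the settings in a dictionary makes it harder to drop a flag by accident when you change the catalog name. Pass the returned list to `subprocess.run` if you want to start the shell directly.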
Note
Reads on Lake Formation clusters might be slower because some optimizations aren't supported, including file listing based on Hudi metadata and data skipping. We recommend that you test your application performance to ensure that it meets your requirements.
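One way to follow this recommendation is to time a representative query several times and compare the results with your requirements. The sketch below is illustrative: `time_query` is a hypothetical helper, and `run_query` stands for whatever callable executes a statement in your environment, which is an assumption and not part of Amazon EMR or Hudi.

```python
import statistics
import time
from typing import Callable, List


def time_query(run_query: Callable[[str], object], sql: str, repeats: int = 3) -> List[float]:
    """Run `sql` `repeats` times and return each wall-clock duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        run_query(sql)  # the caller decides how the statement is executed
        durations.append(time.perf_counter() - start)
    return durations


if __name__ == "__main__":
    # Stand-in runner so the sketch executes without a cluster.
    def fake_runner(statement: str) -> None:
        time.sleep(0.01)

    timings = time_query(fake_runner, "SELECT * FROM my_hudi_cow_table", repeats=5)
    print(f"median seconds: {statistics.median(timings):.3f}")
```

Run the same measurement on a cluster without Lake Formation if you want a baseline, and make sure `run_query` materializes the results so that the timing covers the full read.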