在 AWS Glue 中使用 Delta Lake 框架 - AWS Glue

在 AWS Glue 中使用 Delta Lake 框架

AWS Glue 3.0 及更高版本支持 Linux Foundation Delta Lake 框架。Delta Lake 是一个开源数据湖存储框架,可帮助您执行 ACID 交易、扩展元数据处理以及统一流式和批处理数据处理。本主题涵盖了在 Delta Lake 表中传输或存储数据时,在 AWS Glue 中使用数据的可用功能。要了解有关 Delta Lake 的更多信息,请参阅 Delta Lake 官方文档

您可以使用 AWS Glue 对 Amazon S3 中的 Delta Lake 表执行读写操作,也可以使用 AWS Glue 数据目录处理 Delta Lake 表。还支持插入、更新和表批量读取和写入等其他操作。使用 Delta Lake 表时,也可以选择使用 Delta Lake Python 库中的方法,例如 DeltaTable.forPath。有关 Delta Lake Python 库的更多信息,请参阅欢迎使用 Delta Lake 的 Python 文档页面。

下表列出了 AWS Glue 每个版本中包含的 Delta Lake 版本。

AWS Glue 版本 支持的 Delta Lake 版本
4.0 2.1.0
3.0 1.0.0

要了解有关 AWS Glue 支持的数据湖框架的更多信息,请参阅在 AWS Glue ETL 任务中使用数据湖框架

为 AWS Glue 启用 Delta Lake

要为 AWS Glue 启用 Delta Lake,请完成以下任务:

  • 指定 delta 作为 --datalake-formats 作业参数的值。有关更多信息,请参阅AWS Glue 作业参数

  • --conf 为 Glue 作业创建一个名为 AWS 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf 设置以下配置。这些设置有助于 Apache Spark 正确处理 Delta Lake 表。

    spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

使用不同的 Delta Lake 版本

要使用 AWS Glue 不支持的 Delta Lake 版本,请使用 --extra-jars 作业参数指定您自己的 Delta Lake JAR 文件。请勿包含 delta 作为 --datalake-formats 作业参数的值。要在这种情况下使用 Delta Lake Python 库,必须使用 --extra-py-files 作业参数指定库 JAR 文件。Python 库打包在 Delta Lake JAR 文件中。

示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 AWS Glue 数据目录

以下 AWS Glue ETL 脚本演示了如何将 Delta Lake 表写入 Amazon S3,并将该表注册到 AWS Glue 数据目录。

Python
# Example: Create a Delta Lake table from a DataFrame # and register the table to Glue Data Catalog additional_options = { "path": "s3://<s3Path>" } dataFrame.write \ .format("delta") \ .options(**additional_options) \ .mode("append") \ .partitionBy("<your_partitionkey_field>") \ .saveAsTable("<your_database_name>.<your_table_name>")
Scala
// Example: Example: Create a Delta Lake table from a DataFrame // and register the table to Glue Data Catalog val additional_options = Map( "path" -> "s3://<s3Path>" ) dataFrame.write.format("delta") .options(additional_options) .mode("append") .partitionBy("<your_partitionkey_field>") .saveAsTable("<your_database_name>.<your_table_name>")

示例:使用 AWS Glue 数据目录从 Amazon S3 读取 Delta Lake 表

以下 AWS Glue ETL 脚本读取您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 AWS Glue 数据目录 中创建的 Delta Lake 表。

Python

在本示例中,使用 create_data_frame.from_catalog 方法。

# Example: Read a Delta Lake table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) df = glueContext.create_data_frame.from_catalog( database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

在本示例中,使用 getCatalogSource 方法。

// Example: Read a Delta Lake table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val df = glueContext.getCatalogSource("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .getDataFrame() } }

示例:使用 AWS Glue 数据目录在 Amazon S3 中将 DataFrame 插入 Delta Lake 表

此示例将数据插入您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 AWS Glue 数据目录 中创建的 Delta Lake 表。

注意

此示例要求您设置 --enable-glue-datacatalog 任务参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 AWS Glue 作业参数

Python

在本示例中,使用 write_data_frame.from_catalog 方法。

# Example: Insert into a Delta Lake table in S3 using Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame=dataFrame, database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

在本示例中,使用 getCatalogSink 方法。

// Example: Insert into a Delta Lake table in S3 using Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .writeDataFrame(dataFrame, glueContext) } }

示例:使用 Spark API 从 Amazon S3 读取 Delta Lake 表

此示例使用 Spark API 从 Amazon S3 读取 Delta Lake 表。

Python
# Example: Read a Delta Lake table from S3 using a Spark DataFrame dataFrame = spark.read.format("delta").load("s3://<s3path/>")
Scala
// Example: Read a Delta Lake table from S3 using a Spark DataFrame val dataFrame = spark.read.format("delta").load("s3://<s3path/>")

示例:使用 Spark 向 Amazon S3 写入 Delta Lake 表

此示例使用 Spark 向 Amazon S3 写入 Delta Lake 表。

Python
# Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") \ .options(**additional_options) \ .mode("overwrite") \ .partitionBy("<your_partitionkey_field>") .save("s3://<s3Path>")
Scala
// Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") .options(additionalOptions) .mode("overwrite") .partitionBy("<your_partitionkey_field>") .save("s3://<s3path/>")