AWS Glue 3.0 及更高版本支持 Linux Foundation Delta Lake 框架。Delta Lake 是一个开源数据湖存储框架,可帮助您执行 ACID 交易、扩展元数据处理以及统一流式和批处理数据处理。本主题涵盖了在 Delta Lake 表中传输或存储数据时,在 AWS Glue 中使用数据的可用功能。要了解有关 Delta Lake 的更多信息,请参阅 Delta Lake 官方文档。
您可以使用 AWS Glue 对 Amazon S3 中的 Delta Lake 表执行读写操作,也可以使用 AWS Glue 数据目录处理 Delta Lake 表。还支持插入、更新和表批量读取和写入等其他操作。使用 Delta Lake 表时,也可以选择使用 Delta Lake Python 库中的方法,例如 DeltaTable.forPath
。有关 Delta Lake Python 库的更多信息,请参阅欢迎使用 Delta Lake 的 Python 文档页面。
下表列出了 AWS Glue 每个版本中包含的 Delta Lake 版本。
AWS Glue 版本 |
支持的 Delta Lake 版本 |
4.0 |
2.1.0 |
3.0 |
1.0.0 |
要了解有关 AWS Glue 支持的数据湖框架的更多信息,请参阅在 AWS Glue ETL 任务中使用数据湖框架。
要为 AWS Glue 启用 Delta Lake,请完成以下任务:
-
指定 delta
作为 --datalake-formats
作业参数的值。有关更多信息,请参阅AWS Glue 作业参数。
-
--conf
为 Glue 作业创建一个名为 AWS 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf
设置以下配置。这些设置有助于 Apache Spark 正确处理 Delta Lake 表。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
使用不同的 Delta Lake 版本
要使用 AWS Glue 不支持的 Delta Lake 版本,请使用 --extra-jars
作业参数指定您自己的 Delta Lake JAR 文件。请勿包含 delta
作为 --datalake-formats
作业参数的值。要在这种情况下使用 Delta Lake Python 库,必须使用 --extra-py-files
作业参数指定库 JAR 文件。Python 库打包在 Delta Lake JAR 文件中。
以下 AWS Glue ETL 脚本演示了如何将 Delta Lake 表写入 Amazon S3,并将该表注册到 AWS Glue 数据目录。
- Python
-
# Example: Create a Delta Lake table from a DataFrame
# and register the table to Glue Data Catalog
additional_options = {
"path": "s3://<s3Path>
"
}
dataFrame.write \
.format("delta") \
.options(**additional_options) \
.mode("append") \
.partitionBy("<your_partitionkey_field>
") \
.saveAsTable("<your_database_name>
.<your_table_name>
")
- Scala
-
// Example: Example: Create a Delta Lake table from a DataFrame
// and register the table to Glue Data Catalog
val additional_options = Map(
"path" -> "s3://<s3Path>
"
)
dataFrame.write.format("delta")
.options(additional_options)
.mode("append")
.partitionBy("<your_partitionkey_field>
")
.saveAsTable("<your_database_name>
.<your_table_name>
")
以下 AWS Glue ETL 脚本读取您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 AWS Glue 数据目录 中创建的 Delta Lake 表。
- Python
-
在本示例中,使用 create_data_frame.from_catalog 方法。
# Example: Read a Delta Lake table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
df = glueContext.create_data_frame.from_catalog(
database="<your_database_name>
",
table_name="<your_table_name>
",
additional_options=additional_options
)
- Scala
-
在本示例中,使用 getCatalogSource 方法。
// Example: Read a Delta Lake table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val df = glueContext.getCatalogSource("<your_database_name>
", "<your_table_name>
",
additionalOptions = additionalOptions)
.getDataFrame()
}
}
此示例将数据插入您在 示例:将 Delta Lake 表写入 Amazon S3,并将其注册到 AWS Glue 数据目录 中创建的 Delta Lake 表。
此示例要求您设置 --enable-glue-datacatalog
任务参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 AWS Glue 作业参数。
- Python
-
在本示例中,使用 write_data_frame.from_catalog 方法。
# Example: Insert into a Delta Lake table in S3 using Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.write_data_frame.from_catalog(
frame=dataFrame,
database="<your_database_name>
",
table_name="<your_table_name>
",
additional_options=additional_options
)
- Scala
-
在本示例中,使用 getCatalogSink 方法。
// Example: Insert into a Delta Lake table in S3 using Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.getCatalogSink("<your_database_name>
", "<your_table_name>
",
additionalOptions = additionalOptions)
.writeDataFrame(dataFrame, glueContext)
}
}
此示例使用 Spark API 从 Amazon S3 读取 Delta Lake 表。
- Python
-
# Example: Read a Delta Lake table from S3 using a Spark DataFrame
dataFrame = spark.read.format("delta").load("s3://<s3path/>
")
- Scala
-
// Example: Read a Delta Lake table from S3 using a Spark DataFrame
val dataFrame = spark.read.format("delta").load("s3://<s3path/>
")
此示例使用 Spark 向 Amazon S3 写入 Delta Lake 表。
- Python
-
# Example: Write a Delta Lake table to S3 using a Spark DataFrame
dataFrame.write.format("delta") \
.options(**additional_options) \
.mode("overwrite") \
.partitionBy("<your_partitionkey_field>
")
.save("s3://<s3Path>
")
- Scala
-
// Example: Write a Delta Lake table to S3 using a Spark DataFrame
dataFrame.write.format("delta")
.options(additionalOptions)
.mode("overwrite")
.partitionBy("<your_partitionkey_field>
")
.save("s3://<s3path/>
")