AWS Glue 3.0 及更高版本支持数据湖的 Apache Iceberg 框架。Iceberg 提供了一种高性能的表格式,其工作原理与 SQL 表类似。本主题涵盖了在 Iceberg 表中传输或存储数据时,在 AWS Glue 中使用数据的可用功能。要了解有关 Iceberg 的更多信息,请参阅 Apache Iceberg 官方文档。
您可以使用 AWS Glue 对 Amazon S3 中的 Iceberg 表执行读写操作,也可以使用 AWS Glue 数据目录处理 Iceberg 表。还支持其他操作,包括插入、更新和所有 Spark 查询 Spark 写入。
ALTER TABLE … RENAME TO
不适用于 Apache Iceberg 0.13.1 for AWS Glue 3.0。
下表列出了 AWS Glue 每个版本中包含的 Iceberg 版本。
AWS Glue 版本 |
支持 Iceberg 版本 |
4.0 |
1.0.0 |
3.0 |
0.13.1 |
要了解有关 AWS Glue 支持的数据湖框架的更多信息,请参阅在 AWS Glue ETL 任务中使用数据湖框架。
要启用 Iceberg for AWS Glue,请完成以下任务:
-
指定 iceberg
作为 --datalake-formats
作业参数的值。有关更多信息,请参阅AWS Glue 作业参数。
-
--conf
为 Glue 作业创建一个名为 AWS 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf
设置以下配置。这些设置有助于 Apache Spark 正确处理 Iceberg 表。
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://<your-warehouse-dir
>/
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
如果您将 AWS Glue 3.0 与 Iceberg 0.13.1 一起使用,则必须设置以下附加配置才能使用 Amazon DynamoDB 锁定管理器来确保原子交易。AWSGlue 4.0 默认使用乐观锁。有关更多信息,请参阅 Apache Iceberg 官方文档中的 Iceberg AWS 集成。
--conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=<your-dynamodb-table-name>
使用不同的 Iceberg 版本
要使用 AWS Glue 不支持的 Iceberg 版本,请使用 --extra-jars
作业参数指定您自己的 Iceberg JAR 文件。请勿包含 iceberg
作为 --datalake-formats
参数的值。
为 Iceberg 表启用加密
Iceberg 表有自己的用于启用服务器端加密的机制。除了 AWS Glue 的安全配置外,您还应该启用此配置。
要在 Iceberg 表上启用服务器端加密,请查看 Iceberg 文档中的指南。
此示例脚本演示了如何将 Iceberg 表写入 Amazon S3。该示例使用 IcebergAWS 集成将表注册到 AWS Glue 数据目录。
- Python
-
# Example: Create an Iceberg table from a DataFrame
# and register the table to Glue Data Catalog
dataFrame.createOrReplaceTempView("tmp_<your_table_name>
")
query = f"""
CREATE TABLE glue_catalog.<your_database_name>
.<your_table_name>
USING iceberg
TBLPROPERTIES ("format-version"="2")
AS SELECT * FROM tmp_<your_table_name>
"""
spark.sql(query)
- Scala
-
// Example: Example: Create an Iceberg table from a DataFrame
// and register the table to Glue Data Catalog
dataFrame.createOrReplaceTempView("tmp_<your_table_name>
")
val query = """CREATE TABLE glue_catalog.<your_database_name>
.<your_table_name>
USING iceberg
TBLPROPERTIES ("format-version"="2")
AS SELECT * FROM tmp_<your_table_name>
"""
spark.sql(query)
或者,您可以使用 Spark 方法将 Iceberg 表写入 Amazon S3 和 Data Catalog。
先决条件:您需要预置目录以供 Iceberg 库使用。使用 AWS Glue Data Catalog 时,AWS Glue 让这一切变得简单明了。AWS Glue Data Catalog 已预先配置为供 Spark 库作为 glue_catalog
使用。Data Catalog 表由 databaseName
和 tableName
标识。有关 AWS Glue Data Catalog 的更多信息,请参阅 AWS Glue 中的数据目录和爬网程序。
如果您不使用 AWS Glue Data Catalog ,则需要通过 Spark API 配置目录。有关更多信息,请参阅 Iceberg 文档中的 Spark Configuration。
此示例使用 Spark 从将 Iceberg 表写入 Amazon S3 和 Data Catalog 中。
- Python
-
# Example: Write an Iceberg table to S3 on the Glue Data Catalog
# Create (equivalent to CREATE TABLE AS SELECT)
dataFrame.writeTo("glue_catalog.databaseName
.tableName
") \
.tableProperty("format-version", "2") \
.create()
# Append (equivalent to INSERT INTO)
dataFrame.writeTo("glue_catalog.databaseName
.tableName
") \
.tableProperty("format-version", "2") \
.append()
- Scala
-
// Example: Write an Iceberg table to S3 on the Glue Data Catalog
// Create (equivalent to CREATE TABLE AS SELECT)
dataFrame.writeTo("glue_catalog.databaseName
.tableName
")
.tableProperty("format-version", "2")
.create()
// Append (equivalent to INSERT INTO)
dataFrame.writeTo("glue_catalog.databaseName
.tableName
")
.tableProperty("format-version", "2")
.append()
此示例读取您在 示例:将 Iceberg 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录 中创建的 Iceberg 表。
- Python
-
在本示例中,使用 GlueContext.create_data_frame.from_catalog()
方法。
# Example: Read an Iceberg table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
df = glueContext.create_data_frame.from_catalog(
database="<your_database_name>
",
table_name="<your_table_name>
",
additional_options=additional_options
)
- Scala
-
在本示例中,使用 getCatalogSource 方法。
// Example: Read an Iceberg table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val df = glueContext.getCatalogSource("<your_database_name>
", "<your_table_name>
",
additionalOptions = additionalOptions)
.getDataFrame()
}
}
此示例将数据插入您在 示例:将 Iceberg 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录 中创建的 Iceberg 表。
此示例要求您设置 --enable-glue-datacatalog
任务参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 AWS Glue 作业参数。
- Python
-
在本示例中,使用 GlueContext.write_data_frame.from_catalog()
方法。
# Example: Insert into an Iceberg table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.write_data_frame.from_catalog(
frame=dataFrame,
database="<your_database_name>
",
table_name="<your_table_name>
",
additional_options=additional_options
)
- Scala
-
在本示例中,使用 getCatalogSink 方法。
// Example: Insert into an Iceberg table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.getCatalogSink("<your_database_name>
", "<your_table_name>
",
additionalOptions = additionalOptions)
.writeDataFrame(dataFrame, glueContext)
}
}
先决条件:您需要预置目录以供 Iceberg 库使用。使用 AWS Glue Data Catalog 时,AWS Glue 让这一切变得简单明了。AWS Glue Data Catalog 已预先配置为供 Spark 库作为 glue_catalog
使用。Data Catalog 表由 databaseName
和 tableName
标识。有关 AWS Glue Data Catalog 的更多信息,请参阅 AWS Glue 中的数据目录和爬网程序。
如果您不使用 AWS Glue Data Catalog ,则需要通过 Spark API 配置目录。有关更多信息,请参阅 Iceberg 文档中的 Spark Configuration。
此示例使用 Spark 从 Data Catalog 读取 Amazon S3 中的 Iceberg 表。
- Python
-
# Example: Read an Iceberg table on S3 as a DataFrame from the Glue Data Catalog
dataFrame = spark.read.format("iceberg").load("glue_catalog.databaseName
.tableName
")
- Scala
-
// Example: Read an Iceberg table on S3 as a DataFrame from the Glue Data Catalog
val dataFrame = spark.read.format("iceberg").load("glue_catalog.databaseName
.tableName
")