在 Amazon EMR 中使用 Apache Iceberg - AWS 规范性指导

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon EMR 中使用 Apache Iceberg

亚马逊 EMR 使用 Apache Spark、Apache Hive、Flink 和 Trino 等开源框架,在云端提供 PB 级的数据处理、交互式分析和机器学习。

注意

本指南使用 Apache Spark 作为示例。

亚马逊 EMR 支持多种部署选项:亚马逊 EC2 上的亚马逊 EMR、亚马逊 EKS 上的亚马逊 EMR、亚马逊 EMR Serverless 和亚马逊 EMR 开启的亚马逊 EMR。 AWS Outposts要为您的工作负载选择部署选项,请参阅 Amazon EMR 常见问题解答。

版本和功能兼容性

亚马逊 EMR 版本 6.5.0 及更高版本原生支持 Apache Iceberg。有关每个亚马逊 EMR 版本支持的 Iceberg 版本列表,请参阅亚马逊 EMR 文档中的 Iceberg 版本历史记录。 另请查看在 Amazon EMR 上使用 Iceberg 的注意事项和限制,以了解不同框架上的 Amazon EMR 支持哪些 Iceberg 功能。

我们建议您使用最新的 Amazon EMR 版本,以便从支持的最新版本的 Iceberg 中受益。本节中的代码示例和配置假设您使用的是 Amazon EMR 版本 emr-6.9.0。

使用 Iceberg 创建 Amazon EMR 集群

要在安装了 Iceberg 的亚马逊 EC2 上创建亚马逊 EMR 集群,请按照亚马逊 EM R 文档中的说明进行操作。 

具体而言,您的集群应按以下分类进行配置:

[{ "Classification": "iceberg-defaults", "Properties": { "iceberg.enabled": "true" } }]

从亚马逊 EMR 6.6.0 开始,您还可以选择使用亚马逊 EMR Serverless 或 Amazon EKS 上的亚马逊 EMR 作为 Iceberg 工作负载的部署选项。

在 Amazon EMR 中开发 Iceberg 应用程序

要为 Iceberg 应用程序开发 Spark 代码,您可以使用亚马逊 EMR Studio,这是一个基于 Web 的集成开发环境 (IDE),适用于在亚马逊 EMR 集群上运行的完全托管 Jupyter 笔记本电脑。 

使用亚马逊 EMR Studio 笔记本电脑

您可以在 Amazon EMR Studio Workspace 笔记本电脑中以交互方式开发 Spark 应用程序,并将这些笔记本电脑连接到亚马逊 EC2 集群上的 Amazon EMR 或亚马逊 EKS 托管终端节点上的亚马逊 EMR。有关在亚马逊 EC2 上为亚马逊 EMR 设置 EMR 和在亚马逊 EKS 上为亚马逊 EMR 设置 EMR 的说明,请参阅 AWS 服务 文档。

要在 EMR Studio 中使用 Iceberg,请按照以下步骤操作: 

  1. 按照使用安装了 Iceberg 的集群中的说明,启动启用了 Iceberg 的 Amazon EMR 集群。 

  2. 设置 EMR 工作室。有关说明,请参阅设置 Amazon EMR Studio。

  3. 打开 EMR Studio Workspace 笔记本并运行以下代码作为笔记本中的第一个单元来配置 Spark 会话以使用 Iceberg:

    %%configure -f { "conf": { "spark.sql.catalog.<catalog_name>": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.<catalog_name>.warehouse": "s3://YOUR-BUCKET-NAME/YOUR-FOLDER-NAME/", "spark.sql.catalog.<catalog_name>.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.<catalog_name>.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } }

    其中:

    • <catalog_name>是你的 Iceberg Spark 会话目录名称。将其替换为目录名称,并记得更改与该目录关联的所有配置中的参考文献。然后,在代码中,您应该使用完全限定的表名(包括 Spark 会话目录名称)来引用 Iceberg 表,如下所示:

      <catalog_name>.<database_name>.<table_name>
    • <catalog_name>.warehouse指向您要存储数据和元数据的 Amazon S3 路径。

    • 要使目录成为 AWS Glue Data Catalog,请将设置<catalog_name>.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog。对于任何自定义目录实现,都需要使用此密钥来指向实现类。本指南后面的 “一般最佳实践” 部分描述了 Iceberg 支持的不同目录。

    • 使用 org.apache.iceberg.aws.s3.S3FileIO as 是<catalog_name>.io-impl为了利用 Amazon S3 分段上传实现高并行度。

  4. 现在,你可以像开发任何其他 Spark 应用程序一样,开始在笔记本中交互式开发 Iceberg 的 Spark 应用程序。

有关使用亚马逊 EMR Studio 为 Apache Iceberg 配置 Spark 的更多信息,请参阅博客文章 “在亚马逊 EMR 上使用 Apache Iceberg 构建高性能、符合 ACID 且不断演变的数据湖”。 

在 Amazon EMR 中运行 Iceberg 职位

为你的 Iceberg 工作负载开发 Spark 应用程序代码后,你可以在任何支持 Iceberg 的 Amazon EMR 部署选项上运行它(参见 A mazon EM R 常见问题解答)。

与其他 Spark 任务一样,您可以通过添加步骤或以交互方式向主节点提交 Spark 任务来向 Amazon EC2 集群上的 Amazon EMR 提交工作。要运行 Spark 作业,请参阅以下 Amazon EMR 文档页面:

以下各节提供了每个 Amazon EMR 部署选项的示例。

亚马逊 EC2 上的亚马逊 EMR

您可以使用以下步骤提交 Iceberg Spark 作业:

  1. 在您的工作站上创建包含以下内容的文件emr_step_iceberg.json

    [{ "Name": "iceberg-test-job", "Type": "spark", "ActionOnFailure": "CONTINUE", "Args": [ "--deploy-mode", "client", "--conf", "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "--conf", "spark.sql.catalog.<catalog_name>=org.apache.iceberg.spark.SparkCatalog", "--conf", "spark.sql.catalog.<catalog_name>.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", "--conf", "spark.sql.catalog.<catalog_name>.warehouse=s3://YOUR-BUCKET-NAME/YOUR-FOLDER-NAME/", "--conf", "spark.sql.catalog.<catalog_name>.io-impl=org.apache.iceberg.aws.s3.S3FileIO", "s3://YOUR-BUCKET-NAME/code/iceberg-job.py" ] }]
  2. 通过自定义以粗体突出显示的 Iceberg 配置选项,修改特定 Spark 作业的配置文件。

  3. 使用 AWS Command Line Interface (AWS CLI) 提交步骤。在emr_step_iceberg.json文件所在的目录中运行该命令。

    aws emr add-steps ‐‐cluster-id <cluster_id> ‐‐steps file://emr_step_iceberg.json

Amazon EMR Serverless

要向 Amazon EMR Serverless 提交 Iceberg Spark 任务,请使用以下命令: AWS CLI

  1. 在您的工作站上创建包含以下内容的文件emr_serverless_iceberg.json

    { "applicationId": "<APPLICATION_ID>", "executionRoleArn": "<ROLE_ARN>", "jobDriver": { "sparkSubmit": { "entryPoint": "s3://YOUR-BUCKET-NAME/code/iceberg-job.py", "entryPointArguments": [], "sparkSubmitParameters": "--jars /usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar" } }, "configurationOverrides": { "applicationConfiguration": [{ "classification": "spark-defaults", "properties": { "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.<catalog_name>": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.<catalog_name>.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.<catalog_name>.warehouse": "s3://YOUR-BUCKET-NAME/YOUR-FOLDER-NAME/", "spark.sql.catalog.<catalog_name>.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "spark.jars":"/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" } }], "monitoringConfiguration": { "s3MonitoringConfiguration": { "logUri": "s3://YOUR-BUCKET-NAME/emr-serverless/logs/" } } } }
  2. 通过自定义以粗体突出显示的 Iceberg 配置选项,修改特定 Spark 作业的配置文件。

  3. 使用提交作业 AWS CLI。在emr_serverless_iceberg.json文件所在的目录中运行命令:

    aws emr-serverless start-job-run ‐‐cli-input-json file://emr_serverless_iceberg.json

要使用 EMR Studio 控制台向亚马逊 EMR Serverless 提交 Iceberg Spark 任务,请执行以下操作:

  1. 按照 A mazon EMR 无服务器文档中的说明进行操作。

  2. 配置 Job,请使用为提供的 Spark 的 Iceberg 配置, AWS CLI 并自定义 Iceberg 的突出显示字段。有关详细说明,请参阅亚马逊 EMR 文档中的将 Apache Iceberg 与 EMR Serverles s 配合使用。

亚马逊 EMR 上的亚马逊 EMR EKS

要在亚马逊 EKS 上向亚马逊 EMR 提交 Iceberg Spark 作业,请使用: AWS CLI

  1. 在您的工作站上创建包含以下内容的文件emr_eks_iceberg.json

    { "name": "iceberg-test-job", "virtualClusterId": "<VIRTUAL_CLUSTER_ID>", "executionRoleArn": "<ROLE_ARN>", "releaseLabel": "emr-6.9.0-latest", "jobDriver": { "sparkSubmitJobDriver": { "entryPoint": "s3://YOUR-BUCKET-NAME/code/iceberg-job.py", "entryPointArguments": [], "sparkSubmitParameters": "--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar" } }, "configurationOverrides": { "applicationConfiguration": [{ "classification": "spark-defaults", "properties": { "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.<catalog_name>": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.<catalog_name>.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.<catalog_name>.warehouse": "s3://YOUR-BUCKET-NAME/YOUR-FOLDER-NAME/", "spark.sql.catalog.<catalog_name>.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" } }], "monitoringConfiguration": { "persistentAppUI": "ENABLED", "s3MonitoringConfiguration": { "logUri": "s3://YOUR-BUCKET-NAME/emr-serverless/logs/" } } } }
  2. 通过自定义以粗体突出显示的 Iceberg 配置选项,修改 Spark 作业的配置文件。

  3. 使用提交作业 AWS CLI。在emr_eks_iceberg.json文件所在的目录中运行以下命令:

    aws emr-containers start-job-run ‐‐cli-input-json file://emr_eks_iceberg.json

有关详细说明,请参阅 EKS 上的亚马逊 EMR 文档中的 EKS 上将 Apache Iceberg 与亚马逊 EMR 搭配使用。 

亚马逊 EMR 的最佳实践

本节提供了在 Amazon EMR 中调整 Spark 作业以优化向 Iceberg 表读取和写入数据的一般指南。有关 Iceberg 特定的最佳实践,请参阅本指南后面的 “最佳实践” 部分。

  • 使用最新版本的亚马逊 EMR — 亚马逊 EMR 通过亚马逊 EMR Spark 运行时提供开箱即用的 Spark 优化。 AWS 每个新版本都会提高 Spark 运行时引擎的性能。

  • 为您的 Spark 工作负载确定最佳基础架构 — Spark 工作负载可能需要不同类型的硬件来满足不同的作业特征,以确保最佳性能。Amazon EMR 支持多种实例类型(例如计算优化、内存优化、通用型和存储优化),以满足所有类型的处理要求。在载入新工作负载时,我们建议您使用常规实例类型(例如 M5 或 m6g)进行基准测试。监控 CloudWatch 来自 Ganglia 和 Amazon 的操作系统 (OS) 和 YARN 指标,确定峰值负载下的系统瓶颈(CPU、内存、存储和 I/O),然后选择合适的硬件。

  • 调整 spark.sql.shuffle.partitions — 将该spark.sql.shuffle.partitions属性设置为集群中的虚拟内核 (vCore) 总数或该值的倍数(通常为 vCore 总数的 1 到 2 倍)。当您使用哈希和范围分区作为写入分配模式时,此设置会影响 Spark 的并行度。它在写入之前请求洗牌以整理数据,从而确保分区对齐。

  • 启用托管扩展-对于几乎所有用例,我们建议您启用托管扩展和动态分配。但是,如果您的工作负载具有可预测的模式,我们建议您禁用自动扩展和动态分配。启用托管扩展后,我们建议您使用竞价型实例来降低成本。将竞价型实例用于任务节点,而不是核心或主节点。使用竞价型实例时,请使用每个队列具有多种实例类型的实例队列,以确保竞价型可用性。

  • 尽可能使用广播联接 — 广播(地图侧)联接是最优联接,前提是您的一个表足够小,足以容纳最小节点(以 MB 为序)的内存,并且您正在执行 equi (=) 联接。支持除完全外部联接之外的所有联接类型。广播联接将较小的表作为哈希表广播到内存中的所有工作节点。广播小桌子后,您无法对其进行更改。由于哈希表位于本地 Java 虚拟机 (JVM) 中,因此可以使用哈希联接根据联接条件轻松地将其与大型表合并。广播联接可提供高性能,因为随机播放开销最小。

  • 调整垃圾收集器 — 如果垃圾收集 (GC) 周期很慢,可以考虑从默认的并行垃圾收集器切换到 G1GC 以获得更好的性能。要优化 GC 性能,可以微调 GC 参数。要跟踪 GC 性能,您可以使用 Spark 用户界面对其进行监控。理想情况下,GC 时间应小于或等于任务总运行时间的 1%。