在 AWS Glue 中流式处理 ETL 作业 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 AWS Glue 中流式处理 ETL 作业

您可以创建连续运行的串流提取、转换和负载(ETL)任务,使用来自 Amazon Kinesis Data Streams、Apache Kafka 和 Amazon Managed Streaming for Apache Kafka (Amazon MSK)等串流源的数据。这些任务会清理并转换数据,然后将结果加载到 Amazon S3 数据湖或 JDBC 数据存储中。

此外,您还可以为 Amazon Kinesis Data Streams 流生成数据。此功能仅在编写 AWS Glue 脚本时可用。有关更多信息,请参阅 Kinesis 连接

默认情况下,AWS Glue 在 100 秒的时段内处理和写出数据。这可以实现数据的高效处理,并允许对晚于预计时间到达的数据执行聚合。您可以修改此窗口时段的大小以提高及时性或聚合精度。AWS Glue 串流任务使用检查点而非任务书签来跟踪已读取的数据。

注意

当串流 ETL 任务正在运行时,AWS Glue 会按小时计费。

本视频讨论了流媒体ETL的成本挑战以及节省成本的功能。 AWS Glue

创建串流 ETL 任务涉及以下步骤:

  1. 对于 Apache Kafka 串流源,请创建与 Kafka 源或 Amazon MSK 集群的 AWS Glue 连接。

  2. 手动为串流源创建数据目录表。

  3. 为串流数据源创建 ETL 任务。定义特定于串流的任务属性,并提供您自己的脚本或(可选)修改生成的脚本。

有关更多信息,请参阅 AWS Glue 中的流式处理 ETL

为 Amazon Kinesis Data Streams 创建串流 ETL 任务时,您无需创建 AWS Glue 连接。但是,如果有一个连接附加到以 Kinesis Data Streams 作为源的 AWS Glue 串流 ETL 任务,则需要提供到 Kinesis 的 Virtual Private Cloud(VPC)终端节点。有关更多信息,请参阅 Amazon VPC 用户指南中的创建接口端点 在另一个账户中指定 Amazon Kinesis Data Streams 串流时,您必须设置角色和策略从而允许跨账户访问。有关更多信息,请参阅示例:从不同账户的 Kinesis 串流中读取

AWS Glue 流式传输 ETL 任务可以自动检测压缩数据,以透明方式解压流式传输数据,对输入源执行常见转换,并加载到输出存储。

如果是以下输入格式,则 AWS Glue 支持自动解压以下压缩类型:

压缩类型 Avro 文件 Avro 基准 JSON CSV Grok
BZIP2 支持 支持
GZIP 不支持 支持
SNAPPY 是(原始 Snappy) 是(framed Snappy) 是(framed Snappy) 是(framed Snappy) 是(framed Snappy)
XZ 支持 支持
ZSTD 支持 不支持
DEFLATE 支持 支持

为 Apache Kafka 数据流创建 AWS Glue 连接

要从 Apache Kafka 流中进行读取,您必须创建 AWS Glue 连接。

为 Kafka 源创建 AWS Glue 连接(控制台)
  1. 打开 AWS Glue 控制台,网址为 https://console.aws.amazon.com/glue/

  2. 在导航窗格的 Data catalog (数据目录) 下,选择 Connections (连接)

  3. 选择 Add connection (添加连接),然后在 Set up your connection’s properties (设置连接的属性) 页面上,输入连接名称。

    注意

    有关指定连接属性的更多信息,请参阅 AWS Glue 连接属性。

  4. 对于 Connection type (连接类型),选择 Kafka

  5. 对于 Kafka bootstrap servers URLs (Kafka 引导服务器 URL),输入您 Amazon MSK 集群或 Apache Kafka 集群引导代理的主机和端口编号。仅使用传输层安全性(TLS)端点建立到 Kafka 群集的初始连接。不支持 Plaintext 端点。

    以下是 Amazon MSK 集群的主机名和端口编号对的示例列表。

    myserver1.kafka.us-east-1.amazonaws.com:9094,myserver2.kafka.us-east-1.amazonaws.com:9094, myserver3.kafka.us-east-1.amazonaws.com:9094

    有关引导代理信息的更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的获取 Amazon MSK 集群的引导代理

  6. 如果您希望与 Kafka 数据源建立安全连接,请选择 Require SSL connection (需要 SSL 连接),并在 Kafka private CA certificate location (Kafka 私有 CA 证书位置) 中,输入自定义 SSL 证书的有效 Amazon S3 路径。

    对于与自我托管式 Kafka 的 SSL 连接,自定义证书是强制性的。对于 Amazon MSK 则是可选的。

    有关为 Kafka 指定自定义证书的更多信息,请参阅 AWS Glue SSL 连接属性

  7. 使用 AWS Glue Studio 或 AWS CLI 指定 Kafka 客户端身份验证方法。要访问, AWS Glue Studio 请AWS Glue从左侧导航窗格的 ETL 菜单中选择。

    有关 Kafka 客户端身份认证方法的更多信息,请参阅适用于客户端身份认证的 AWS Glue Kafka 连接属性

  8. (可选)输入描述,然后选择 Next (下一步)

  9. 对于 Amazon MSK 集群,请指定其 Virtual Private Cloud(VPC)、子网和安全组。对于自行托管式 Kafka,VPC 信息是可选的。

  10. 选择 Next (下一步) 以查看所有连接属性,然后选择 Finish (结束)

有关 AWS Glue 连接的更多信息,请参阅 连接到数据

适用于客户端身份认证的 AWS Glue Kafka 连接属性

SASL/GSSAPI(Kerberos)身份认证

选择此身份认证方法将允许您指定 Kerberos 属性。

Kerberos Keytab

选择 keytab 文件的位置。keytab 可存储一个或多个主体的长期密钥。有关更多信息,请参阅 MIT Kerberos 文档:keytab

Kerberos krb5.conf 文件

选择 krb5.conf 文件。它包含默认领域(一种类似于域的逻辑网络,用于定义同一 KDC 下的一组系统)和 KDC 服务器的位置。有关更多信息,请参阅 MIT Kerberos 文档:krb5.conf

Kerberos 主体和 Kerberos 服务名称

输入 Kerberos 主体和服务名称 有关更多信息,请参阅 MIT Kerberos 文档:Kerberos 主体

SASL/SCRAM-SHA-512 身份认证

选择此身份认证方法将允许您指定身份认证凭证。

AWS Secrets Manager

在搜索框中键入相应的名称或 ARN 以搜索令牌。

直接提供用户名和密码

在搜索框中键入相应的名称或 ARN 以搜索令牌。

SSL 客户端身份认证

选择此身份认证方法将允许您浏览 Amazon S3 以选择 Kafka 客户端密钥库的位置。或者,您可以输入 Kafka 客户端密钥库密码和 Kafka 客户端密钥密码。

IAM 身份验证

此身份验证方法不需要任何其他规范,仅在流媒体源为 MSK Kafka 时适用。

SASL/普通身份验证

选择此身份验证方法允许您指定身份验证凭据。

为串流源创建数据目录表

您可以为流式传输源手动创建数据目录表,以指定源数据流属性(包括数据 Schema)。此表用作串流 ETL 任务的数据源。

如果您不知道源数据流中数据的架构,则可以在不使用架构的情况下创建表。然后,当您创建串流 ETL 任务时,您可以打开 AWS Glue 架构检测函数。AWS Glue 会通过流数据来确定架构。

使用AWS Glue控制台、 AWS Command Line Interface (AWS CLI) 或 AWS Glue API 创建表。有关使用 AWS Glue 控制台手动创建表的信息,请参阅 AWS Glue 表

注意

您不能使用 AWS Lake Formation 控制台创建表;必须使用AWS Glue控制台。

另外,请考虑以下有关 Avro 格式的串流源或可以应用 Grok 模式的日志数据的信息。

Kinesis 数据源

在创建表时,请设置以下串流 ETL 属性(控制台)。

源的类型

Kinesis

对于同一账户中的 Kinesis 源:
区域

Amazon Kinesis Data Streams 服务所在的地 AWS 区。“区域”和 Kinesis 流名称会一起转换为“流 ARN”。

示例:https://kinesis.us-east-1.amazonaws.com

Kinesis 流名称

流名称如《Amazon Kinesis Data Streams 开发人员指南》https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html中的创建流所述。

有关其他账户中的 Kinesis 源,请参阅此示例来设置角色和策略以允许进行跨账户访问。配置以下设置:
流 ARN

使用者用于注册的 Kinesis Data Streams ARN。有关更多信息,请参阅中的 Amazon 资源名称 (ARN) 和 AWS 服务命名空间AWS 一般参考

所担任角色的 ARN

担任角色的 Amazon Resource Name(ARN)。

会话名称(可选)

所担任角色会话的标识符。

在根据不同规则或因为不同原因担任相同角色时,使用角色会话名称对会话进行唯一标识。在跨账户方案中,角色会话名称对于拥有此角色的账户可见,并且可以由拥有该角色的账户记录。所担任角色规则的 ARN 中也使用角色会话名称。这意味着,后续使用临时安全证书的跨账户 API 请求将在其 AWS CloudTrail 日志中向外部账户公开角色会话名称。

为 Amazon Kinesis Data Streams 设置串流 ETL 属性(AWS Glue API 或 AWS CLI)
  • 要为同一账户中的 Kinesis 源设置串流 ETL 属性,请在 CreateTable API 操作或 create_table CLI 命令的 StorageDescriptor 结构中指定 streamNameendpointUrl 参数。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamName": "sample-stream", "endpointUrl": "https://kinesis.us-east-1.amazonaws.com" } ... }

    或者,指定 streamARN

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream" } ... }
  • 要为其他账户中的 Kinesis 源设置串流 ETL 属性,请在 CreateTable API 操作或 create_table CLI 命令的 StorageDescriptor 结构中指定 streamARNawsSTSRoleARNawsSTSSessionName(可选)参数。

    "StorageDescriptor": { "Parameters": { "typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-east-1:123456789:stream/sample-stream", "awsSTSRoleARN": "arn:aws:iam::123456789:role/sample-assume-role-arn", "awsSTSSessionName": "optional-session" } ... }

Kafka 数据源

在创建表时,请设置以下串流 ETL 属性(控制台)。

源的类型

Kafka

对于 Kafka 源:
主题名称

Kafka 中指定的主题名称。

连接

一个引用 Kafka 源的 AWS Glue 连接,如 为 Apache Kafka 数据流创建 AWS Glue 连接 中所述。

AWS Glue 架构注册表源

要将 AWS Glue 架构注册表用于串流任务,请按照位于 使用案例:AWS Glue Data Catalog 的说明创建或更新架构注册表。

目前,AWS Glue 串流仅支持架构推理推断设置为 false 的 Glue Schema Registry Avro 格式。

Avro 串流源的注释和限制

以下注释和限制适用于 Avro 格式的串流源:

  • 启用架构检测后,Avro 架构必须包含在负载中。关闭时,负载应仅包含数据。

  • 某些 Avro 数据类型在动态帧中不受支持。在使用 AWS Glue 控制台的创建表向导中的 Define a schema (定义架构) 页面时,您无法指定这些数据类型。在架构检测期间,Avro 架构中不受支持的类型将转换为受支持的类型,如下所示:

    • EnumType => StringType

    • FixedType => BinaryType

    • UnionType => StructType

  • 如果使用控制台中的 Define a schema (定义架构) 页面,则架构的隐含根元素类型为 record。如果你想要一个除 record 以外的根元素类型,例如 array 或者 map,则不能使用 Define a schema (定义架构) 页面来指定架构。相反,您必须跳过该页并将架构指定为表属性或在 ETL 脚本中指定。

    • 要在表属性中指定架构,请完成创建表向导,编辑表详细信息,并在 Table properties (表属性) 下添加新的键值对。使用密钥 avroSchema,然后为值输入架构 JSON 对象,如以下屏幕截图所示。

      
                                在 Table properties (表属性) 标题中,有两列文本字段。左侧列标题为 Key (密钥),右侧列标题为 Value (值)。第一行中的键/值对是 classification/avro。第二行中的键/值对是 Avroschema/{"type":"array","items":"string"}。
    • 要在 ETL 脚本中指定架构,请修改 datasource0 任务语句并将 avroSchema 密钥添加到 additional_options 参数,如以下 Python 和 Scala 示例所示。

      Python
      SCHEMA_STRING = ‘{"type":"array","items":"string"}’ datasource0 = glueContext.create_data_frame.from_catalog(database = "database", table_name = "table_name", transformation_ctx = "datasource0", additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema": SCHEMA_STRING})
      Scala
      val SCHEMA_STRING = """{"type":"array","items":"string"}""" val datasource0 = glueContext.getCatalogSource(database = "database", tableName = "table_name", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions(s"""{"startingPosition": "TRIM_HORIZON", "inferSchema": "false", "avroSchema":"$SCHEMA_STRING"}""")).getDataFrame()

将 grok 模式应用于串流源

您可以为日志数据源创建串流 ETL 任务,并使用 Grok 模式将日志转换为结构化数据。然后,ETL 任务会将数据作为结构化数据源进行处理。在为串流源创建数据目录表时,可以指定要应用的 Grok 模式。

有关 Grok 模式和自定义模式字符串值的信息,请参阅 编写 grok 自定义分类器

将 grok 模式添加到数据目录表(控制台)
  • 使用创建表向导,并使用在 为串流源创建数据目录表 中指定的参数创建表。将数据格式指定为 Grok,填写 Grok pattern (Grok 模式) 字段,并可选择在 Custom patterns (optional) (自定义模式(可选)) 下添加自定义模式。

    
                        *

    在每个自定义模式后按 Enter

将 grok 模式添加到数据目录表(AWS Glue API 或 AWS CLI)
  • 添加 GrokPattern 参数,并且可以选择将 CustomPatterns 参数添加到 CreateTable API 操作或 create_table CLI 命令。

    "Parameters": { ... "grokPattern": "string", "grokCustomPatterns": "string", ... },

    grokCustomPatterns 表达为字符串,并使用“\n”作为模式之间的分隔符。

    以下是指定这些参数的示例。

    "parameters": { ... "grokPattern": "%{USERNAME:username} %{DIGIT:digit:int}", "grokCustomPatterns": "digit \d", ... }

定义串流 ETL 作业的作业属性

在 AWS Glue 控制台上定义串流 ETL 任务时,请提供以下特定于流的属性。有关其他任务属性的说明,请参阅 定义 Spark 作业的作业属性

IAM 角色

指定用于授权用于运行作业、访问流媒体源和访问目标数据存储的资源的 AWS Identity and Access Management (IAM) 角色。

要访问 Amazon Kinesis Data Streams,请将AmazonKinesisFullAccess AWS 托管策略附加到该角色,或者附加允许更精细访问权限的类似 IAM 策略。有关示例策略,请参阅使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问

有关在 AWS Glue 中运行任务的权限的更多信息,请参阅 适用于 AWS Glue 的 Identity and Access Management

Type

选择 Spark streaming (Spark 串流)

AWS Glue 版本

AWS Glue 版本确定可用于任务的 Apache Spark、Python 或 Scala 版本。选择一个选项指定可供作业使用的 Python 或 Scala 版本。AWS Glue支持 Python 3 的 2.0 版是串流 ETL 任务的默认设置。

作业超时

(可选)输入持续时间(以分钟为单位)。默认值为空,这意味着任务可能无限期运行。

数据来源

指定您在 为串流源创建数据目录表 中创建的表。

数据目标

请执行以下操作之一:

  • 选择 Create tables in your data target (在数据目标中创建表) 并指定以下数据目标属性。

    数据存储

    选择 Amazon S3 或 JDBC。

    格式

    选择任意格式。所有项都支持流式处理。

  • 选择 Use tables in the data catalog and update your data target (使用数据目录中的表并更新数据目标),然后选择用于 JDBC 数据存储的表。

输出架构定义

请执行以下操作之一:

  • 选择 Automatically detect schema of each record (自动检测每条记录的架构) 以启动架构检测。AWS Glue 可以通过串流数据确定架构。

  • 选择 Specify output schema for all records (指定所有记录的输出方案) 以使用 Apply Mapping(应用映射)转换来定义输出架构。

Script

(可选)提供您自己的脚本或修改生成的脚本以执行 Apache Spark Structured Streaming 引擎支持的操作。有关可用操作的信息,请参阅流式传输 DataFrames /Dataset 上的操作。

串流 ETL 注释和限制

请记住以下注释和限制:

  • 自动解压 AWS Glue 流式传输 ETL 任务仅适用于受支持的压缩类型。另请注意以下几点:

    • Framed Snappy 是指适用于 Snappy 的帧格式

    • Deflate 在 Glue 版本 3.0 而不是 Glue 版本 2.0 中受支持。

  • 使用架构检测时,无法执行串流数据联接。

  • AWS Glue 流式传输 ETL 任务不支持对具有 Avro 格式的 AWS Glue 架构注册表使用 Union 数据类型。

  • 您的 ETL 脚本可以使用 AWS Glue 的内置转换和 Apache Spark Structured Streaming 的原生转换。有关更多信息,请参阅 Apache Spark 网站上的流媒体 DataFrames /数据集操作或。AWS Glue PySpark 变换参考

  • AWS Glue 串流 ETL 任务使用检查点来跟踪已读取的数据。因此,停止并重新启动的任务将从流中停止的位置开始。如果要重新处理数据,您可以删除脚本中引用的检查点文件夹。

  • 不支持任务书签。

  • 要在作业中使用 Kinesis Data Streams 的增强型扇出功能,请参阅 在 Kinesis 流作业中使用增强型扇出功能

  • 如果您使用在 AWS Glue 架构注册表中创建的数据目录表,则当新的架构版本可用时,要反映该新架构,您需要执行以下操作:

    1. 停止与该表关联的任务。

    2. 更新数据目录表的架构。

    3. 重新启动与该表关联的任务。