Kinesis 连接 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis 连接

您可以使用 Kinesis 连接使用存储在数据目录表中的信息读取和写入 Amazon Kinesis 数据流,或者通过提供直接访问数据流的信息。你可以从 Kinesis 读取信息到 Spark 中 DataFrame,然后将其转换为 G AWS lue。 DynamicFrame你可以用某种格式写 DynamicFrames 入 Kinesis。JSON如果直接访问数据流,请使用这些选项提供有关如何访问数据流的信息。

如果您使用 getCatalogSourcecreate_data_frame_from_catalog 使用来自 Kinesis 流式处理源的记录,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Kinesis 流式处理源读取数据。如果使用 getSourcegetSourceWithFormatcreateDataFrameFromOptionscreate_data_frame_from_options,则必须使用此处描述的连接选项指定这些基本参数。

您可以使用 GlueContext 类中指定方法的以下参数为 Kinesis 指定连接选项。

  • Scala

    • connectionOptions:与 getSourcecreateDataFrameFromOptionsgetSink 结合使用

    • additionalOptions:与 getCatalogSourcegetCatalogSink 结合使用

    • options:与 getSourceWithFormatgetSinkWithFormat 结合使用

  • Python

    • connection_options:与 create_data_frame_from_optionswrite_dynamic_frame_from_options 结合使用

    • additional_options:与 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 结合使用

    • options:与 getSourcegetSink 结合使用

有关直播ETL作业的注意事项和限制,请参阅串流 ETL 注释和限制

配置 Kinesis

要在 G AWS lue Spark 作业中连接到 Kinesis 数据流,你需要一些先决条件:

  • 如果读取,则 AWS Glue 作业必须具有对 Kinesis 数据流的读取访问级别IAM权限。

  • 如果正在写入,则 AWS Glue 作业必须具有 Kinesis 数据流的写入访问级别IAM权限。

在某些情况下,您需要配置其他先决条件:

  • 如果您的 AWS Glue 任务配置了其他网络连接(通常用于连接到其他数据集),并且其中一个连接提供了 Amazon VPC Network 选项,则这将引导您的任务通过亚马逊进行通信VPC。在这种情况下,您还需要将 Kinesis 数据流配置为通过亚马逊进行通信。VPC为此,您可以在 Amazon VPC 和 Kinesis 数据流之间创建一个接口VPC终端节点。有关更多信息,请参阅将 Kinesis Data Streams 与VPC接口端点一起使用

  • 在另一个账户中指定 Amazon Kinesis Data Streams 时,您必须设置角色和策略,从而允许跨账户访问。有关更多信息,请参阅示例:从不同账户的 Kinesis 串流中读取

有关流式处理ETL作业先决条件的更多信息,请参阅在 AWS Glue 中流式处理 ETL 作业

示例:从 Kinesis 流读取

示例:从 Kinesis 流读取

forEachBatch 结合使用。

Amazon Kinesis 流式处理源示例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

示例:写入 Kinesis 流

示例:从 Kinesis 流读取

forEachBatch 结合使用。

Amazon Kinesis 流式处理源示例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kinesis 连接选项参考

为 Amazon Kinesis Data Streams 指定连接选项。

为 Kinesis 流式处理数据源使用以下连接选项:

  • "streamARN"(必填)用于读/写。Kinesis 数据流的。ARN

  • "classification"(读取所必填)用于读取。记录中数据使用的文件格式。除非 Data Catalog 提供,否则为必需。

  • "streamName" –(可选)用于读取。要从中读取的 Kinesis 数据流的名称。与 endpointUrl 一起使用。

  • "endpointUrl" –(可选)用于读取。默认:“https://kinesis.us-east-1.amazonaws.com”。Kinesis 直播的 AWS 端点。除非要连接到特殊区域,否则您无需对此进行更改。

  • "partitionKey" –(可选)用于写入。生成记录时所使用的 Kinesis 分区键。

  • "delimiter"(可选)用于读取。在 i classification s 时使用的值分隔符CSV。默认为“,”。

  • "startingPosition":(可选)用于读取。要从中读取数据的 Kinesis 数据流中的起始位置。可能的值是"latest""trim_horizon""earliest"、或模式UTC格式的时间戳字符串yyyy-mm-ddTHH:MM:SSZ(其中Z表示带有 +/-UTC 的时区偏移量。 例如 “2023-04-04T 08:00:00-04:00”)。默认值为 "latest"。注意:UTC格式中的时间戳字符串仅支持 AWS G "startingPosition" lue 版本 4.0 或更高版本。

  • "failOnDataLoss":(可选)如果有任何活动分片丢失或已过期,则作业失败。默认值为 "false"

  • "awsSTSRoleARN":(可选)用于读取/写入。使用 (ARN) 代入的角色的 Amazon 资源名称 AWS Security Token Service (AWS STS)。此角色必须拥有针对 Kinesis 数据流执行描述或读取记录操作的权限。在访问其他账户中的数据流时,必须使用此参数。与 "awsSTSSessionName" 结合使用。

  • "awsSTSSessionName":(可选)用于读取/写入。使用 AWS STS代入角色的会话的标识符。在访问其他账户中的数据流时,必须使用此参数。与 "awsSTSRoleARN" 结合使用。

  • "awsSTSEndpoint":(可选)使用代入角色连接到 Kinesis 时要使用的 AWS STS 端点。这允许在 a 中使用区域 AWS STS 终端节点VPC,而默认的全局终端节点是不可能的。

  • "maxFetchTimeInMs":(可选)用于读取。作业执行程序从 Kinesis 数据流中读取当前批处理记录所花费的最长时间,以毫秒为单位指定。在这段时间内可能会GetRecordsAPI拨打多个电话。默认值为 1000

  • "maxFetchRecordsPerShard":(可选)用于读取。每个微批次将从 Kinesis 数据流中的每个分片获取的最大记录数。注意:如果流式传输作业已经从 Kinesis 读取了额外的记录(在同一个 get-records 调用中),则客户端可以超过此限制。如果 maxFetchRecordsPerShard 需要严格,则必须是 maxRecordPerRead 的整数倍。默认值为 100000

  • "maxRecordPerRead":(可选)用于读取。每项 getRecords 操作中要从 Kinesis 数据流获取的最大记录数。默认值为 10000

  • "addIdleTimeBetweenReads":(可选)用于读取。在两项连续 getRecords 操作之间添加时间延迟。默认值为 "False"。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "idleTimeBetweenReadsInMs":(可选)用于读取。两项连续 getRecords 操作之间的最短时间延迟,以毫秒为单位。默认值为 1000。此选项仅适用于 Glue 版本 2.0 及更高版本。

  • "describeShardInterval":(可选)用于读取。脚本需要考虑重新分片的两次ListShardsAPI调用之间的最短时间间隔。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的重新分区策略。默认值为 1s

  • "numRetries":(可选)用于读取。Kinesis Data API Streams 请求的最大重试次数。默认值为 3

  • "retryIntervalMs":(可选)用于读取。重试 Kinesis Data Streams 调用之前的冷却时间(以毫秒为单位指定)。API默认值为 1000

  • "maxRetryIntervalMs":(可选)用于读取。两次重试 Kinesis Data Streams 调用之间的最大冷却时间(以毫秒为单位指定)。API默认值为 10000

  • "avoidEmptyBatches":(可选)用于读取。在批处理开始之前检查 Kinesis 数据流中是否有未读数据,避免创建空白微批处理任务。默认值为 "False"

  • "schema":( inferSchema 设置为 false 时为必填项)用于读取。用于处理有效负载的架构。如果分类为 avro,则提供的架构必须采用 Avro 架构格式。如果分类不是avro,则提供的架构必须采用DDL架构格式。

    以下是一些架构示例。

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema":(可选)用于读取。默认值为‘false’。如果设置为“true”,则会在运行时检测到 foreachbatch 内的有效工作负载中的架构。

  • "avroSchema":(已弃用)用于读取。用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用 schema 参数。

  • "addRecordTimestamp":(可选)用于读取。当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示数据流收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

  • "emitConsumerLagMetrics":(可选)用于读取。当该选项设置为 “true” 时,对于每个批次,它将发出从直播收到的最旧记录到它到达的时间之间的持续时间内的AWS Glue指标。 CloudWatch该指标的名字是 “glue.driver.streaming”。 maxConsumerLagInMs”。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。

  • "fanoutConsumerARN":(可选)用于读取。ARN中指定的直播的 Kinesis 直播使用者的。streamARN用于为您的 Kinesis 连接启用增强型扇出功能模式。有关使用增强型扇出功能的 Kinesis 流的更多信息,请参阅 在 Kinesis 流作业中使用增强型扇出功能

  • "recordMaxBufferedTime" –(可选)用于写入。默认值:1000(ms)。记录在等待写入时缓冲的最长时间。

  • "aggregationEnabled" –(可选)用于写入。默认值:真。指定是否应在将记录发送到 Kinesis 之前对其进行汇总。

  • "aggregationMaxSize" –(可选)用于写入。默认值:51200(字节)。如果记录超过了此限制,则它将绕过聚合器。注意:Kinesis 将记录大小限制为 50KB。如果您将其设置为 50KB 以上,Kinesis 将拒绝过大的记录。

  • "aggregationMaxCount" –(可选)用于写入。默认值:4294967295。要打包至汇总记录的最大项目数量。

  • "producerRateLimit" –(可选)用于写入。默认值:150(%)。限制从单个生产者(例如您的作业)发送的每个分片的吞吐量,以占后端限制的百分比表示。

  • "collectionMaxCount" –(可选)用于写入。默认值:500。一个 PutRecords 请求中要打包的最大物品数量。

  • "collectionMaxSize" –(可选)用于写入。默认值:5242880(字节)。与 PutRecords 请求一起发送的最大数据量。