本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Kafka 连接
指定与 Kafka 集群或 Amazon Managed Streaming for Apache Kafka 集群的连接。
您可以在 GlueContext
对象下面使用以下方法,利用来自 Kafka 流式传输源的记录:
-
getCatalogSource
-
getSource
-
getSourceWithFormat
-
createDataFrameFromOptions
如果您使用 getCatalogSource
,则任务具有数据目录数据库和表名称信息,将其用于获取一些基本参数,以便从 Apache Kafka 流读取数据。如果您使用 getSource
、getSourceWithFormat
或 createDataFrameFromOptions
,则您必须明确指定以下参数:
您可以配合使用 connectionOptions
与 getSource
或createDataFrameFromOptions
、options
与 getSourceWithFormat
、或者 additionalOptions
与 getCatalogSource
,以指定这些选项。
有关流式传输 ETL 作业的注意事项和限制,请参阅串流 ETL 注释和限制。
配置 Kafka
连接可通过互联网访问的 Kafka 直播没有任何AWS先决条件。
您可以创建 AWS Glue Kafka 连接来管理您的连接凭证。有关更多信息,请参阅为 Apache Kafka 数据流创建 AWS Glue 连接:在你的 AWS Glue 作业配置中,提供 c onnectionName
作为附加网络连接,然后在方法调用中为参数提供 c onnectionName
。connectionName
在某些情况下,您需要配置其他先决条件:
-
如果使用带有 IAM 身份验证的 Apache Kafka 版亚马逊托管流媒体,则需要适当的 IAM 配置。
-
如果在亚马逊 VPC 中使用适用于 Apache Kafka 的亚马逊托管流媒体 Kafka,则需要适当的亚马逊 VPC 配置。您需要创建一个提供亚马逊 VP AWS C 连接信息的 Glue 连接。您需要在作业配置中将 Glue AWS 连接作为附加网络连接包括在内。
有关流式传输 ETL 作业先决条件的更多信息,请参阅在 AWS Glue 中流式处理 ETL 作业。
示例:从 Kafka 直播中读取
与 forEachBatch 结合使用。
Kafka 流式传输源示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 连接选项参考
"connectionType": "kafka"
可使用以下连接选项:
-
"bootstrap.servers"
(必需)引导服务器 URL 的列表,例如,作为b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
。此选项必须在 API 调用中指定,或在数据目录的表元数据中定义。 -
"security.protocol"
(必填)用于与代理通信的协议。可能的值为"SSL"
或"PLAINTEXT"
。 -
"topicName"
(必填)要订阅的以逗号分隔的主题列表。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。 -
"assign"
:(必填)用于指定要使用的TopicPartitions
的 JSON 字符串。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。例如:“{"topicA":[0,1],"topicB":[2,4]}”
-
"subscribePattern"
:(必需)标识要订阅的主题列表的 Java 正则表达式字符串。您必须指定"topicName"
、"assign"
或"subscribePattern"
中的其中一个,且只能指定一个。示例:“topic.*”
-
"classification"
(必需)记录中数据使用的文件格式。除非通过数据目录提供,否则为必填项。 -
"delimiter"
(可选)当为 CSV 时classification
使用的值分隔符。默认为 “,
。” -
"startingOffsets"
:(可选)Kafka 主题中数据读取的起始位置。可能的值为"earliest"
或"latest"
。默认值为"latest"
。 -
"startingTimestamp"
:(可选,仅支持 AWS Glue 4.0 或更高版本)Kafka 主题中要从中读取数据的记录的时间戳。可能的值是模式中的 UTC 格式的时间戳字符串yyyy-mm-ddTHH:MM:SSZ
(其中Z
表示带有 +/-的 UTC 时区偏移量。 例如:“2023-04-04T 08:00:00-04:00”)。注意:Gl AWS ue 直播脚本的 “连接选项” 列表中只能出现 “startingOffsets” 或 “startingTimeStamp” 中的一个,包括这两个属性都会导致任务失败。
-
"endingOffsets"
:(可选)批处理查询结束时的终点。可能值为"latest"
,或者为每个TopicPartition
指定结束偏移的 JSON 字符串。对于 JSON 字符串,格式为
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
。偏移值-1
表示"latest"
。 -
"pollTimeoutMs"
:(可选)Spark 任务执行程序中,从 Kafka 轮询数据的超时时间(以毫秒为单位)。默认值为512
。 -
"numRetries"
:(可选)无法获取 Kafka 偏移时的重试次数。默认值为3
。 -
"retryIntervalMs"
:(可选)重试获取 Kafka 偏移时的等待时间(以毫秒为单位)。默认值为10
。 -
"maxOffsetsPerTrigger"
:(可选)每个触发间隔处理的最大偏移数的速率限制。指定的总偏移数跨不同卷的topicPartitions
按比例分割。默认值为 null,这意味着使用者读取所有偏移,直到已知的最新偏移。 -
"minPartitions"
:(可选)从 Kafka 读取数据的必需最小分区数。默认值为 null,这意味着 Spark 分区数等于 Kafka 分区数。 -
"includeHeaders"
:(可选)是否包含 Kafka 标头。当选项设置为“true”时,数据输出将包含一个名为“glue_streaming_kafka_headers”的附加列,类型为Array[Struct(key: String, value: String)]
。默认值为“false”。此选项仅适用于 AWS Glue 版本 3.0 或更高版本。 -
"schema"
:(当 inferSchema 设为 false 时为必填)用于处理有效工作负载的架构。如果分类为avro
,则提供的架构必须采用 Avro 架构格式。如果分类不是avro
,则提供的架构必须采用 DDL 架构格式。以下是一些架构示例。
-
"inferSchema"
:(可选)默认值为“false”。如果设置为“true”,则会在运行时检测到foreachbatch
内的有效工作负载中的架构。 -
"avroSchema"
:(已弃用)用于指定 Avro 数据架构(使用 Avro 格式时)的参数。此参数现已被弃用。使用schema
参数。 -
"addRecordTimestamp"
:(可选)当选项设置为 'true' 时,数据输出将包含一个名为 "__src_timestamp" 的附加列,表示主题收到相应记录的时间。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。 -
"emitConsumerLagMetrics"
:(可选)当该选项设置为 “true” 时,对于每个批次,它将发布从主题收到的最旧记录到该记录到达的时间之间的持续时间内的AWS Glue指标。 CloudWatch该指标的名字是 “glue.driver.streaming”。 maxConsumerLagInMs”。默认值为‘false’。4.0 或更高 AWS Glue 版本支持此选项。