使用 Lambda 处理 Amazon MSK 消息 - AWS Lambda

使用 Lambda 处理 Amazon MSK 消息

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

将 Amazon MSK 添加为事件源

创建事件源映射,使用 Lambda 控制台、AWS开发工具包,或 AWS Command Line Interface (AWS CLI) 将您的 Amazon MSK 添加为 Lambda 函数触发器。请注意,当您将 Amazon MSK 添加为触发器时,Lambda 将假定 Amazon MSK 集群的 VPC 设置,而不是 Lambda 函数的 VPC 设置。

本节介绍了如何使用 Lambda 控制台和 AWS CLI 创建事件源映射。

先决条件

  • 一个 Amazon MSK 集群和一个 Kafka 主题。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的开始使用 Amazon MSK

  • 一个有权访问 MSK 集群所用 AWS 资源的执行角色

可自定义的使用者组 ID

将 Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望 Lambda 函数加入的 Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Kafka 记录处理设置从其他使用者无缝迁移到 Lambda。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Kafka 会向所有使用者分发消息。换句话说,Lambda 不会收到 Kafka 主题的所有消息。如果希望 Lambda 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Kafka 找到了具有相同 ID 的有效现有使用者组,则 Lambda 会忽略事件源映射的 StartingPosition 参数。相反,Lambda 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Kafka 找不到现有使用者组,则 Lambda 会使用指定的 StartingPosition 配置事件源。

在所有 Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建 Kafka 事件源映射后,无法更新此值。

添加 Amazon MSK 触发器(控制台)

按照以下步骤将 Amazon MSK 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Amazon MSK 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 MSK 触发器类型。

    2. 对于 MSK cluster(MSK 集群),选择您的集群。

    3. 对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。

    4. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    5. 对于 Topic name(主题名称),输入 Kafka 主题名称。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 Authentication(身份验证),选择用于通过 MSK 集群中的代理进行身份验证的密钥。

    9. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加 Amazon MSK 触发器(AWS CLI)

使用以下示例 AWS CLI 命令为 Lambda 函数创建和查看 Amazon MSK 触发器。

使用 AWS CLI 创建触发器

例 — 为使用 IAM 身份验证的集群创建事件源映射

以下示例使用 create-event-source-mapping AWS CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。将主题的起始位置设置为 LATEST。当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

使用 AWS CLI 查看状态

以下示例使用 get-event-source-mapping AWS CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于Amazon MSK。

参数 必需 默认值 注意

AmazonManagedKafkaEventSourceConfig

包含 ConsumerGroupId 字段,该字段默认为唯一值。

只能在 Create(创建)设置

BatchSize

100

最大值:10000

已启用

已启用

none

EventSourceArn

Y

不适用

只能在 Create(创建)设置

FunctionName

不适用

none

FilterCriteria

不适用

控制 Lambda 向您的函数发送的事件

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

SourceAccessConfigurations

无凭证

事件源的 SASL/SCRAM 或 CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) 身份验证凭证

StartingPosition

Y

不适用

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

只能在 Create(创建)设置

StartingPositionTimestamp

不适用

当 StartingPosition 设置为 AT_TIMESTAMP 时,为必需项

主题

Y

不适用

Kafka 主题名称

只能在 Create(创建)设置

创建跨账户事件源映射

您可以使用多 VPC 私有连接将 Lambda 函数连接到不同 AWS 账户 中的预置 MSK 集群。多 VPC 连接使用 AWS PrivateLink,可将所有流量保持在 AWS 网络内。

注意

您无法为无服务器 MSK 集群创建跨账户事件源映射。

要创建跨账户事件源映射,必须先为 MSK 集群配置多 VPC 连接。创建事件源映射时,请使用托管 VPC 连接 ARN 而非集群 ARN,如以下示例所示。CreateEventSourceMapping 操作也因 MSK 集群使用的身份验证类型而异。

例 — 为使用 IAM 身份验证的集群创建跨账户事件源映射

当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建跨账户事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

有两种方法可以通过 SASL/SCRAM 身份验证将密钥用于跨账户 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建跨账户事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。密钥可以存储在集群账户或 Lambda 函数账户中。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

将 Amazon MSK 集群用作事件源

当您添加 Apache Kafka 或 Amazon MSK 集群作为 Lambda 函数的触发器时,该集群将用作事件源

Lambda 根据您指定的 StartingPosition,从您在 CreateEventSourceMapping 请求中指定为 Topics 的 Kafka 主题读取事件数据。成功进行处理后,会将 Kafka 主题提交给 Kafka 集群。

如果您指定 StartingPosition 作为 LATEST,则 Lambda 开始读取主题下每个分区中的最新消息。由于在触发器配置后 Lambda 开始读取消息之前可能会有一些延迟,因此 Lambda 不会读取在此窗口中生成的任何消息。

Lambda 按顺序读取每个 Kafka 主题分区的消息。单个 Lambda 负载可以包含来自多个分区的消息。当有更多记录可用时,Lambda 根据您在 CreateEventSourceMapping 中指定的 BatchSize 值,继续对记录进行批处理,直到函数赶上主题的速度。

Lambda 处理各个批次后,会提交该批次中消息的偏移量。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。您可以将所有重试都失败的记录发送到失败时的目标,以供日后处理。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标的值是写入 Kafka 事件源主题的最后一条记录与函数的使用者组处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和使用者组处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能表明函数的使用者组中的轮询器存在问题。有关更多信息,请参阅 查看 Lambda 函数的指标

Amazon MSK 事件源的自动伸缩

当您最初创建 Amazon MSK 事件源时,Lambda 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,Lambda 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果目标 Lambda 函数受到限制,Lambda 会减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。

要监控 Kafka 主题的吞吐量,请查看 Lambda 在您的函数处理记录时发出的偏移滞后指标

要检查并行发生的函数调用次数,还可以监控函数的并发指标