结合 Amazon MSK 使用 Lambda - AWS Lambda

结合 Amazon MSK 使用 Lambda

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

Amazon Managed Streaming for Apache Kafka (Amazon MSK) 是一项完全托管式服务,可用于构建并运行使用 Apache Kafka 来处理流数据的应用程序。Amazon MSK 简化了运行 Kafka 的集群的设置、扩展和管理。Amazon MSK 还可以更轻松地配置您的应用程序以适用于多个可用区和保证 AWS Identity and Access Management (IAM) 的安全性。Amazon MSK 支持多个开源版本的 Kafka。

Amazon MSK 作为事件源,运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon Kinesis 相似。Lambda 在内部轮询来自事件源的新消息,然后同步调用目标 Lambda 函数。Lambda 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小是可配置的(默认值为 100 条消息)。有关更多信息,请参阅 批处理行为

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

Lambda 按顺序读取各个分区的消息。单个 Lambda 负载可以包含来自多个分区的消息。Lambda 处理各个批次后,会提交该批次中消息的偏移量。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性

有关如何将 Amazon MSK 配置为事件源的示例,请参阅AWS计算博客上的将 Amazon MSK 用作 AWS Lambda 事件源。要查看完整的教程,请访问 Amazon MSK Labs 中的 Amazon MSK Lambda 集成

示例事件

Lambda 调用函数时会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK 集群身份验证

Lambda 需要访问 Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持通过多种选项来控制客户端对 MSK 集群的访问。

未经身份验证的访问

如果没有客户端会通过互联网访问集群,则可以使用未经身份验证访问。

SASL/SCRAM 身份验证

Amazon MSK 支持使用传输层安全性协议(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。为使 Lambda 连接到集群,您可以将身份验证凭证(用户名和密码)存储在 AWS Secrets Manager 密钥中。

有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 AWS Secrets Manager 进行用户名和密码身份验证

Amazon MSK 不支持 SASL/PLAIN 身份验证。

基于 IAM 角色的身份验证

您可以使用 IAM 来验证连接到 MSK 集群的客户端的身份。如果 IAM 身份验证在您的 MSK 集群上处于活动状态,并且您没有为身份验证提供密钥,则 Lambda 自动默认使用 IAM 身份验证。要创建和部署用户或基于角色的策略,请使用 IAM 控制台或 API。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM 访问控制

要允许 Lambda 连接到 MSK 集群、读取记录和执行其他所需操作,请将以下权限添加到函数的执行角色

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK Kafka 操作

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

对于 Amazon MSK,Lambda 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以使用 MSK 集群中的代理对 Lambda 进行身份验证。客户端证书必须由服务器信任存储中的 CA 签名。MSK 集群会向 Lambda 发送服务器证书,以便使用 Lambda 对代理进行身份验证。服务器证书必须由 AWS 信任存储中的证书颁发机构(CA)签名。

有关如何生成客户端证书的说明,请参阅为作为事件源的 Amazon MSK 引入双向 TLS 身份验证

Amazon MSK 不支持自签名服务器证书,因为 Amazon MSK 中的所有代理都使用由 Amazon Trust Services CA 签名的公有证书,预设情况下 Lambda 信任这些证书。

有关适用于 Amazon MSK 的 mTLS 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的双向 TLS 身份验证

配置 mTLS 密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

Lambda 支持 PBES1(而不是 PBES2)私有密钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Lambda 如何选择引导代理

Lambda 根据集群上可用的身份验证方法以及您是否提供用于身份验证的密钥来选择引导代理。如果您为 mTLS 或 SASL/SCRAM 提供了密钥,Lambda 将自动选择该身份验证方法。如果不提供密钥,Lambda 会选择在集群上处于活动状态的最强身份验证方法。下面是 Lambda 选择代理的优先级顺序,从最强到最弱的身份验证:

  • mTLS(为 mTLS 提供的密钥)

  • SASL/SCRAM(为 SASL/SCRAM 提供的密钥)

  • SASL IAM(未提供密钥,IAM 身份验证处于活动状态)

  • 未经身份验证的 TLS(未提供密钥,IAM 身份验证未处于活动状态)

  • 纯文本(未提供密钥,IAM 身份验证和未经身份验证的 TLS 均未处于活动状态)

注意

如果 Lambda 无法连接到最安全的代理类型,则 Lambda 不会尝试连接到其他(较弱)代理类型。如果希望 Lambda 选择较弱的代理类型,请停用集群上所有更强的身份验证方法。

管理 API 访问和权限

除了访问 Amazon MSK 集群外,您的函数还需要具有执行各种 Amazon MSK API 操作的权限。您可以为函数的执行角色添加这些权限。如果您的用户需要访问任何 Amazon MSK API 操作,请将所需权限添加到用户或角色的身份策略中。

您可以将以下各项权限手动添加到您的执行角色。您也可以将 AWS 托管式策略 AWSLambdaMSKExecutionRole 附加到您的执行角色。AWSLambdaMSKExecutionRole 策略包含了下面列出的所有必需 API 操作和 VPC 权限。

需要的 Lambda 函数执行角色权限

要在 Amazon CloudWatch Logs 中创建日志并将日志存储到日志组,Lambda 函数必须在它的执行角色中具有以下权限:

要使 Lambda 能够代表您访问您的 Amazon MSK 集群,您的 Lambda 函数执行角色必须具有以下权限。

您只需要添加 kafka:DescribeClusterkafka:DescribeClusterV2 中的一个。对于预调配的 MSK 集群,任何一个权限均有效。对于无服务器 MSK 集群,必须使用 kafka:DescribeClusterV2

注意

Lambda 最终计划从关联的 AWSLambdaMSKExecutionRole 托管式策略中移除 kafka:DescribeCluster 权限。您使用此策略,则应迁移任何使用 kafka:DescribeCluster 的应用程序,以便改用 kafka:DescribeClusterV2

VPC 权限

如果只有 VPC 内的用户才能访问您的 Amazon MSK 集群,则您的 Lambda 函数必须具有访问您的 Amazon VPC 资源的权限。这些资源包括您的 VPC、子网、安全组和网络接口。要连接到这些资源,函数的执行角色必须具有以下权限。这些权限包含在 AWSLambdaMSKExecutionRole AWS 托管式策略中。

可选的 Lambda 函数权限

您的 Lambda 函数还可能需要权限来:

  • 访问您的 SCRAM 密钥(如果使用 SASL/SCRAM 身份验证)。

  • 描述您的 Secrets Manager 密钥。

  • 访问 AWS Key Management Service(AWS KMS)客户管理的密钥。

  • 将失败调用的记录发送到目标。

Secrets Manager 和 AWS KMS 权限

根据您为 Amazon MSK 代理配置的访问控制类型,Lambda 函数可能需要具有访问您的 SCRAM 密钥(如果使用 SASL/SCRAM 身份验证)或 Secrets Manager 密钥,来解密您的 AWS KMS 客户自主管理型密钥的权限。要连接到这些资源,函数的执行角色必须具有以下权限:

将记录发送到目标

如果您想将失败调用的记录发送到故障目标,则您的 Lambda 函数必须具有发送这些记录的权限。对于 Kafka 事件源映射,您可以选择 Amazon SNS 主题、Amazon SQS 队列或 Amazon S3 存储桶作为目标。要将记录发送到 SNS 主题,您的函数的执行角色必须具有以下权限:

要将记录发送到 SQS 队列,您的函数的执行角色必须具有以下权限:

要将记录发送到 S3 存储桶,您的函数的执行角色必须具有以下权限:

此外,如果您在目标上配置了 KMS 密钥,则根据具体目标类型,Lambda 需要以下权限:

  • 如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则需要 kms:GenerateDataKey。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。

  • 如果您已使用自己的 KMS 密钥为 SQS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

  • 如果您已使用自己的 KMS 密钥为 SNS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

向执行角色添加权限

请按照以下步骤使用 IAM 控制台将 AWS 托管策略 AWSLambdaMSKExecutionRole 添加到执行角色。

添加 AWS 托管策略
  1. 打开 IAM 控制台的 Policies(策略)页面

  2. 在搜索框中,输入策略名称 (AWSLambdaMSKExecutionRole)。

  3. 从列表中选择策略,然后依次选择 Policy actions(策略操作)、Attach(附加)。

  4. Attach policy(附加策略)页面,从列表中选择您的执行角色,然后选择 Attach policy(附加策略)。

使用 IAM policy 授予用户访问权限

预设情况下,用户和角色无权执行 Amazon MSK API 操作。要向组织或账户中的用户授予访问权限,您可以添加或更新基于身份的策略。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 基于身份的策略示例

身份验证和授权错误

如果缺少使用来自 Amazon MSK 集群的数据所需的任何权限,Lambda 会在 LastProcessingResult 下的事件源映射中显示以下错误消息。

集群未能授权 Lambda

对于 SASL/SCRAM 或 mTLS,此错误表明提供的用户不具有以下所有必需的 Kafka 访问控制列表(ACL)权限:

  • DescribeConfigs 集群

  • 描述组

  • 读取组

  • 描述主题

  • 读取主题

对于 IAM 访问控制,此错误表明函数的执行角色缺少访问组或主题所需的一个或多个权限。查看 基于 IAM 角色的身份验证 中的所需权限列表。

当您使用所需的 Kafka 集群权限创建 Kafka ACL 或 IAM policy 时,请将主题和组指定为资源。主题名称必须与事件源映射中的主题一致。组名称必须与事件源映射的 UUID 一致。

向执行角色添加所需的权限后,更改可能需要几分钟才会生效。

SASL 身份验证失败

对于 SASL/SCRAM,此错误表明提供的用户名和密码无效。

对于 IAM 访问控制,此错误表明执行角色缺少 MSK 集群的 kafka-cluster:Connect 权限。将此权限添加到该角色并将集群的 Amazon Resource Name(ARN)指定为资源。

您可能会看到此错误间歇性发生。在 TCP 连接数超过 Amazon MSK 服务限额后,集群将拒绝连接。Lambda 会退回并重试,直到连接成功为止。在 Lambda 连接到集群并轮询记录后,最后的处理结果将更改为 OK

服务器未能通过 Lambda 的身份验证

此错误表明 Amazon MSK Kafka 未能通过 Lambda 的身份验证。出现此错误的可能原因如下:

  • 您没有为 mTLS 身份验证提供客户端证书。

  • 您提供了客户端证书,但未将代理配置为使用 mTLS。

  • 代理不信任客户端证书。

提供的证书或私有密钥无效

此错误表明 Amazon MSK 使用者无法使用提供的证书或私有密钥。确保证书和密钥使用 PEM 格式,并且私有密钥加密使用 PBES1 算法。

网络配置

为了让 Lambda 将 Kafka 集群用作事件源,需要访问集群所在的 Amazon VPC。建议为 Lambda 部署 AWS PrivateLink VPC 端点。为 Lambda 和 AWS Security Token Service(AWS STS)部署端点。如果代理使用身份验证,则还需要为 Secrets Manager 部署 VPC 端点。如果您配置了故障目标,则还要为该目标服务部署 VPC 端点。

或者,请确保与 Kafka 集群关联的 VPC 在每个公有子网中包含一个 NAT 网关。有关更多信息,请参阅 为连接到 VPC 的 Lambda 函数启用互联网访问权限

如果您使用 VPC 端点,则还必须将其配置为启用私有 DNS 名称

在为 MSK 集群创建事件源映射时,Lambda 会检查集群 VPC 的子网和安全组是否已经存在弹性网络接口(ENI)。如果 Lambda 发现现有 ENI,则会尝试重用这些 ENI。否则,Lambda 会创建新的 ENI 来连接到事件源并调用函数。

注意

Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。这些 VPC 由服务自动维护,对客户不可见。您也可以将函数连接到 Amazon VPC。无论是哪种情况,函数的 VPC 配置都不会影响事件源映射。只有事件源 VPC 的配置才能决定 Lambda 连接到事件源的方式。

可通过 Amazon MSK API 发现您的 Amazon VPC 配置。在设置过程中,您不需要使用 create-event-source-mapping 命令对其进行配置。

有关配置网络的更多信息,请参阅AWS计算博客上的使用 Apache Kafka 集群在 VPC 中设置 AWS Lambda

VPC 安全组规则

使用以下规则配置包含集群的 Amazon VPC 安全组(最低要求):

  • 入站规则 – 允许为事件源指定之安全组的 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。对于为事件源指定的安全组,允许 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。

  • 如果您使用的是 VPC 终端节点而不是 NAT 网关,则与 VPC 终端节点关联的安全组必须允许来自事件源安全组的端口 443 上的所有入站流量。

使用 VPC 端点

在使用 VPC 端点时,调用函数的 API 调用会使用 ENI 通过这些端点进行路由。Lambda 服务主体需要针对使用这些 ENI 的任何角色和函数调用 sts:AssumeRolelambda:InvokeFunction

默认情况下,VPC 端点的 IAM 策略处于开放状态。最佳做法是,将这些策略限制为仅允许特定主体使用该端点执行所需操作。为确保事件源映射能够调用 Lambda 函数,VPC 端点策略必须允许 Lambda 服务主体调用 sts:AssumeRolelambda:InvokeFunction。将 VPC 端点策略限制为仅允许来自组织内部的 API 调用,会导致事件源映射无法正常运行。

以下 VPC 端点策略示例展示了如何向 AWS STS 的 Lambda 服务主体和 Lambda 端点授予所需的访问权限。

例 VPC 端点策略 – AWS STS 端点
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
例 VPC 端点策略 – Lambda 端点
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

如果 Kafka 代理使用身份验证,您还可以限制 Secrets Manager 端点的 VPC 端点策略。要调用 Secrets Manager API,Lambda 会使用函数角色而非 Lambda 服务主体。以下示例展示了 Secrets Manager 端点策略。

例 VPC 端点策略 – Secrets Manager 端点
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

如果配置了失败时的目标,Lambda 还会利用 Lambda 托管的 ENI 通过函数角色来调用 s3:PutObjectsns:Publishsqs:sendMessage

将 Amazon MSK 添加为事件源

创建事件源映射,使用 Lambda 控制台、AWS开发工具包,或 AWS Command Line Interface (AWS CLI) 将您的 Amazon MSK 添加为 Lambda 函数触发器。请注意,当您将 Amazon MSK 添加为触发器时,Lambda 将假定 Amazon MSK 集群的 VPC 设置,而不是 Lambda 函数的 VPC 设置。

本节介绍了如何使用 Lambda 控制台和 AWS CLI 创建事件源映射。

先决条件

  • 一个 Amazon MSK 集群和一个 Kafka 主题。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的开始使用 Amazon MSK

  • 一个有权访问 MSK 集群所用 AWS 资源的执行角色

可自定义的使用者组 ID

将 Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望 Lambda 函数加入的 Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Kafka 记录处理设置从其他使用者无缝迁移到 Lambda。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Kafka 会向所有使用者分发消息。换句话说,Lambda 不会收到 Kafka 主题的所有消息。如果希望 Lambda 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Kafka 找到了具有相同 ID 的有效现有使用者组,则 Lambda 会忽略事件源映射的 StartingPosition 参数。相反,Lambda 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Kafka 找不到现有使用者组,则 Lambda 会使用指定的 StartingPosition 配置事件源。

在所有 Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建 Kafka 事件源映射后,无法更新此值。

失败时的目标

要保留来自 Kafka 事件源的调用失败或负载过大的记录,请为函数配置失败时的目标。当调用失败时,Lambda 会向目标发送包含调用详细信息的 JSON 记录。

您可以选择 Amazon SNS 主题、Amazon SQS 队列或 Amazon S3 存储桶作为您的目标。对于 SNS 主题或 SQS 队列目标,Lambda 会将记录元数据发送到目标。对于 S3 存储桶目标,Lambda 会将整个调用记录以及元数据发送到目标。

要让 Lambda 成功将记录发送到您选择的目标,请确保函数的执行角色包含相关权限。该表还描述了每种目标类型如何接收 JSON 调用记录。

目标类型 支持以下事件源 所需的权限 特定于目标的 JSON 格式

Amazon SQS 队列

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon SNS 主题

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon S3 存储桶

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录及其元数据存储在目标。

提示

作为最佳实践,请仅在执行角色中包含所需的最小权限。

SNS 和 SQS 目标

以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。recordsInfo 下面的每个密钥都包含 Kafka 主题和分区,用连字符分隔。例如,对于密钥 "Topic-0"Topic 是 Kafka 主题,0 是分区。对于每个主题和分区,可以使用偏移量和时间戳数据来查找原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 目标

对于 S3 目标,Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload 字段还包含作为转义 JSON 字符串的原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
提示

我们建议在目标存储桶上启用 S3 版本控制。

配置失败时的目标

要使用控制台配置失败时的目标,请执行以下步骤:

  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add destination (添加目标)

  4. 对于,请选择事件源映射调用

  5. 对于事件源映射,请选择为此函数配置的事件源。

  6. 条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。

  7. 对于目标类型,请选择 Lambda 要发送调用记录的目标类型。

  8. 对于 Destination (目标),请选择一个资源。

  9. 选择保存

您还可以使用 Lambda API 配置失败时的目标。例如,以下 CreateEventSourceMapping CLI 命令将为 MyFunction 添加 SQS 失败时目标:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下 UpdateEventSourceMapping CLI 命令将 S3 失败时目标添加到与输入 uuid 关联的 Kafka 事件源:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

要移除目标,请提供一个空字符串作为 destination-config 参数的实际参数:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

添加 Amazon MSK 触发器(控制台)

按照以下步骤将 Amazon MSK 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Amazon MSK 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 MSK 触发器类型。

    2. 对于 MSK cluster(MSK 集群),选择您的集群。

    3. 对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。

    4. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    5. 对于 Topic name(主题名称),输入 Kafka 主题名称。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 Authentication(身份验证),选择用于通过 MSK 集群中的代理进行身份验证的密钥。

    9. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加 Amazon MSK 触发器(AWS CLI)

使用以下示例 AWS CLI 命令为 Lambda 函数创建和查看 Amazon MSK 触发器。

使用 AWS CLI 创建触发器

例 — 为使用 IAM 身份验证的集群创建事件源映射

以下示例使用 create-event-source-mapping AWS CLI 命令将一个名为 my-kafka-function 的 Lambda 函数映射至一个名为 AWSKafkaTopic 的 Kafka 主题。将主题的起始位置设置为 LATEST。当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

使用 AWS CLI 查看状态

以下示例使用 get-event-source-mapping AWS CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

创建跨账户事件源映射

您可以使用多 VPC 私有连接将 Lambda 函数连接到不同 AWS 账户 中的预置 MSK 集群。多 VPC 连接使用 AWS PrivateLink,可将所有流量保持在 AWS 网络内。

注意

您无法为无服务器 MSK 集群创建跨账户事件源映射。

要创建跨账户事件源映射,必须先为 MSK 集群配置多 VPC 连接。创建事件源映射时,请使用托管 VPC 连接 ARN 而非集群 ARN,如以下示例所示。CreateEventSourceMapping 操作也因 MSK 集群使用的身份验证类型而异。

例 — 为使用 IAM 身份验证的集群创建跨账户事件源映射

当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建跨账户事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

有两种方法可以通过 SASL/SCRAM 身份验证将密钥用于跨账户 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建跨账户事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。密钥可以存储在集群账户或 Lambda 函数账户中。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Amazon MSK 事件源的自动伸缩

当您最初创建 Amazon MSK 事件源时,Lambda 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,Lambda 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果目标 Lambda 函数受到限制,Lambda 会减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。

要监控 Kafka 主题的吞吐量,请查看 Lambda 在您的函数处理记录时发出的偏移滞后指标

要检查并行发生的函数调用次数,还可以监控函数的并发指标

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标的值是写入 Kafka 事件源主题的最后一条记录与函数的使用者组处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和使用者组处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能表明函数的使用者组中的轮询器存在问题。有关更多信息,请参阅 使用 Lambda 函数指标

Amazon MSK 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于Amazon MSK。

适用于 Amazon MSK 的事件源参数
参数 必需 默认值 注意

AmazonManagedKafkaEventSourceConfig

包含 ConsumerGroupId 字段,该字段默认为唯一值。

只能在 Create(创建)设置

BatchSize

100

最大值:10000

已启用

已启用

EventSourceArn

只能在 Create(创建)设置

FunctionName

FilterCriteria

Lambda 事件筛选

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

SourceAccessConfigurations

无凭证

事件源的 SASL/SCRAM 或 CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) 身份验证凭证

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

只能在 Create(创建)设置

StartingPositionTimestamp

当 StartingPosition 设置为 AT_TIMESTAMP 时,为必需项

主题

Kafka 主题名称

只能在 Create(创建)设置