在 Lambda 中保留 Kinesis Data Streams 事件源的已丢弃批次记录
Kinesis 事件源映射的错误处理取决于错误是在调用函数之前还是在函数调用期间发生的:
-
调用前:如果 Lambda 事件源映射由于节流或其他问题而无法调用该函数,则它会一直重试,直到记录过期或超过事件源映射上配置的最大期限(MaximumRecordAgeInSeconds)。
-
调用期间:如果调用函数但返回错误,Lambda 会重试,直到记录过期、超过最大期限(MaximumRecordAgeInSeconds)或达到配置的重试配额(MaximumRetryAttempts)。对于函数错误,您还可以配置 BisectBatchOnFunctionError,将失败的批次拆分为两个较小的批次,从而隔离错误记录并避免超时。拆分批次不会消耗重试配额。
如果错误处理措施失败,Lambda 将丢弃记录并继续处理数据流中的批次。使用默认设置时,这意味着错误的记录可能会阻止受影响的分区上的处理,时间最长为一周。为了避免这种情况,请配置函数的事件源映射,使用合理的重试次数和适合您的使用案例的最长记录期限。
配置失败调用的目标
要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。您可以将任何 Amazon SNS 主题或 Amazon SQS 队列配置为目标。您的执行角色必须具有目标的权限:
-
对于 SQS 目标:sqs:SendMessage
-
对于 SNS 目标:sns:Publish
要使用控制台配置失败时的目标,请执行以下步骤:
打开 Lamba 控制台的 Functions
(函数)页面。 -
选择函数。
-
在 Function overview (函数概览) 下,选择 Add destination (添加目标)。
-
对于源,请选择事件源映射调用。
-
对于事件源映射,请选择为此函数配置的事件源。
-
在条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。
-
对于目标类型,请选择 Lambda 要发送调用记录的目标类型。
-
对于 Destination (目标),请选择一个资源。
-
选择保存。
您还可以使用 AWS Command Line Interface(AWS CLI)配置失败时的目标。例如,以 create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下 update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping
要移除目标,请提供一个空字符串作为 destination-config
参数的实际参数:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 SQS 队列或 SNS 主题发送的内容。由于 Lambda 仅为这些目标类型发送元数据,因此请使用 streamArn
、shardId
、startSequenceNumber
和 endSequenceNumber
字段获取完整的原始记录。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
您可以使用此信息从流中检索受影响的记录以进行故障排除。实际记录不包括在内,因此您必须处理此记录并在记录过期和丢失之前从流中检索它们。