将 AWS Lambda 与 Amazon DynamoDB 结合使用 - AWS Lambda

将 AWS Lambda 与 Amazon DynamoDB 结合使用

您可以使用 AWS Lambda 函数来处理 Amazon DynamoDB 流中的记录。使用 DynamoDB Streams,每次更新 DynamoDB 表时,您都可以触发 Lambda 函数以执行额外的工作。

Lambda 从流中读取记录,并使用包含流记录的事件同步调用您的函数。Lambda 以批量方式读取记录并调用您的函数来处理批次中的记录。

例 DynamoDB Streams 记录事件

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda 将针对记录轮询 DynamoDB 流中的分片(按照每秒 4 次的基本频率)。当记录可用时,Lambda 调用您的函数并等待结果。如果处理成功,Lambda 将恢复轮询,直到它收到更多记录。

默认情况下,只要流中有记录,Lambda 就会调用您的函数。如果 Lambda 从流中读取的批处理中只有一条记录,则 Lambda 只会向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多 5 分钟的记录。在调用该函数之前,Lambda 会继续从流中读取记录,直到收集了完整批次,或者直到批处理时段到期。

如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。为避免分片停滞,可以将事件源映射配置为以较小的批处理大小重试,限制重试次数或者丢弃太早的记录。要保留丢弃的事件,可以配置事件源映射,以将有关失败批处理的详细信息发送到 SQS 队列或 SNS 主题。

您还可以通过并行处理每个分片的多个批处理来提高并发性。在每个分区中,Lambda 最多可以同时处理 10 个批处理。如果您增加每个分区的并发批处理数量,则 Lambda 仍然需要确保在分区键级别进行有序处理。

ParallelizationFactor 设置配置为使用多个 Lambda 调用同时处理 Kinesis 或 DynamoDB 数据流的一个分区。您可以指定 Lambda 通过并行化因子(从 1 [默认值] 到 10)从某个分区轮询的并发批处理数量。例如,如果将 ParallelizationFactor 设置为 2,则最多可以有 200 次并发 Lambda 调用来处理 100 个 Kinesis 数据分区。这有助于在数据卷不稳定且 IteratorAge 较高时提高处理吞吐量。有关更多信息,请参阅针对 Kinesis 和 DynamoDB 事件源的新 AWS Lambda 扩展控制

执行角色权限

Lambda 需要以下权限才能管理与您的 DynamoDB 流相关的资源。将这些权限添加到您的函数的执行角色中。

AWSLambdaDynamoDBExecutionRole 托管策略包含这些权限。有关更多信息,请参阅 AWS Lambda 执行角色

要将失败批处理的记录发送到队列或主题,您的函数需要其他权限。每项目标服务均需要不同的权限,如下所示:

将流配置为事件源

创建事件源映射以指示 Lambda 将流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个流的项目。

要在 Lambda 控制台中将您的函数配置为从 DynamoDB Streams 读取,请创建 DynamoDB 触发器。

创建触发器

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持 DynamoDB 事件源的以下选项。

事件源选项

  • DynamoDB 表 – 要从中读取记录的 DynamoDB 表。

  • Batch size (批处理大小) – 每个批处理中发送到函数的记录的数量(最多 10000 条)。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的负载限制 (6 MB)。

  • 批处理时段 – 指定在调用函数之前收集记录的最长时间(以秒为单位)。

  • 起始位置 – 仅处理新记录或所有现有记录。

    • 最新 – 处理已添加到流中的新记录。

    • 时间范围 – 处理流中的所有记录。

    在处理任何现有记录后,函数将继续处理新记录。

  • On-failure destination (故障目标) – SQS 队列或 SNS 主题,用于无法处理的记录。当 Lambda 由于时间太远或已用尽所有重试而丢弃一批记录时,它将有关该批处理的详细信息发送到队列或主题。

  • Retry attempts (重试) – 函数返回错误时 Lambda 重试的最大次数。这不适用于批处理未到达该函数的服务错误或限制。

  • Maximum age of record (最长记录期限) – Lambda 发送到函数的记录的最长期限。

  • Split batch on error (出错时拆分批处理) – 当函数返回错误时,请在重试之前将批处理拆分为两部分。

  • Concurrent batches per shard (每个分片的并发批处理) – 并发处理来自同一分片的多个批处理。

  • 已启用 – 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用映射后从停止位置重新开始处理。

注意

对于 Lambda 作为 DynamoDB 触发器的一部分调用的 GetRecords API 调用,您不需要付费。

之后,要管理事件源配置,请在设计器中选择触发器。

事件源映射 API

要使用 AWS CLIAWS 开发工具包管理事件源,您可以使用以下 API 操作:

以下示例使用 AWS CLI 将名为 my-function 的函数映射到由 Amazon 资源名称 (ARN) 指定的 DynamoDB 流(批处理大小为 500)。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

您应看到以下输出:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

配置其他选项,以自定义如何处理批处理,并指定何时丢弃无法处理的记录。以下示例更新事件源映射,以在两次重试之后或者如果失败记录已存在一个小时以上,将失败记录发送到 SQS 队列。

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

您应该会看到以下输出:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping 命令可查看当前状态。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

您应该会看到以下输出:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

要同时处理多个批处理,请使用 --parallelization-factor 选项。

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

错误处理

从 DynamoDB 流中读取记录的事件源映射将同步调用函数并在出错时重试。如果函数受到限制,或者 Lambda 服务未调用该函数而返回错误,Lambda 将重试,直到记录到期或者超过您在事件源映射上配置的最长期限。

如果函数接收到记录但返回错误,Lambda 将重试,直到批处理中的记录到期、超过最大使用期限或者达到配置的重试配额。对于函数错误,您还可以配置事件源映射,以将失败的批处理拆分为两个批处理。重试较小的批处理可以隔离不良记录并解决超时问题。拆分批处理不计入重试配额。

如果错误处理措施失败,Lambda 将丢弃记录并继续处理流中的批处理。使用默认设置,这意味着不良记录最多可以将针对受影响分片的处理操作阻止one day。为避免这种情况,请以合理的重试次数和适合您使用案例的最长记录期限来配置函数的事件源映射。

要保留废弃批处理的记录,请配置失败事件目标。Lambda 将文档和有关批处理的详细信息发送到目标队列或主题。

配置失败事件记录的目标

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add destination (添加目标)

  4. 对于 Source (源),选择 Stream invocation (流调用)

  5. 对于 Stream (流),选择映射到函数的流。

  6. 对于 Destination type (目标类型),请选择接收调用记录的资源类型。

  7. 对于 Destination (目标),请选择一个资源。

  8. 选择 Save

以下示例显示了 DynamoDB 流的调用记录。

例 调用记录

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

您可以使用此信息从流中检索受影响的记录以进行故障排除。实际的记录不包括在内,因此您必须处理这些记录,并在它们到期并丢失之前从流中检索它们。

Amazon CloudWatch 指标

在您的函数处理完一批记录后,Lambda 将发出 IteratorAge 指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。

迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅使用 AWS Lambda 函数指标

时间窗口

Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息,可以使用按时间定义的窗口来限制包含的记录。

Lambda 调用是无状态的 – 在没有外部数据库的情况下,无法使用它们跨多个连续调用处理数据。但是,启用窗口化功能后,您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小,Lambda 将提前终止窗口。

滚动窗口

Lambda 函数可以使用滚动窗口聚合数据:定期打开和关闭的不同窗口。滚动窗口使您能够通过连续的非重叠时间窗口处理流式数据源。

流的每条记录都属于特定窗口。记录只在 Lambda 处理记录所属的窗口时处理一次。在每个窗口中,您都可以在分片内的分区键级别执行计算,例如求和或求平均值。

聚合和处理

系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口收到的所有记录。您可以分多个批次接收这些记录,每个批次都作为单独的调用。每次调用都会收到一个状态。您还可以处理记录并返回新状态,下次调用时将传递该新状态。Lambda 将在 JSON 中返回以下格式的 TimeWindowEventResponse

TimeWindowEventReponse

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意

对于 Java 函数,我们建议使用 Map<String, String> 来表示状态。

在窗口末尾,标志 isFinalInvokeForWindow 被设置 true,以表示这是最终状态,并且已准备好进行处理。处理完成后,窗口完成,最终调用完成,然后状态将被删除。

在窗口结束时,Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后,函数会检查序列号并继续进行流处理。如果调用失败,则您的 Lambda 函数将暂停进一步处理,直到成功调用为止。

例 DynamodbTimeWindowEvent

{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

配置

您可以在创建或更新事件源映射时配置滚动窗口。要配置滚动窗口,请以秒为单位进行指定。以下示例 AWS Command Line Interface (AWS CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 tumbling-window-example-function

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳,供 Lambda 在确定边界时使用。

滚动窗口聚合不支持重新分片。当分片结束时,Lambda 会认为窗口已关闭,子分片将以全新的状态启动自己的窗口。

滚动窗口完全支持现有的重试策略 maxRetryAttemptsmaxRecordAge

例 Handler.py – 聚合和处理

以下 Python 函数演示了如何聚合然后处理您的最终状态:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

报告批处理项目失败

在使用和处理来自事件源的流式数据时,默认情况下,Lambda 仅在批处理完全成功时,才会在批次的最高序列号处设置检查点。Lambda 会将所有其他结果视为完全失败并重试批处理,直至达到重试次数上限。要允许在处理来自流的批次时部分成功,请开启 ReportBatchItemFailures。允许部分成功有助于减少对记录重试的次数,尽管这并不能完全阻止在成功记录中重试的可能性。

要开启 ReportBatchItemFailures,请在 FunctionResponseTypes 列表 ReportBatchItemFailures 中包含枚举值。此列表指示为函数启用了哪些响应类型。您可以在创建或更新事件源映射时配置此列表。

报告语法

配置批处理项目失败的报告时,将返回 StreamsEventResponse 类,其中包含批处理项目失败列表。您可以使用 StreamsEventResponse 对象返回批处理中第一个失败记录的序列号。您还可以使用正确的响应语法来创建自己的自定义类。以下 JSON 结构显示了所需的响应语法:

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }

成功和失败的条件

如果返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果返回以下任何一项,则 Lambda 会将批处理视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

Lambda 会根据您的重试策略在失败时重试。

将批次一分为二

如果调用失败并且已开启 BisectBatchOnFunctionError,则无论您的 ReportBatchItemFailures 设置如何,批次都将一分为二。

当收到批处理部分成功响应且同时开启 BisectBatchOnFunctionErrorReportBatchItemFailures 时,批次将在返回的序列号处一分为二,并且 Lambda 将仅重试剩余记录。

Java

例 Handler.java – 返回新的 StreamsEventResponse()

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public Serializable handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<*>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbEventRecord dynamodbEventRecord : input.getRecords()) { try { //Process your record DynamodbEvent.Record dynamodbRecord = dynamodbEventRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { //Return failed record's sequence number batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

例 Handler.py – 返回 batchItemFailures[]

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}