AWS Lambda を Amazon DynamoDB に使用する - AWS Lambda

AWS Lambda を Amazon DynamoDB に使用する

AWS Lambda 関数を使用して、Amazon DynamoDB ストリームのレコードを処理できます。DynamoDB ストリーム では、Lambda 関数を使用して、DynamoDB テーブルが更新されるたびに追加の作業を実行することができます。

Lambda は、ストリームからレコードを読み取り、ストリームレコードを含むイベントを使用して関数を同期的に呼び出します。Lambda は、バッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。

例 DynamoDB ストリーム レコードイベント

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda は、レコードの DynamoDB ストリームにあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。処理が成功すると、Lambda は、レコードをさらに受け取るまでポーリングを再開します。

デフォルトでは、Lambda はストリームからレコードが利用可能になるとすぐに、関数を呼び出します。ストリームから読み込むバッチに、1 つのレコードしか含まれない場合、Lambda は関数に 1 つのレコードのみを送信します。少数のレコードで関数を呼び出すことを避けるため、[batch window (バッチウィンドウ)] を設定して、最大 5 分間レコードをバッファするようイベントソースに指定できます。Lambda は、関数を呼び出す前に、完全なバッチを収集するか、バッチウィンドウの有効期限が切れるまで、ストリームから継続してレコードを読み取ります。

関数がエラーを返した場合、処理が成功するか、データの有効期限が切れるまで Lambda はバッチを再試行します。シャードの停止を避けるには、より小さいバッチサイズで再試行する、再試行数を制限する、または古いレコードを破棄するよう、イベントソースマッピングを設定します。破棄されたイベントを保持するには、失敗したバッチの詳細を SQS キューまたは SNS トピックに送信するよう、イベントソースマッピングを設定できます。

各シャードから複数のバッチを並行して処理し、同時実行を増やすこともできます。Lambda は各シャードで同時に最大 10 個のバッチを処理できます。シャードごとの同時実行バッチ数を増やした場合、Lambda はパーティションキーレベルで順序どおりの処理を継続します。

実行ロールのアクセス許可

Lambda には、DynamoDB ストリームに関連するリソースを管理するための以下のアクセス許可が必要です。これを関数の実行ロールに追加します。

AWSLambdaDynamoDBExecutionRole 管理ポリシーには、これらのアクセス許可が含まれています。詳細については、「AWS Lambda 実行ロール」を参照してください。

キューまたはトピックに失敗したバッチのレコードを送信するには、関数に追加のアクセス権限が必要です。各送信先サービスには、次のように異なるアクセス許可が必要です。

イベントソースとしてストリームを設定する

イベントソースマッピングを作成し、ストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のストリームの項目を処理したりできます。

Lambda コンソールで DynamoDB ストリーム から読み取るように関数を設定するには、DynamoDB トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソール (関数ページ) を開きます。

  2. 関数を選択します。

  3. [Designer] で、[Add trigger] を選択します。

  4. トリガータイプを選択します。

  5. 必要なオプションを設定して [追加] を選択します。

Lambda では、DynamoDB イベントソースの次のオプションがサポートされています。

イベントソースオプション

  • DynamoDB テーブル – レコードの読み取り元の DynamoDB テーブル。

  • バッチサイズ – 各バッチ内の関数へ送信するレコードの数(最大 1,000)。Lambda は、同期呼び出し (6 MB) に対してイベントの合計サイズがペイロード制限を超えない限り、バッチ内のすべてのレコードを 1 回の呼び出しで関数に渡します。

  • バッチウィンドウ – 関数を呼び出す前にレコードを収集する最大時間 (秒数) を指定します。

  • 開始位置 – 新規レコードのみ、または既存のすべてのレコードを処理します。

    • 最新 – ストリームに追加された新しいレコードを処理します。

    • Trim horizon (水平トリム) – ストリーム内のすべてのレコードを処理します。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • 失敗発生時の送信先 – 処理できないレコードの SQS キューまたは SNS トピック。Lambda は、レコードが古くなったか、最大回数の再試行を行ったためレコードのバッチを破棄した場合、バッチに関する詳細をキューまたはトピックに送信します。

  • Retry attempts (再試行数) – 関数がエラーを返すまでに Lambda が試みる再試行の最大数。これは、バッチが関数に到達しなかったときのサービスエラーやスロットリングには適用されません。

  • [Maximum age of record (レコードの最大期間)] – Lambda が関数に送信するレコードの最大期間。

  • エラー発生時のバッチ分割 – 関数がエラーを返した場合は、再試行する前にバッチを 2 つに分割します。

  • シャードごとの同時実行バッチ – 同じシャードから同時に複数のバッチを処理します。

  • 有効 – イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには false に設定します。Lambda は、処理された最新のレコードを追跡し、マッピングが再度有効になるとその時点から処理を再開します。

注記

DynamoDB は、Lambda がストリームからレコードを取得するための読み込みリクエストに対して課金します。料金の詳細については、「Amazon DynamoDB の料金表」を参照してください。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

イベントソースマッピング API

AWS CLI または AWS SDK を使用してイベントソースマッピングを管理するには、次の API アクションを使用します。

次の例では、AWS CLI を使用して、500 のバッチサイズで、my-function という名前の関数を、Amazon リソースネーム (ARN) によって指定された DynamoDB ストリームにマッピングします。

$ aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525 { "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

その他のオプションを設定してバッチの処理方法をカスタマイズし、処理できないレコードをいつ破棄するかを指定します。次の例では、イベントソースマッピングを更新し、2 回の再試行後、またはレコードが 1 時間以上古くなったときに SQS キューに失敗レコードを送信します。

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping コマンドを使用します。

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

複数のバッチを同時に処理するには、--parallelization-factor オプションを使用します。

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

エラー処理

DynamoDB ストリームからレコードを読み取るイベントソースマッピングは、関数を同期的に呼び出し、エラー発生時には再試行します。関数がスロットリングされるか、Lambda サービスが関数を呼び出さずにエラーを返す場合、Lambda は、レコードの有効期限が切れるか、イベントソースマッピングで設定した最大期間を超えるまで再試行します。

関数がレコードを受け取るがエラーを返す場合、Lambda はバッチ内のレコードの有効期限が切れる、最大期間を過ぎる、または設定された再試行の制限に達するまで、再試行します。関数のエラーの場合、失敗したバッチを 2 つのバッチに分割するようイベントソースマッピングを設定することもできます。より小さなバッチで再試行すると、不良レコードが分離され、タイムアウトの問題を回避できます。バッチを分割しても、再試行の制限に対してカウントされません。

エラー処理が失敗する場合、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。この場合、デフォルト設定では、不良レコードが最大 one day まで、影響を受けたシャードの処理をブロックする可能性があります。これを回避するには、ユースケースに合った適切な数の再試行数と最大レコード期間で、関数のイベントソースマッピングを設定します。

破棄されるバッチのレコードを保持するには、失敗したイベントの送信先を設定します。Lambda はバッチの詳細を含むドキュメントを送信先キューまたはトピックに送信します。

失敗したイベントのレコードの送信先を設定するには

  1. Lambda コンソール (関数ページ) を開きます。

  2. 関数を選択します。

  3. [ Designer (デザイナー)] で、[送信先の追加] を選択します。

  4. [Source (送信元)] で、[Stream invocation (ストリーム呼び出し)] を選択します。

  5. [ストリーム] で、関数にマッピングされるストリームを選択します。

  6. [送信先タイプ] で、呼び出しレコードを受信するリソースのタイプを選択します。

  7. [送信先] で、リソースを選択します。

  8. [Save] を選択します。

次の例は、DynamoDB ストリームの呼び出しレコードを示します。

例 呼び出しレコード

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

この情報を使用して、トラブルシューティングのため、影響のあるレコードをストリームから取得できます。実際のレコードは含まれないため、このレコードを処理し、有効期限が切れて失われる前に、ストリームから取得する必要があります。

Amazon CloudWatch のメトリクス

関数がレコードのバッチの処理を完了すると、Lambda により IteratorAge メトリクスが発生します。メトリクスは、処理が終了したとき、バッチの最後のレコードがどれくらい時間が経過したレコードであったかを示します。関数が新しいイベントを処理する場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。

イテレーターの有効期間が増加傾向の場合、関数に問題があることを示している可能性があります。詳細については、「AWS Lambda 関数メトリクスの使用」を参照してください。