Amazon Kinesis で AWS Lambda を使用する - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Kinesis で AWS Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信するか、データを送信する前にデータを強化する場合は、「Amazon EventBridge Pipes」を参照してください。

AWS Lambda 関数を使用すると、Amazon Kinesis データストリームのレコードを処理できます。

Kinesis データストリームは、シャードのセットです。各シャードには、一連のデータレコードが含まれます。コンシューマーは、Kinesis データストリームからのデータを処理するアプリケーションです。Lambda 関数を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。

標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成できます。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。Kinesis は、HTTP/2 経由で Lambda にレコードをプッシュします。

Kinesis Data Streams の詳細については、Reading Data from Amazon Kinesis Data Streams を参照してください。

イベントの例

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

ポーリングストリームとバッチストリーム

Lambda はデータストリームからレコードを読み取り、関数を、ストリームのレコードを含むイベントと共に同期的に呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。各バッチには、単一のシャード/データストリームのレコードが含まれます。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、バッチ処理ウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「バッチ処理動作」を参照してください。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、バッチの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードをべき等にすることを強くお勧めします。詳細については、 AWSナレッジセンターの「Lambda 関数をべき等にするにはどうすればよいですか?」を参照してください。

関数がエラーを返した場合、処理が成功するか、データの有効期限が切れるまで Lambda はバッチを再試行します。停止するシャードを避けるために、より小さなバッチサイズで再試行したり、再試行回数を制限したり、古すぎるレコードを破棄したりするように、イベントソースマッピングを設定できます。破棄されたイベントを保持するには、失敗したバッチの詳細を標準 SQS キューまたは標準 SNS トピックに送信するように、イベントソースマッピングを設定します。

同時実行数を増やすために、各シャードからの複数のバッチを並行して処理することもできます。Lambda は、各シャードで最大 10 個のバッチを同時に処理できます。シャードごとの同時バッチの数を増やしても、Lambda は引き続き、パーティションキーレベルで順序どおりに処理を行います。

ParallelizationFactor 設定を使用することで、複数の Lambda 呼び出しを同時に実行して Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)~10 の並列化係数で指定できます。例えば、ParallelizationFactor を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、ConcurrentExecutions メトリックの実際の値は異なったものとなります)。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。

Kinesis 集約ParallelizationFactorで を使用することもできます。イベントソースマッピングの動作は、拡張ファンアウト を使用しているかどうかによって異なります。

  • 拡張ファンアウトなし: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約されたイベントのパーティションキーとも一致する必要があります。集約されたイベント内のイベントに異なるパーティションキーがある場合、Lambda はパーティションキーによるイベントの順番な処理を保証することはできません。

  • 拡張ファンアウト: まず、Lambda は集約されたイベントを個々のイベントにデコードします。集約されたイベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに対応していないイベントは削除され、 は失われます。Lambda はこれらのイベントを処理しず、設定された障害送信先に送信しません。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

データストリームと関数の設定

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数はデータストリーム (標準イテレーター) か、ストリームのコンシューマー (拡張ファンアウト) にマッピングすることができます。

標準イテレーターでは、Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は関数がストリームに追いつくまでバッチを処理し続けます。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成します。拡張ファンアウトを使用するコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。Kinesis RegisterStreamConsumer API を使用してストリームコンシューマーを作成できます。

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

以下の出力が表示されます。

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

関数がレコードを処理する速度を上げるには、データストリームにシャードを追加します。Lambda は、各シャードのレコードを順番に処理します。関数からエラーが返された場合、シャードのさらなるレコードの処理は停止されます。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

同時実行のバッチの合計分を処理できるように関数をスケールアップできない場合は、関数のクォータ引き上げをリクエストするか、同時実行数を予約します。

実行ロールのアクセス許可

Lambda には、Kinesis データストリームに関連するリソースを管理するための以下のアクセス許可が必要です。AWSLambdaKinesisExecutionRole 管理ポリシーには、これらのアクセス許可が含まれています。この管理ポリシーは、関数の実行ロールに追加できます。

Kinesis データストリームと Lambda 関数が異なるアカウントにある場合は、ストリームリソースが Lambda 関数の実行ロールまたはアカウントに kinesis:DescribeStream 権限を付与していることを確認してください。

さらに、コンソールからイベントソースマッピングを作成する場合は、kinesis:ListStreams および kinesis: アクセスListStreamConsumers許可が必要です。

標準 SQS キューまたは標準 SNS トピックに失敗したバッチのレコードを送信するには、関数に追加の許可が必要になります。各送信先サービスには、次のように異なるアクセス許可が必要です。

アクセス許可を追加し、イベントソースマッピングを作成する

イベントソースマッピングを作成し、データストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。複数のデータストリームから項目を処理する場合、各バッチには 1 つのシャード/ストリームのレコードのみが含まれます。

Kinesis データストリームから読み取るように関数を設定するには、実行ロールに AWSLambdaKinesisExecutionRoleAWS管理ポリシーを追加し、Kinesis トリガーを作成します。

アクセス許可を追加してトリガーを作成するには
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Configuration] (設定) タブを開き、次に [Permissions] (アクセス許可) をクリックします。

  4. ロール名 で、実行ロールへのリンクを選択します。このリンクをクリックすると、IAM コンソールでロールが開きます。

    
              実行ロールへのリンク
  5. [アクセス許可を追加][ポリシーをアタッチ] の順に選択します。

    
              IAM コンソールでのポリシーのアタッチ
  6. [検索] フィールドに AWSLambdaKinesisExecutionRole を入力します。このポリシーを実行ロールに追加します。これは、Kinesis ストリームから関数が読み取るために必要なアクセス許可を含む AWSマネージドポリシーです。このポリシーの詳細については、「 AWSマネージドポリシーリファレンスAWSLambdaKinesisExecutionRole」の「」を参照してください。

  7. Lambda コンソールで関数に戻ります。[関数の概要] で [トリガーを追加] をクリックします。

    
              Lambda コンソールの関数の概要セクション
  8. トリガーのタイプを選択します。

  9. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda は、Kinesis イベントソースの次のオプションをサポートしています。

イベントソースオプション
  • Kinesis ストリーム - レコードの読み取り元の Kinesis ストリーム。

  • コンシューマー (オプション) - ストリームコンシューマーを使用して、専用接続でストリームから読み込みます。

  • バッチサイズ - 各バッチで関数に送信されるレコードの数。最大 10,000。Lambda は、イベントの合計サイズが同期呼び出しのペイロード上限 (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。

  • バッチウィンドウ - 関数を呼び出す前にレコードを収集する最大時間(秒数)を指定します。

  • 開始位置 - 新しいレコードのみ、既存のすべてのレコード、または特定の日付以降に作成されたレコードを処理します。

    • 最新 - ストリームに追加された新しいレコードを処理します。

    • 水平トリム - ストリーム内のすべてのレコードを処理します。

    • タイムスタンプで - 特定の時刻以降のレコードを処理します。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • [障害発生時の宛先] — 処理できないレコードの標準 SQS キューまたは標準 SNS トピックです。Lambda は、古すぎる、または再試行回数の上限に達したレコードのバッチを廃棄すると、バッチに関する詳細をキューまたはトピックに送信します。

  • 再試行回数 - 関数がエラーを返したときに Lambda が再試行する回数の上限です。これは、バッチが関数に到達しなかったサービスエラーやスロットルには適用されません。

  • レコードの最大有効期間 — Lambda が関数に送信するレコードの最大経過時間。

  • エラー発生時のバッチ分割 — 関数がエラーを返した場合、再試行する前にバッチを 2 つに分割します。元のバッチサイズ設定は変更されません。

  • シャードごとの同時バッチ — 同じシャードからの複数のバッチを同時に処理します。

  • 有効 - イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには、false に設定します。Lambda は、処理された最新のレコードを追跡し、再度有効になるとその時点から処理を再開します。

注記

Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、Amazon Kinesis の料金を参照してください。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

Kinesis イベントのフィルタリング

Kinesis を Lambda のイベントソースとして設定すると、イベントフィルタリングを使用して、処理のためにストリームからどのレコードを Lambda が関数に送信するかを制御できます。Kinesis で Lambda イベントフィルタリングを使用する方法の詳細については、「Kinesis でフィルタリング」を参照してください。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

AWS CLI を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。以下の例では、AWS CLI を使用して、関数 my-function を、Kinesis データストリームにマップします。データストリームは Amazon リソースネーム (ARN) によって指定され、バッチサイズ 500 で、Unix 時間形式のタイムスタンプから始まります。

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

次のような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

コンシューマーを使用するには、ストリーム ARN ではなく、コンシューマーの ARN を指定します。

その他のオプションを設定し、バッチの処理方法をカスタマイズしたり、処理できないレコードをいつ破棄するかを指定したりします。次の例では、2 回の再試行後、またはレコードが 1 時間以上経過した場合に、イベントソースマッピングを更新して障害レコードを標準 SQS キューに送信します。

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

こちらの出力が表示されるはずです。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。get-event-source-mapping コマンドを使用して、現在のステータスを表示します。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

こちらの出力が表示されるはずです。

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

複数のバッチを同時に処理するには、--parallelization-factor オプションを使用します。

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

エラー処理

Kinesis ストリームからレコードを読み取るイベントソースマッピングは、関数を同期的に呼び出し、エラー発生時に再試行します。Lambda が関数をスロットリングするか、関数を呼び出すことなくエラーを返す場合、Lambda は、レコードの期限が切れる、またはイベントソースマッピングで設定した最大経過時間を超過するまで再試行します。

関数がレコードを受信してもエラーが返された場合、バッチ内のレコードが期限切れになるか、最大経過時間を超えるか、設定された再試行クォータに達するまで、Lambda は再試行します。関数エラーの場合、失敗したバッチを 2 つのバッチに分割するように、イベントソースマッピングを構成することもできます。小さなバッチで再試行すると、不良のレコードが分離され、タイムアウトの問題が回避されます。バッチを分割しても、再試行クォータにはカウントされません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

破棄されたバッチのレコードを保持するには、失敗したイベントの送信先を設定します。Lambda は、バッチの詳細を含むドキュメントを送信先キューまたはトピックに送信します。

失敗したイベントのレコードの送信先を設定するには
  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [Source] (送信元) で、[Stream invocation] (ストリームの呼び出し) を選択します。

  5. [ストリーム] で、関数にマッピングされるストリームを選択します。

  6. [送信先タイプ] で、呼び出しレコードを受信するリソースのタイプを選択します。

  7. [送信先] で、リソースを選択します。

  8. [Save] を選択します。

次の例は、Kinesis ストリームの呼び出しレコードを示します。

例 呼び出しレコード
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。

Amazon CloudWatch メトリクス

関数がレコードのバッチの処理を完了すると、Lambda により IteratorAge メトリクスが発生します。メトリクスは、処理が終了したとき、バッチの最後のレコードがどれくらい時間が経過したレコードであったかを示します。関数が新しいイベントを処理する場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。

イテレーターの有効期間が増加傾向の場合、関数に問題があることを示している可能性があります。詳しくは、「Lambda 関数のメトリクスの使用」を参照してください。

時間枠

Lambda 関数は、連続ストリーム処理アプリケーションを実行できます。ストリームは、アプリケーションを継続的に流れる無限のデータを表します。この継続的に更新される入力からの情報を分析するために、時間に関して定義されたウィンドウを使用して、含まれるレコードをバインドできます。

タンブリングウィンドウは、一定の間隔で開閉する別個のタイムウィンドウです。ディフォルトでは、Lambda 呼び出しはステートレス — 外部データベースがない場合、複数の連続した呼び出しでデータを処理するために使用することはできません。ただし、タンブリングウィンドウを使用して、呼び出し間で状態を維持できます。この状態は、現在のウィンドウに対して以前に処理されたメッセージの集計結果が含まれます。状態は、シャードごとに最大 1 MB にすることができます。このサイズを超えると、Lambda はウィンドウを早期に終了します。

ストリームの各レコードは、特定のウィンドウに属しています。Lambda は各レコードを少なくとも 1 回処理しますが、各レコードが 1 回だけ処理される保証はありません。エラー処理などのまれなケースでは、一部のレコードが複数回処理されることがあります。レコードは常に最初から順番に処理されます。レコードが複数回処理される場合、順不同で処理されます。

集約と処理

ユーザー管理関数は、集約と、その集約の最終結果を処理するために呼び出されます。Lambda は、ウィンドウで受信したすべてのレコードを集約します。これらのレコードは、個別の呼び出しとして複数のバッチで受け取ることができます。各呼び出しは状態を受け取ります。したがって、タンブリングウィンドウを使用する場合、Lambda 関数の応答に state プロパティが含まれている必要があります。応答に state プロパティが含まれてないと、Lambda はこれを失敗した呼び出しと見なします。この条件を満たすために、関数は次の JSON 形式の TimeWindowEventResponse オブジェクトを返すことができます。

TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注記

Java 関数の場合は、Map<String, String>を使用して状態を表すことをお勧めします。

ウィンドウの最後で、フラグisFinalInvokeForWindowtrueに設定され、これが最終状態であり、処理の準備ができていることが示されます。処理が完了すると、ウィンドウが完了し、最終的な呼び出しが完了し、状態は削除されます。

ウィンドウの最後に、Lambda は集計結果に対するアクションの最終処理を使用します。最終処理が同期的に呼び出されます。呼び出しが成功すると、関数はシーケンス番号をチェックポイントし、ストリーム処理が続行されます。呼び出しが失敗した場合、Lambda 関数は呼び出しが成功するまで処理を一時停止します。

例 KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

構成

イベントソースマッピングを作成または更新するときに 、タンブリングウィンドウを設定できます。タンブリングウィンドウを設定するには、ウィンドウを秒単位で指定します。次の例のAWS Command Line Interface (AWS CLI)コマンドは、タンブルウィンドウが120秒に設定されたストリーミングイベントソースマッピングを作成します。集約と処理のために Lambda 関数が定義した関数の名前は tumbling-window-example-function です。

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambdaは、レコードがストリームに挿入された時間に基づいて、タンブルするウィンドウ境界を決定します。すべてのレコードには、Lambda が境界の決定に使用するおおよそのタイムスタンプがあります。

ウィンドウの集合をタンブルしても、再共有はサポートされません。シャードが終了すると、Lambda はウィンドウが閉じているとみなし、子シャードは新しい状態で自身のウィンドウを開始します。

タンブルウィンドウは、既存の再試行ポリシーmaxRetryAttemptsおよびmaxRecordAgeを完全にサポートします。

例 Handler.py - 集約と処理

次の Python 関数は、最終状態を集約して処理する方法を示しています。

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

バッチアイテムの失敗をレポートする

イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に Lambda チェックポイントが設定されます。Lambda は、他のすべての結果を完全な失敗として扱い、再試行の上限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、ReportBatchItemFailuresをオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。

ReportBatchItemFailuresオンにするには 、ReportBatchItemFailures 列挙値をFunctionResponseTypesリストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、 イベントソースマッピングを作成または更新するときに設定ができます。

レポートの構文

バッチアイテムの失敗に関するレポートを設定する場合、StreamsEventResponse クラスはバッチアイテムの失敗のリストとともに返されます。StreamsEventResponseオブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注記

batchItemFailures 配列に複数の項目が含まれている場合、Lambda はシーケンス番号が最も小さいレコードをチェックポイントとして使用します。その後、Lambda はそのチェックポイントからすべてのレコードを再試行します。

成功条件と失敗の条件

次のいずれかを返すと、Lambda はバッチを完全な成功として処理します:

  • 空のbatchItemFailureリストです。

  • null の batchItemFailure リスト

  • 空の EventResponse

  • ヌル EventResponse

次のいずれかを返すと、Lambda はバッチを完全な失敗として処理します:

  • 空の文字列itemIdentifier

  • ヌル itemIdentifier

  • itemIdentifier間違えているキー名

Lambda は、再試行戦略に基づいて失敗を再試行します。

バッチを2分割します

呼び出しが失敗し、BisectBatchOnFunctionError オンになっている場合、バッチはReportBatchItemFailures設定に関係なく2分割されます。

部分的なバッチ成功レスポンスを受信し、BisectBatchOnFunctionErrorReportBatchItemFailures の両方がオンになっている場合、バッチは返されたシーケンス番号で 2 分割され、Lambda は残りのレコードのみを再試行します。

バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

.NET
AWS SDK for .NET
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

.NET を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }
Go
SDK for Go V2
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Go を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Java を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
JavaScript
SDK for JavaScript (v2)
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Javascript を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

を使用した Lambda での Kinesis バッチアイテム失敗のレポート TypeScript。

import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK for PHP
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

PHP を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $kinesisEvent = new KinesisEvent($event); $this->logger->info("Processing records"); $records = $kinesisEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Python を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Rust
SDK for Rust
注記

については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

Rust を使用した Lambda での Kinesis バッチアイテム失敗のレポート。

use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Amazon Kinesis 設定パラメータ

すべての Lambda イベントソースタイプは、同じ CreateEventSourceMappingおよび UpdateEventSourceMapping API オペレーションを共有します。ただし、Kinesis に適用されるのは一部のパラメータのみです。

Kinesis に適用されるイベントソースパラメータ
[Parameter] (パラメータ) 必須 デフォルト メモ

BatchSize

N

100

最大: 10,000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

廃棄されたレコードの標準 Amazon SQS キューまたは標準 Amazon SNS トピックの送信先。

有効

N

true

EventSourceArn

Y

データストリームまたはストリームコンシューマーの ARN。

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 は無制限を意味します。Lambda はレコードを破棄しません (Kinesis Data Streams データ保持設定は引き続き適用されます)

最小: -1

最大: 604,800

MaximumRetryAttempts

N

-1

-1 に設定すると無制限になり、失敗したレコードはレコードの有効期限が切れるまで再試行されます。

最小: -1

最大: 10,000

ParallelizationFactor

N

1

最大: 10

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

StartingPositionTimestamp

N

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ有効です。Unix タイム秒単位で読み取りをスタートする時間

TumblingWindowInSeconds

N

最小: 0

最大: 900