Lambda が Amazon Kinesis Data Streams からのレコードを処理する方法 - AWS Lambda

Lambda が Amazon Kinesis Data Streams からのレコードを処理する方法

Lambda 関数を使用して、Amazon Kinesis データストリームのレコードを処理できます。Lambda 関数を Kinesis Data Streams 共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

Kinesis Data Streams の詳細については、Reading Data from Amazon Kinesis Data Streams を参照してください。

注記

Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、Amazon Kinesis の料金を参照してください。

ポーリングストリームとバッチストリーム

Lambda はデータストリームからレコードを読み取り、関数を、ストリームのレコードを含むイベントと共に同期的に呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。各バッチには、単一のシャード/データストリームのレコードが含まれます。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、バッチ処理ウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「バッチ処理動作」を参照してください。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Kinesis データストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するには、ParallelizationFactor 設定を構成します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)~10 の並列化係数で指定できます。例えば、ParallelizationFactor を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、ConcurrentExecutions メトリックの実際の値は異なったものとなります)。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。シャードごとの同時実行バッチの数を増やしても、Lambda はパーティションキーレベルで順序立った処理を確実に行います。

Kinesis 集約で ParallelizationFactor を使用することもできます。イベントソースマッピングの動作は、拡張ファンアウトを使用しているかどうかによって異なります。

  • 拡張ファンアウトなし: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約イベントのパーティションキーとも一致する必要があります。集約イベント内のイベントに異なるパーティションキーがある場合、Lambda ではパーティションキーによるイベントが順序通りに処理されないことがあります。

  • 拡張ファンアウトあり: まず、Lambda は集約イベントを個々のイベントにデコードします。集約イベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに一致しないイベントは削除され、失われます。Lambda ではこれらのイベントは処理されず、設定された障害時の送信先には送信されません。

イベントの例

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }