Lambda が Amazon Kinesis Data Streams からのレコードを処理する方法 - AWS Lambda

Lambda が Amazon Kinesis Data Streams からのレコードを処理する方法

Lambda 関数を使用して、Amazon Kinesis データストリームのレコードを処理できます。Lambda 関数を Kinesis Data Streams 共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

Kinesis Data Streams の詳細については、Reading Data from Amazon Kinesis Data Streams を参照してください。

注記

Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、Amazon Kinesis の料金を参照してください。

ポーリングストリームとバッチストリーム

Lambda はデータストリームからレコードを読み取り、関数を、ストリームのレコードを含むイベントと共に同期的に呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。各バッチには、単一のシャード/データストリームのレコードが含まれます。

標準の Kinesis データストリームの場合、Lambda はストリーム内のシャードをシャードごとに 1 秒に 1 回の割合でポーリングしてレコードを取得します。Kinesis 拡張ファンアウトの場合、Lambda は HTTP/2 接続を使用して Kinesis からプッシュされるレコードをリッスンします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、バッチ処理ウィンドウを設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「バッチ処理動作」を参照してください。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Lambda は、設定された拡張機能の完了を待たずに、次のバッチを処理のために送信します。つまり、Lambda がレコードの次のバッチを処理する間も拡張機能を実行し続けることができます。アカウントの同時実行設定または制限に違反すると、スロットリングの問題が発生する可能性があります。これが潜在的な問題であるかどうかを検出するには、関数を監視し、イベントソースマッピングで予想よりも高い同時実行メトリクスが表示されているかどうかを確認します。呼び出しの間隔が短いため、Lambda はシャードの数よりも高い同時実行使用量を一時的に報告する場合があります。これは、拡張機能のない Lambda 関数にも当てはまります。

Kinesis データストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するには、ParallelizationFactor 設定を構成します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)~10 の並列化係数で指定できます。例えば、ParallelizationFactor を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、ConcurrentExecutions メトリクスに異なる値が表示される場合があります)。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。シャードごとの同時実行バッチの数を増やしても、Lambda はパーティションキーレベルで順序立った処理を確実に行います。

Kinesis 集約で ParallelizationFactor を使用することもできます。イベントソースマッピングの動作は、拡張ファンアウトを使用しているかどうかによって異なります。

  • 拡張ファンアウトなし: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約イベントのパーティションキーとも一致する必要があります。集約イベント内のイベントに異なるパーティションキーがある場合、Lambda ではパーティションキーによるイベントが順序通りに処理されないことがあります。

  • 拡張ファンアウトあり: まず、Lambda は集約イベントを個々のイベントにデコードします。集約イベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに一致しないイベントは削除され、失われます。Lambda ではこれらのイベントは処理されず、設定された障害時の送信先には送信されません。

イベントの例

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }