AWS Lambda
開発者ガイド

AWS Lambda を Amazon Kinesis に使用する

AWS Lambda 関数を使用して、Amazon Kinesis データストリームのレコードを処理できます。Kinesis では、多くのソースからデータを収集し、複数のコンシューマーで処理することができます。Lambda では、標準データストリームイテレーターおよび HTTP/2 ストリームコンシューマーがサポートされています。

Lambda は、データストリームからレコードを読み取り、ストリームレコードを含むイベントを使用して関数を同期的に呼び出します。Lambda は、バッチのレコードを読み取り、関数を呼び出してバッチからレコードを処理します。

例 Kinesis レコードイベント

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

同じストリームからレコードを読み取るアプリケーションが複数ある場合は、標準イテレーターの代わりに Kinesis ストリームコンシューマーを使用できます。コンシューマーは専用の読み取りスループットがあるため、他のコンシューマーと同じデータを競合する必要はありません。コンシューマーでは、Kinesis はレコードを HTTP/2 接続で Lambda にプッシュします。これにより、レコードと関数呼び出しの追加によるレイテンシーも短縮されます。

デフォルトでは、Lambda はストリームからレコードが利用可能になるとすぐに、関数を呼び出します。ストリームから読み込むバッチに、1 つのレコードしか含まれない場合、Lambda は関数に 1 つのレコードのみを送信します。少数のレコードで関数を呼び出すことを避けるため、[batch window (バッチウィンドウ)] を設定して、最大 5 分間レコードをバッファするようイベントソースに指定できます。Lambda は、関数を呼び出す前に、完全なバッチを収集するか、バッチウィンドウの有効期限が切れるまで、ストリームから継続してレコードを読み取ります。

関数がエラーを返した場合、処理が成功するか、データの有効期限が切れるまで Lambda はバッチを再試行します。この問題が解決されるまで、シャード内のデータは処理されません。保留状態のシャードや潜在的なデータ損失が発生しないように、コードで処理エラーを処理および記録します。

データストリームと関数の設定

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数はデータストリーム (標準イテレーター) か、ストリームのコンシューマー (拡張ファンアウト) にマッピングすることができます。

Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は設定されている最大バッチサイズより小さいバッチを受け取るまでバッチの処理を維持します。この関数では、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小化し、読み取りスループットを最大化するには、データストリームのコンシューマーを作成します。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。

ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。ストリームコンシューマーは、Kinesis RegisterStreamConsumer API を使用して作成できます。

$ aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

関数がレコードを処理するスピードを上げるには、データストリームにシャードを追加します。Lambda は、各シャードのレコードを順序どおりに処理し、関数がエラーを返した場合はシャード内の追加のレコードの処理を停止します。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

シャードごとに 1 つの同時実行を処理できるように関数がスケールアップできない場合、関数の制限の引き上げをリクエストするか同時実行を予約します。関数に使用できる同時実行数は、Kinesis データストリーム内のシャードの数以上になる必要があります。

実行ロールのアクセス権限

Lambda には、Kinesis データストリームに関連するリソースを管理するための以下のアクセス許可が必要です。これを関数の実行ロールに追加します。

AWSLambdaKinesisExecutionRole 管理ポリシーには、これらのアクセス許可が含まれています。詳細については、「AWS Lambda 実行ロール」を参照してください。

イベントソースとしてストリームを設定する

イベントソースマッピングを作成し、データストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。

Lambda コンソールで Kinesis から読み取るように関数を設定するには、Kinesis トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソール (関数ページ) を開きます。

  2. 関数を選択します。

  3. [Designer] で、[Add trigger] を選択します。

  4. トリガータイプを選択します。

  5. 必要なオプションを設定して [追加] を選択します。

Lambda では、Kinesis イベントソースの次のオプションがサポートされています。

イベントソースオプション

  • Kinesis ストリーム – レコードの読み取り元の Kinesis ストリーム。

  • コンシューマー (オプション) – ストリームコンシューマーを使用して、専用接続でストリームから読み込みます。

  • バッチサイズ – 各バッチ内のシャードから読み取るレコードの数 (最大 10,000)。Lambda は、同期呼び出し (6 MB) に対してイベントの合計サイズがペイロード制限を超えない限り、バッチ内のすべてのレコードを 1 回の呼び出しで関数に渡します。

  • バッチウィンドウ – 関数を呼び出す前にレコードを収集する最大時間 (秒数) を指定します。

  • Starting position (開始位置) – 新しいレコードのみ、既存のすべてのレコード、または特定の日付以降に作成されたレコードを処理します。

    • 最新 – ストリームに追加された新しいレコードを処理します。

    • Trim horizon (水平トリム) – ストリーム内のすべてのレコードを処理します。

    • At timestamp (タイムスタンプで) – 特定の時刻以降のレコードを処理します。

    既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。

  • 有効 – イベントソースを無効にしてレコードの処理を停止します。Lambda は、処理された最新のレコードを追跡し、再度有効になるとその時点から処理を再開します。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

イベントソースマッピング API

AWS CLI でイベントソースマッピングを作成するには、CreateEventSourceMapping API を使用します。以下の例では、AWS CLI を使用して、my-function という名前の関数を Kinesis データストリームにマッピングします。データストリームは Amazon リソースネーム (ARN) によって指定され、バッチサイズ 500 で、Unix 時間形式のタイムスタンプから始まります。

$ aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

コンシューマーを使用するには、ストリーム ARN ではなく、コンシューマーの ARN を指定します。

Amazon CloudWatch のメトリクス

関数がレコードのバッチの処理を完了すると、Lambda により IteratorAge メトリクスが発生します。メトリクスは、処理が終了したとき、バッチの最後のレコードがどれくらい時間が経過したレコードであったかを示します。関数が新しいイベントを処理する場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。

イテレーターの有効期間が増加傾向の場合、関数に問題があることを示している可能性があります。詳細については、「AWS Lambda のメトリクス」を参照してください。