Kinesis イベントソースでのイベントフィルタリングの使用 - AWS Lambda

Kinesis イベントソースでのイベントフィルタリングの使用

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「Lambda が関数に送信するイベントを制御する」を参照してください。

このセクションでは、Kinesis イベントソースのイベントフィルタリングに焦点を当てます。

Kinesis イベントフィルタリングの基本

プロデューサーが JSON 形式のデータを Kinesis データストリームに入力するとします。レコードの例は次のようになり、data フィールドで JSON データが Base64 でエンコードされた文字列に変換されます。

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

プロデューサーがストリームに入力するデータが有効な JSON である限り、イベントフィルタリングを使用して data キーを使用するレコードをフィルタリングできます。プロデューサーが次の JSON 形式でレコードを Kinesis ストリームに入力するとします。

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

注文タイプが「購入」のレコードのみをフィルタリングするには、FilterCriteria オブジェクトは次のようになります。

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern の値を記載しています。

{ "data": { "order": { "type": [ "buy" ] } } }

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

Console

コンソールを使用してこのフィルターを追加するには、イベントソースマッピングへのフィルター条件のアタッチ (コンソール) の指示に従って [フィルター条件] に次の文字列を入力します。

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Kinesis ソースからイベントを適切にフィルタリングするには、データフィールドおよびデータフィールドのフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。

着信データの形式 データプロパティのフィルターパターンの形式 結果として生じるアクション

有効な JSON

有効な JSON

Lambda がフィルター条件に基づいてフィルタリングを実行します。

有効な JSON

データプロパティのフィルターパターンがない

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

有効な JSON

JSON 以外

Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。

JSON 以外

有効な JSON

Lambda がレコードをドロップします。

JSON 以外

データプロパティのフィルターパターンがない

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

JSON 以外

JSON 以外

Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。

Kinesis 集約レコードのフィルタリング

Kinesis を使用すると、複数のレコードを 1 つの Kinesis データストリームレコードに集約し、データスループットを増加させることができます。Lambda は、Kinesis 「拡張ファンアウト」を使用する場合に限り、集約レコードにフィルター条件を適用できます。標準 Kinesis による集約レコードのフィルタリングはサポートされていません。拡張ファンアウトを使用するときは、Kinesis 専用スループットコンシューマーが Lambda 関数のトリガーとして機能するように設定します。次に、Lambda は集約されたレコードをフィルタリングし、フィルター条件を満たすレコードのみを渡します。

Kinesis レコード集約の詳細については、「Kinesis プロデューサーライブラリ (KPL) のキーコンセプト」ページの「集約」セクションを参照してください。Kinesis 拡張ファンアウトを用いた Lambda の使用に関する詳細については、「AWS コンピュートブログ」の「Amazon Kinesis Data Streams 拡張ファンアウトおよび AWS Lambda でのリアルタイムストリーム処理パフォーマンスの向上」を参照してください。