Lambda のイベントフィルタリング
イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。たとえば、フィルターを追加して、関数が特定のデータパラメータを含む Amazon SQS メッセージのみを処理するようにできます。イベントフィルタリングはイベントソースマッピングと連動します。次の AWS サービスのイベントソースマッピングにフィルターを追加できます。
-
Amazon DynamoDB
-
Amazon Kinesis Data Streams
-
Amazon MQ
-
Amazon Managed Streaming for Apache Kafka (Amazon MSK)
-
セルフマネージド Apache Kafka
-
Amazon Simple Queue Service (Amazon SQS)
Lambda は Amazon DocumentDB のイベントフィルタリングをサポートしていません。
デフォルトでは、単一のイベントソースマッピングに最大 5 つの異なるフィルターを定義することができます。フィルターは論理 OR で結合されています。イベントソースのレコードが 1 つ以上のフィルター条件を満たす場合、Lambda は関数に送信する次のイベントにそのレコードを含めます。どのフィルターも条件を満たさない場合、Lambda はレコードを破棄します。
注記
イベントソースに 5 つを超えるフィルターを定義する必要がある場合、イベントソースごとに最大 10 フィルターまでクォータ引き上げをリクエストできます。現在のクォータで許可されている数よりも多くのフィルターを追加しようとすると、イベントソースの作成時に Lambda はエラーを返します。
トピック
イベントフィルタリングの基本
フィルター条件 (FilterCriteria
) オブジェクトは、フィルターのリスト (Filters
) で構成される構造です。各フィルターは、イベントのフィルタリングパターン (Pattern
) を定義する構造です。パターンは、JSON フィルタールールを文字列で表したものです。FilterCriteria
オブジェクトの構造は次の通りです。
{ "Filters": [ { "Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "Metadata1": [ rule1 ], "data": { "Data1": [ rule2 ] } }
フィルターパターンには、メタデータプロパティ、データプロパティ、またはその両方を含めることができます。使用可能なメタデータパラメータおよびデータパラメータの形式は、イベントソースとして機能している AWS のサービス によって異なります。たとえば、イベントソースマッピングが Amazon SQS キューから次のレコードを受け取ったとします。
{ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "{\n "City": "Seattle",\n "State": "WA",\n "Temperature": "46"\n}", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }
-
メタデータプロパティは、レコードを作成したイベントに関する情報を含むフィールドです。Amazon SQS レコードの例では、メタデータプロパティには
messageID
、eventSourceArn
、awsRegion
などのフィールドが含まれます。 -
データプロパティは、ストリームまたはキューからのデータを含むレコードのフィールドです。Amazon SQS イベントの例では、データフィールドのキーは
body
であり、データプロパティはフィールドCity
、State
、Temperature
です。
イベントソースの種類が異なれば、データフィールドに使用するキー値も異なります。データプロパティをフィルタリングするには、フィルターのパターンに正しいキーが使われていることを確認してください。データフィルタリングキーのリストおよびサポートされている各 AWS のサービス のフィルターパターンの例については、異なる AWS のサービス を持つフィルターの使用 を参照してください。
イベントフィルタリングは、マルチレベル JSON のフィルタリングを処理できます。たとえば、DynamoDB ストリームのレコードに次のフラグメントがあるとします。
"dynamodb": { "Keys": { "ID": { "S": "ABCD" } "Number": { "N": "1234" }, ... }
ソートキー Number
の値が 4567 のレコードのみを処理するとします。この場合、FilterCriteria
オブジェクトは以下のようになります。
{ "Filters": [ { "Pattern": "{ \"dynamodb\": { \"Keys\": { \"Number\": { \"N\": [ "4567" ] } } } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "dynamodb": { "Keys": { "Number": { "N": [ "4567" ] } } } }
フィルター条件を満たさないレコードの処理
フィルター条件を満たさないレコードの処理方法は、イベントソースによって異なります。
Amazon SQSでは、メッセージがフィルター条件を満たさない場合、Lambda はそのメッセージをキューから自動的に削除します。Amazon SQS でこれらのメッセージを手動で削除する必要はありません。
-
Kinesis および DynamoDB では、フィルター条件がレコードを処理すると、ストリームイテレータはこのレコードを通り越して先に進みます。レコードがフィルター条件を満たさない場合に、そのレコードをイベントソースから手動で削除する必要はありません。保持期間が過ぎると、Kinesis と DynamoDB はこれらの古いレコードを自動的に削除します。それより早くレコードを削除したい場合は、「Changing the Data Retention Period」(データ保持期間の変更) を参照してください。
-
Amazon MSK、セルフマネージド Apache Kafka、Amazon MQ メッセージの場合、Lambda はフィルターに含まれるすべてのフィールドに一致しないメッセージを削除します。Apache Kafka では、Lambda は関数を正常に呼び出した後、一致するメッセージと一致しないメッセージのオフセットをコミットします。Amazon MQ の場合、Lambda は関数を正常に呼び出した後で一致するメッセージを認識し、一致しないメッセージはフィルタリング時に認識します。
フィルタールールの構文
フィルタールールの場合、Lambda は Amazon EventBridge ルールのサブセットをサポートしており、EventBridge と同じ構文を使用します。詳細については、「Amazon EventBridge ユーザーガイド」の「Amazon EventBridge のイベントパターン」を参照してください。
以下は、Lambda のイベントフィルタリングで使用できるすべての比較演算子の概要です。Equals (大文字と小文字を区別しない)、Or (複数フィールド)、Ends with の EventBridge 比較演算子はサポートされていないことに注意してください。
Comparison operator (比較演算子) | 例 | ルール構文 |
---|---|---|
Null |
UserId が Null |
"UserID": [ null ] |
空 |
LastName が空白 |
"LastName": [""] |
等しい |
Name が「Alice」 |
"Name": [ "Alice" ] |
および |
Location が「New York」、および Day が「Monday」 |
"Location": [ "New York" ], "Day": ["Monday"] |
または |
PaymentType が「Credit」または「Debit」 |
"PaymentType": [ "Credit", "Debit"] |
以外 |
Weather が「Raining」以外 |
"Weather": [ { "anything-but": [ "Raining" ] } ] |
数値 (等しい) |
Price が 100 |
"Price": [ { "numeric": [ "=", 100 ] } ] |
数値 (範囲) |
Price が 10 より大きく 20 以下 |
"Price": [ { "numeric": [ ">", 10, "<=", 20 ] } ] |
存在する |
ProductName が存在 |
"ProductName": [ { "exists": true } ] |
存在しない |
ProductName が存在しない |
"ProductName": [ { "exists": false } ] |
から始まる |
Region が US にある |
"Region": [ {"prefix": "us-" } ] |
注記
EventBridge と同様に、Lambda は文字列に対して文字単位の厳密な一致を使用し、大文字変換やその他の文字列の正規化は行いません。数値の場合、Lambda は文字列表現も使用します。たとえば、300、300.0、3.0e2 は等しいとはみなされません。
イベントソースマッピングへのフィルター条件のアタッチ (コンソール)
Lambda コンソールを使用してフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下の手順を実行します。
フィルター条件が設定された新しいイベントソースマッピングを作成する (コンソール)
-
Lambda コンソールの [Functions]
(関数) ページを開きます。 -
イベントソースマッピングを作成する関数の名前を選択します。
-
[Function overview] (関数の概要) で [Add trigger] (トリガーを追加) をクリックします。
-
[Trigger configuration] (トリガーの設定) で、イベントフィルタリングをサポートするトリガータイプを選択します。サポートされているサービスのリストについては、このページの冒頭にあるリストを参照してください。
-
[Additional settings] (追加の設定) を展開します。
-
[Filter criteria] (フィルター条件) で [Add] (追加) を選択してから、フィルターを定義して入力します。たとえば、次の内容を入力できます。
{ "Metadata" : [ 1, 2 ] }
これは、フィールド
Metadata
が 1 または 2 に等しいレコードのみを処理するように Lambda に指示します。引き続き [追加] を選択し、最大許容数までフィルターを追加できます。 -
フィルターの追加を完了したら、[保存] を選択します。
コンソールを使用してフィルター条件を入力するとき、フィルターパターンのみを入力し、Pattern
キーまたはエスケープの引用符を入力する必要はありません。上記の手順のステップ 6 では、{ "Metadata" : [ 1, 2 ] }
は次の FilterCriteria
に対応します。
{ "Filters": [ { "Pattern": "{ \"Metadata\" : [ 1, 2 ] }" } ] }
コンソールでイベントソースマッピングを作成すると、トリガーの詳細にフォーマットされた FilterCriteria
が表示されます。コンソールを使用してイベントフィルターを作成するその他の例については、異なる AWS のサービス を持つフィルターの使用 を参照してください。
イベントソースマッピングへのフィルター条件のアタッチ (AWS CLI)
イベントソースマッピングに以下の FilterCriteria
を設定するとします。
{ "Filters": [ { "Pattern": "{ \"Metadata\" : [ 1, 2 ] }" } ] }
AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。
aws lambda create-event-source-mapping \ --function-name
my-function
\ --event-source-arnarn:aws:sqs:us-east-2:123456789012:my-queue
\ --filter-criteria '{"Filters": [{"Pattern": "{ \"Metadata\" : [ 1, 2 ]}"}]}'
この「create-event-source-mapping」コマンドは、指定された FilterCriteria
を持つ関数 my-function
の新しい Amazon SQS イベントソースマッピングを作成します。
これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。
aws lambda update-event-source-mapping \ --uuid
"a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\ --filter-criteria '{"Filters": [{"Pattern": "{ \"Metadata\" : [ 1, 2 ]}"}]}'
イベントソースマッピングを更新するには、その UUID が必要であることに注意してください。UUID は「list-event-source-mappings」コールから取得できます。Lambda は、「create-event-source-mapping」の CLI レスポンスでも UUID を返します。
イベントソースからフィルター条件を削除するには、空の FilterCriteria
オブジェクトを持つ次の「update-event-source-mapping」コマンドを実行できます。
aws lambda update-event-source-mapping \ --uuid
"a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\ --filter-criteria "{}"
AWS CLI を使用してイベントフィルターを作成するその他の例については、異なる AWS のサービス を持つフィルターの使用 を参照してください。
イベントソースマッピングへのフィルター条件のアタッチ (AWS SAM)
以下のフィルタ条件を使用するように AWS SAM 内のイベントソースを設定したいとします。
{ "Filters": [ { "Pattern": "{ \"Metadata\" : [ 1, 2 ] }" } ] }
これらのフィルタ条件をイベントソースマッピングに追加するには、イベントソースの YAML テンプレートに以下のスニペットを挿入します。
FilterCriteria: Filters: - Pattern: '{"Metadata": [1, 2]}'
イベントソースマッピング用の AWS SAM テンプレートの作成と設定に関する詳細については、「AWS SAM デベロッパーガイド」の「EventSource」セクションを参照してください。AWS SAM テンプレートを使用してイベントフィルターを作成するその他の例については、異なる AWS のサービス を持つフィルターの使用 を参照してください。
異なる AWS のサービス を持つフィルターの使用
イベントソースの種類が異なれば、データフィールドに使用するキー値も異なります。データプロパティをフィルタリングするには、フィルターのパターンに正しいキーが使われていることを確認してください。次の表では、サポートされている各 AWS のサービス のフィルタリングキーが示されます。
AWS のサービス | フィルタリングキー |
---|---|
DynamoDB | dynamodb |
Kinesis | data |
Amazon MQ | data |
Amazon MSK | value |
セルフマネージド Apache Kafka | value |
Amazon SQS | body |
次のセクションでは、さまざまなタイプのイベントソースにおけるフィルターパターンの例が示されます。サポートされている受信データ形式の定義およびサポートされている各サービスのフィルターパターン本体形式も提供します。
DynamoDB でフィルタリング
プライマリキー CustomerName
、属性 AccountManager
、属性 PaymentTerms
を含む DynamoDB テーブルがあるとします。次の内容では、DynamoDB テーブルのストリームからのレコード例が示されています。
{ "eventID": "1", "eventVersion": "1.0", "dynamodb": { "ApproximateCreationDateTime": "1678831218.0" "Keys": { "CustomerName": { "S": "AnyCompany Industries" }, "NewImage": { "AccountManager": { "S": "Pat Candella" }, "PaymentTerms": { "S": "60 days" }, "CustomerName": { "S": "AnyCompany Industries" }, "SequenceNumber": "111", "SizeBytes": 26, "StreamViewType": "NEW_AND_OLD_IMAGES" }
DynamoDB テーブルのキーおよび属性値に基づいてフィルタリングするには、レコードで dynamodb
キーを使用します。プライマリキー CustomerName
が「AnyCompany Industries」のレコードのみを関数で処理するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "dynamodb": { "Keys": { "CustomerName": { "S": [ "AnyCompany Industries" ] } } } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
DynamoDB を使用すると、NewImage
および OldImage
キーを使用して属性値をフィルタリングすることもできます。最新のテーブル画像の AccountManager
属性が「Pat Candella」または「Shirley Rodriguez」のレコードをフィルタリングするとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "dynamodb": { "NewImage": { "AccountManager": { "S": [ "Pat Candella", "Shirley Rodriguez" ] } } } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
注記
DynamoDB イベントフィルタリングは、数値演算子 (数値等式および数値範囲) の使用をサポートしていません。テーブルの項目が数値として保存されている場合でも、これらのパラメータは JSON レコードオブジェクトの文字列に変換されます。
DynamoDB ソースのイベントを適切にフィルタリングするには、データフィールドおよびデータフィールド (dynamodb
) のフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。
着信データの形式 | データプロパティのフィルターパターンの形式 | 結果として生じるアクション |
---|---|---|
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
JSON 以外 |
Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
JSON 以外 |
有効な JSON |
Lambda がレコードをドロップします。 |
JSON 以外 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
JSON 以外 |
JSON 以外 |
Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
Kinesis でフィルタリング
プロデューサーが JSON 形式のデータを Kinesis データストリームに入力するとします。レコードの例は次のようになり、data
フィールドで JSON データが Base64 でエンコードされた文字列に変換されます。
{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }
プロデューサーがストリームに入力するデータが有効な JSON である限り、イベントフィルタリングを使用して data
キーを使用するレコードをフィルタリングできます。プロデューサーが次の JSON 形式でレコードを Kinesis ストリームに入力するとします。
{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }
注文タイプが「購入」のレコードのみをフィルタリングするには、FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "data": { "order": { "type": [ "buy" ] } } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Kinesis ソースからイベントを適切にフィルタリングするには、データフィールドおよびデータフィールドのフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。
着信データの形式 | データプロパティのフィルターパターンの形式 | 結果として生じるアクション |
---|---|---|
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
JSON 以外 |
Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
JSON 以外 |
有効な JSON |
Lambda がレコードをドロップします。 |
JSON 以外 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
JSON 以外 |
JSON 以外 |
Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
Kinesis 集約レコードのフィルタリング
Kinesis を使用すると、複数のレコードを 1 つの Kinesis データストリームレコードに集約し、データスループットを増加させることができます。Lambda は、Kinesis 「拡張ファンアウト」を使用する場合に限り、集約レコードにフィルター条件を適用できます。標準 Kinesis による集約レコードのフィルタリングはサポートされていません。拡張ファンアウトを使用するときは、Kinesis 専用スループットコンシューマーが Lambda 関数のトリガーとして機能するように設定します。次に、Lambda は集約されたレコードをフィルタリングし、フィルター条件を満たすレコードのみを渡します。
Kinesis レコード集約の詳細については、「Kinesis プロデューサーライブラリ (KPL) のキーコンセプト」ページの「集約」セクションを参照してください。Kinesis 拡張ファンアウトを用いた Lambda の使用に関する詳細については、「AWS コンピュートブログ」の「Amazon Kinesis Data Streams 拡張ファンアウトおよび AWS Lambda でのリアルタイムストリーム処理パフォーマンスの向上
Amazon MQ でフィルタリング
注記
Amazon MQ のイベントソースマッピングを新しいフィルター条件で更新するとき、Lambda が変更を適用するまで最大 15 分かかる場合があります。この期間が経過する前でも、以前のフィルター設定はまだ有効です。これは、コンソールに表示されるイベントソースマッピングのステータスが Enabling
から Enabled
に変わった場合でも当てはまります。
Amazon MQ メッセージキューには、有効な JSON 形式またはプレーン文字列でメッセージが含まれているとします。レコードの例は次のようになり、data
フィールドでデータが Base64 でエンコードされた文字列に変換されます。
Active MQ および Rabbit MQ ブローカーの両方では、イベントフィルタリングを使用して data
キーを使用するレコードをフィルタリングできます。Amazon MQ キューに次の JSON 形式のメッセージが含まれているとします。
{ "timeout": 0, "IPAddress": "203.0.113.254" }
timeout
フィールドが 0 より大きいレコードのみをフィルタリングするには、FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"data\" : { \"timeout\" : [ { \"numeric\": [ \">\", 0] } } ] } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "data": { "timeout": [ { "numeric": [ ">", 0 ] } ] } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MQ を使用すると、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。メッセージが「Result:」で始まるレコードのみを処理するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"data\" : [ { \"prefix\": \"Result: \" } ] }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "data": [ { "prefix": "Result: " } ] }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MQ メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Amazon MQ のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria
形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。
着信メッセージの の形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
プレーン文字列 |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
UTF-8 以外でエンコードされた文字 |
JSON、プレーン文字列、またはパターンなし |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
Amazon MSK およびセルフマネージド Apache Kafka でフィルタリング
注記
新しいフィルター条件で Amazon MSK またはセルフマネージド Apache Kafka のイベントソースマッピングで更新するとき、Lambda が変更を適用するまで最大 15 分かかる場合があります。この期間が経過する前でも、以前のフィルター設定はまだ有効です。これは、コンソールに表示されるイベントソースマッピングのステータスが Enabling
から Enabled
に変わった場合でも当てはまります。
プロデューサーが Amazon MSK またはセルフマネージド Apache Kafka クラスターのトピックに、有効な JSON 形式またはプレーン文字列として、メッセージを書き込むとします。レコードの例は次のようになり、value
フィールドでメッセージが Base64 でエンコードされた文字列に変換されます。
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
Apache Kafka プロデューサーが次の JSON 形式でトピックにメッセージを書き込むとします。
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
value
キーを使用してレコードをフィルタリングできます。device_ID
が AB の文字で始まるレコードのみをフィルタリングするとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MSK およびセルフマネージド Apache Kafka では、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。文字列が「error」を含むメッセージを無視するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "value": [ { "anything-but": [ "error" ] } ] }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MSK およびセルフマネージド Apache Kafka メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Amazon MSK のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria
形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。
着信メッセージの の形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
プレーン文字列 |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
UTF-8 以外でエンコードされた文字 |
JSON、プレーン文字列、またはパターンなし |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
Amazon SQS でフィルタリング
Amazon SQS キューに次の JSON 形式のメッセージが含まれているとします。
{ "RecordNumber": 0000, "TimeStamp": "yyyy-mm-ddThh:mm:ss", "RequestCode": "AAAA" }
このキューのレコード例は次のようになります。
{ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "{\n "RecordNumber": 0000,\n "TimeStamp": "yyyy-mm-ddThh:mm:ss",\n "RequestCode": "AAAA"\n}", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:my-queue", "awsRegion": "us-west-2" }
Amazon SQS メッセージの内容に基づいてフィルタリングするには、Amazon SQS メッセージレコードの body
キーを使用します。Amazon SQS メッセージの RequestCode
が「BBBB」のレコードのみを処理するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"body\" : { \"RequestCode\" : [ \"BBBB\" ] } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "body": { "RequestCode": [ "BBBB" ] } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
関数が、RecordNumber
が 9999 を超えるレコードのみを処理するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"body\" : { \"RecordNumber\" : [ { \"numeric\": [ \">\", 9999 ] } ] } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "body": { "RecordNumber": [ { "numeric": [ ">", 9999 ] } ] } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon SQS では、メッセージ本文は任意の文字列にすることができます。body
が有効な JSON フォーマットであることが FilterCriteria
で想定されている場合は、これが問題になる可能性があります。逆の場合も同様です。着信メッセージの本文が JSON 形式であっても、フィルター条件が body
をプレーン文字列であると想定する場合、意図しない動作が発生する可能性があります。
この問題を回避するには、FilterCriteria
の本文の形式がキューで受信するメッセージで想定する body
の形式と一致することを確認してください。メッセージをフィルタリングする前に、Lambda は着信メッセージの本文の形式および body
のフィルターパターンの形式を自動的に評価します。一致しない場合、Lambda はメッセージを除外します。この評価のまとめは、以下の表のとおりです。
着信メッセージの body の形式 |
フィルターパターンの body の形式 |
結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
Lambda はメッセージを除外します。 |
有効な JSON |
プレーン文字列 |
Lambda はメッセージを除外します。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |