Amazon EventBridge Pipes フィルタリング
EventBridge Pipes を使用すると、特定のソースのイベントをフィルタリングして、そのサブセットのみを処理できます。このフィルタリングは、イベントパターンを使用して、EventBridge イベントバスや Lambda イベントソースマッピングでフィルタリングするのと同じように機能します。イベントパターンの詳細については、「Amazon EventBridge のイベントパターン」を参照してください。
フィルター条件 FilterCriteria
オブジェクトは、フィルターのリスト (Filters
) で構成される構造です。各フィルターは、フィルタリングパターン (Pattern
) を定義する構造です。Pattern
は、JSON フィルタールールの文字列表現です。FilterCriteria
オブジェクトは、以下の例のようになります。
{ "Filters": [ {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "Metadata1": [ pattern1 ], "data": {"Data1": [ pattern2 ]} }
FilterCriteria
オブジェクトには、メタデータプロパティおよびデータプロパティの主要部分で構成されています。
メタデータプロパティは、イベントオブジェクトのフィールドです。この例では、
FilterCriteria.Metadata1
はメタデータプロパティを参照します。データプロパティは、イベント本文のフィールドです。この例では、
FilterCriteria.Data1
はデータプロパティを参照します。
たとえば、Kinesis ストリームに次のようなイベントが含まれているとします。
{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": {"City": "Seattle", "State": "WA", "Temperature": "46", "Month": "December" }, "approximateArrivalTimestamp": 1545084650.987 }
イベントがパイプを通過すると、data
フィールドが base64 でエンコードされた状態で次のようになります。
{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" },
Kinesis イベントのメタデータプロパティは、partitionKey
または sequenceNumber
など、data
オブジェクト外の任意のフィールドです。
Kinesis イベントのデータプロパティは、City
または Temperature
などの data
オブジェクト内のフィールドです。
このイベントに一致するようにフィルタリングすると、デコードされたフィールドにフィルターを使用できます。たとえば、partitionKey
および City
でフィルターをオンにするには、次のフィルターを使用します。
{ "partitionKey": [ "1" ], "data": { "City": [ "Seattle" ] }
イベントフィルターを作成すると、EventBridge Pipes はイベントコンテンツにアクセスできます。このコンテンツは、Amazon SQS body
フィールドのように JSON でエスケープされるか、Kinesis data
フィールドのように base64 でエンコードされます。データが有効な JSON であれば、ターゲットパラメータの入力テンプレートまたは JSON パスでコンテンツを直接参照できます。たとえば、Kinesis イベントソースが有効な JSON の場合は、<$.data.someKey>
を使用して変数を参照できます。
イベントパターンを作成する場合、ポーリング操作によって追加されたフィールドではなく、ソース API から送信されたフィールドに基づいてフィルタリングできます。次のフィールドはイベントパターンで使用することはできません。
awsRegion
eventSource
eventSourceARN
eventVersion
eventID
eventName
invokeIdentityArn
eventSourceKey
メッセージフィールドとデータフィールド
すべての EventBridge Pipe ソースには、コアメッセージまたはデータを含むフィールドが含まれています。これらをメッセージフィールドまたはデータフィールドと呼びます。これらのフィールドは JSON でエスケープまたは base64 でエンコードされている可能性があるため特殊ですが、有効な JSON の場合は、本文がエスケープされていないかのように JSON パターンでフィルタリングできます。これらのフィールドの内容は、Input Transformers でもシームレスに使用できます。
フィルタリングが適切な Amazon SQS メッセージ
Amazon SQS メッセージがフィルター条件を満たさない場合、EventBridge はそのメッセージをキューから自動的に削除します。Amazon SQS でこれらのメッセージを手動で削除する必要はありません。
Amazon SQS の場合、メッセージ の body
は任意の文字列にすることができますが、body
が有効な JSON フォーマットであることを FilterCriteria
が期待する場合は、これが問題になる可能性があります。逆の場合も同様です。着信メッセージの body
が JSON 形式である場合に、body
がプレーン文字列であることをフィルター条件が期待していると、意図しない動作が発生する可能性があります。
この問題を回避するには、FilterCriteria
内の body
の形式が、キューから受け取るメッセージの body
に期待される形式と一致することを確認してください。メッセージをフィルタリングする前に、 EventBridgeは着信メッセージの body
の形式と body
に対するフィルターパターンの形式を自動的に評価します。一致しない場合、EventBridge はメッセージをドロップします。以下は、この評価を要約した表です。
着信メッセージの body の形式 |
フィルターパターンの body の形式 |
結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
EventBridge がフィルター条件に基づいてフィルタリングします。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
EventBridge がメッセージをドロップします。 |
有効な JSON |
プレーン文字列 |
EventBridge がメッセージをドロップします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
EventBridge がフィルター条件に基づいてフィルタリングします。 |
FilterCriteria
の一部として body
を含めない場合、EventBridge はこのチェックをスキップします。
フィルタリングが適切な Kinesis メッセージと DynamoDB メッセージ
フィルター条件が Kinesis または DynamoDB レコードを処理すると、ストリームイテレータはこのレコードを通り越して先に進みます。レコードがフィルター条件を満たさない場合に、そのレコードをイベントソースから手動で削除する必要はありません。保持期間が過ぎると、Kinesis と DynamoDB はこれらの古いレコードを自動的に削除します。それより早くレコードを削除したい場合は、「Changing the Data Retention Period」(データ保持期間の変更) を参照してください。
ストリームイベントソースからのイベントを適切にフィルタリングするには、データフィールドとデータフィールドのフィルター条件の両方が有効な JSON 形式である必要があります。(Kinesis のデータフィールドは data
で、DynamoDB のデータフィールドは dynamodb
です。) フィールドのどちらかが有効な JSON 形式ではない場合、EventBridge はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。
着信データの形式 (data または dynamodb ) |
データプロパティのフィルターパターンの形式 | 結果として生じるアクション |
---|---|---|
有効な JSON |
有効な JSON |
EventBridge がフィルター条件に基づいてフィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
JSON 以外 |
EventBridge がパイプの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
JSON 以外 |
有効な JSON |
EventBridge がレコードをドロップします。 |
JSON 以外 |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
JSON 以外 |
JSON 以外 |
EventBridge がパイプの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。 |
Amazon Managed Streaming for Apache Kafka、セルフマネージド Apache Kafka、および Amazon MQ メッセージの適切なフィルタリング
注記
Apache Kafka または Amazon MQ イベントソースにフィルター条件をアタッチした後は、フィルタリングルールがイベントに適用されるまで最大 15 分かかる場合があります。
Amazon MQ ソースの場合、メッセージフィールドは data
になります。Apache Kafka ソース (Amazon MSK およびセルフマネージド Apache Kafka) の場合は、key
と value
の 2 つのメッセージフィールドがあります。
EventBridge は、フィルターに含まれるすべてのフィールドに一致しないメッセージをドロップします。Apache Kafka の場合、EventBridge は関数を正常に呼び出した後、一致するメッセージと一致しないメッセージのオフセットをコミットします。Amazon MQ の場合、EventBridge は関数を正常に呼び出した後で一致するメッセージを認識し、一致しないメッセージはフィルタリング時に認識します。
Apache Kafka メッセージと Amazon MQ メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、EventBridge が Apache Kafka と Amazon MQ のバイト配列を UTF-8 にデコードした後でフィルター条件を適用するからです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria
形式と一致しない場合、EventBridge はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。
着信メッセージの形式 (data 、または key と value ) |
メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
EventBridge がフィルター条件に基づいてフィルタリングします。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
プレーン文字列 |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
EventBridge がフィルター条件に基づいてフィルタリングします。 |
UTF-8 以外でエンコードされた文字 |
JSON、プレーン文字列、またはパターンなし |
EventBridge がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
Lambda ESM と EventBridge Pipes の違い
イベントをフィルタリングする場合、Lambda ESM と EventBridge Pipes は一般的に同じように動作します。主な違いは、eventSourceKey
フィールドが ESM ペイロードに存在しないことです。