Amazon MSK イベントソースでのイベントフィルタリングの使用
イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「Lambda が関数に送信するイベントを制御する」を参照してください。
このセクションでは、Amazon MSK イベントソースのイベントフィルタリングに焦点を当てます。
Amazon MSK イベントフィルタリングの基本
プロデューサーが Amazon MSK クラスターのトピックに、有効な JSON 形式またはプレーン文字列として、メッセージを書き込むとします。レコードの例は次のようになり、value
フィールドでメッセージが Base64 でエンコードされた文字列に変換されます。
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
Apache Kafka プロデューサーが次の JSON 形式でトピックにメッセージを書き込むとします。
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
value
キーを使用してレコードをフィルタリングできます。device_ID
が AB の文字で始まるレコードのみをフィルタリングするとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MSK では、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。文字列が「error」を含むメッセージを無視するとします。FilterCriteria
オブジェクトは次のようになります。
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{ "value": [ { "anything-but": [ "error" ] } ] }
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
Amazon MSK メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Amazon MSK のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria
形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。
着信メッセージの形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
プレーン文字列 |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
UTF-8 以外でエンコードされた文字 |
JSON、プレーン文字列、またはパターンなし |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |