搭配自我管理的 Apache Kafka 事件來源使用事件篩選
您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選運作方式的一般資訊,請參閱控制 Lambda 將哪些事件傳送至您的函數。
本節重點介紹自我管理的 Apache Kafka 事件來源的事件篩選。
自我管理的 Apache Kafka 事件篩選基本概念
假設生產者正在將訊息寫入自我管理的 Apache Kafka 叢集中的某個主題,可以是有效的 JSON 格式或純字串。範例記錄如下所示,訊息在 value
欄位中會轉換為 Base64 編碼字串。
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
假設 Apache Kafka 生產者正在以下列 JSON 格式將訊息寫入到您的主題。
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
您可以使用 value
索引鍵來篩選記錄。假設您只想篩選 device_ID
以字母 AB 開頭的記錄。FilterCriteria
物件如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
補充說明,此處是篩選條件的 Pattern
在純文字 JSON 中擴展的值。
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。
使用自我管理的 Apache Kafka,您也可以篩選訊息為純字串的記錄。假設您想忽略字串為「錯誤」的訊息。FilterCriteria
物件如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
補充說明,此處是篩選條件的 Pattern
在純文字 JSON 中擴展的值。
{ "value": [ { "anything-but": [ "error" ] } ] }
您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。
自我管理的 Apache Kafka 訊息必須是 UTF-8 編碼的字串,可以是純字串或 JSON 格式。這是因為 Lambda 會在套用篩選條件之前,將 Kafka 位元組陣列解碼成 UTF-8。如果您的訊息使用其他編碼方式 (例如 UTF-16 或 ASCII),或者訊息格式與 FilterCriteria
格式不相符,則 Lambda 只會處理中繼資料篩選條件。下表摘要說明特定行為:
傳入訊息 格式 | 訊息屬性的篩選條件模式格式 | 產生的動作 |
---|---|---|
純文字的字串 |
純文字的字串 |
根據您的篩選條件標準之 Lambda 篩選條件。 |
純文字的字串 |
資料屬性沒有篩選條件模式 |
Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。 |
純文字的字串 |
有效的 JSON |
Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。 |
有效的 JSON |
純文字的字串 |
Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。 |
有效的 JSON |
資料屬性沒有篩選條件模式 |
Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。 |
有效的 JSON |
有效的 JSON |
根據您的篩選條件標準之 Lambda 篩選條件。 |
非 UTF-8 編碼字串 |
JSON、純字串或沒有模式 |
Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。 |