搭配自我管理的 Apache Kafka 事件來源使用事件篩選 - AWS Lambda

搭配自我管理的 Apache Kafka 事件來源使用事件篩選

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選運作方式的一般資訊,請參閱控制 Lambda 將哪些事件傳送至您的函數

本節重點介紹自我管理的 Apache Kafka 事件來源的事件篩選。

自我管理的 Apache Kafka 事件篩選基本概念

假設生產者正在將訊息寫入自我管理的 Apache Kafka 叢集中的某個主題,可以是有效的 JSON 格式或純字串。範例記錄如下所示,訊息在 value 欄位中會轉換為 Base64 編碼字串。

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

假設 Apache Kafka 生產者正在以下列 JSON 格式將訊息寫入到您的主題。

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

您可以使用 value 索引鍵來篩選記錄。假設您只想篩選 device_ID 以字母 AB 開頭的記錄。FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

補充說明,此處是篩選條件的 Pattern 在純文字 JSON 中擴展的值。

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 來建立具有這些篩選條件標準的新事件來源映射,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

若要使用 AWS SAM 新增此篩選條件,請將下列程式碼片段新增到事件來源的 YAML 範本。

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

使用自我管理的 Apache Kafka,您也可以篩選訊息為純字串的記錄。假設您想忽略字串為「錯誤」的訊息。FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

補充說明,此處是篩選條件的 Pattern 在純文字 JSON 中擴展的值。

{ "value": [ { "anything-but": [ "error" ] } ] }

您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 來建立具有這些篩選條件標準的新事件來源映射,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

若要使用 AWS SAM 新增此篩選條件,請將下列程式碼片段新增到事件來源的 YAML 範本。

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

自我管理的 Apache Kafka 訊息必須是 UTF-8 編碼的字串,可以是純字串或 JSON 格式。這是因為 Lambda 會在套用篩選條件之前,將 Kafka 位元組陣列解碼成 UTF-8。如果您的訊息使用其他編碼方式 (例如 UTF-16 或 ASCII),或者訊息格式與 FilterCriteria 格式不相符,則 Lambda 只會處理中繼資料篩選條件。下表摘要說明特定行為:

傳入訊息 格式 訊息屬性的篩選條件模式格式 產生的動作

純文字的字串

純文字的字串

根據您的篩選條件標準之 Lambda 篩選條件。

純文字的字串

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

純文字的字串

有效的 JSON

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效的 JSON

純文字的字串

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效的 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效的 JSON

有效的 JSON

根據您的篩選條件標準之 Lambda 篩選條件。

非 UTF-8 編碼字串

JSON、純字串或沒有模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。