搭配 Kinesis 事件來源使用事件篩選 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Kinesis 事件來源使用事件篩選

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選運作方式的一般資訊,請參閱控制 Lambda 將哪些事件傳送至您的函數

本節重點介紹 Kinesis 事件來源的事件篩選。

Kinesis 事件篩選基本概念

假設生產者將 JSON 格式的資料放入您的 Kinesis 資料串流中。範例記錄如下所示,JSON 資料在 data 欄位中會轉換為 Base64 編碼字串。

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

只要生產者放入串流中的資料是有效的 JSON,您就可以使用事件篩選透過 data 索引鍵來篩選記錄。假設生產者以下列 JSON 格式將記錄放入 Kinesis 串流中。

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

若僅篩選訂單類型為「購買」的記錄,FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

補充說明,此處是篩選條件的 Pattern 在純文字 JSON 中擴展的值。

{ "data": { "order": { "type": [ "buy" ] } } }

您可以使用 主控台 AWS CLI 或 AWS SAM 範本新增篩選條件。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 使用這些篩選條件建立新的事件來源映射,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

若要使用 新增此篩選條件 AWS SAM,請將下列程式碼片段新增至事件來源的 YAML 範本。

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

若要正確篩選來自 Kinesis 來源的事件,資料欄位和資料欄位的篩選條件標準都必須是有效的 JSON 格式。如果其中一個欄位不是有效的 JSON 格式,則 Lambda 會捨棄訊息或擲回例外狀況。下表摘要說明特定行為:

傳入資料格式 資料屬性的篩選條件模式格式 產生的動作

有效的 JSON

有效的 JSON

根據您的篩選條件標準之 Lambda 篩選條件。

有效的 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效的 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。

非 JSON

有效的 JSON

Lambda 捨棄記錄。

非 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

非 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。

篩選 Kinesis 彙總記錄

使用 Kinesis,可以將多個記錄彙總到單一 Kinesis 資料串流記錄中,以提高資料輸送量。Lambda 只會在您使用 Kinesis 增強型展開傳送時,將篩選條件標準套用至彙總記錄。不支援使用標準 Kinesis 篩選彙總記錄。使用增強型展開傳送時,您可以設定 Kinesis 專用輸送量取用程式,以充當 Lambda 函數的觸發條件。Lambda 接著會篩選彙總記錄,並僅傳遞符合篩選條件標準的記錄。

若要進一步了解 Kinesis 記錄彙總,請參閱「Kinesis Producer Library (KPL) 主要概念」頁面上的彙總部分。若要進一步了解將 Lambda 與 Kinesis 增強型扇出搭配使用,請參閱 AWS 運算部落格上的使用 Amazon Kinesis Data Streams 增強型扇出和 AWS Lambda 提高即時串流處理效能