Using event filtering with a self-managed Apache Kafka event source
You can use event filtering to control which records from a stream or queue Lambda sends to your function. For general information about how event filtering works, see Control which events Lambda sends to your function.
This section focuses on event filtering for self-managed Apache Kafka event sources.
Self-managed Apache Kafka event filtering basics
Suppose a producer is writing messages to a topic in your self-managed Apache Kafka cluster, either in valid JSON format or as plain strings. An example record would look like the following, with the message converted to a Base64-encoded string in the value field.
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
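To see what the value field holds, the record can be decoded locally. The following is a minimal Python sketch, assuming the event has exactly the shape shown above; the helper name decode_values is hypothetical and not part of any AWS library.

```python
import base64

# The example record from above, keyed by "<topic>-<partition>".
event = {
    "mytopic-0": [
        {
            "topic": "mytopic",
            "partition": 0,
            "offset": 15,
            "timestamp": 1545084650987,
            "timestampType": "CREATE_TIME",
            "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers": [],
        }
    ]
}


def decode_values(event):
    """Return the UTF-8 text of every record's Base64-encoded value (hypothetical helper)."""
    decoded = []
    for records in event.values():
        for record in records:
            raw = base64.b64decode(record["value"])
            decoded.append(raw.decode("utf-8"))
    return decoded


messages = decode_values(event)
print(messages)  # ['Hello, this is a test.']
```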
Suppose your Apache Kafka producer is writing messages to your topic in the following JSON format.
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
You can use the value key to filter records. Suppose you want to keep only those records where device_ID begins with the letters AB. The FilterCriteria object would be as follows.
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
For added clarity, here is the value of the filter's Pattern expanded in plain JSON.
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
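Because Pattern is a JSON document embedded as a string, escaping it by hand is error-prone. One way to avoid that, sketched here in Python with only the standard library, is to write the pattern as a dictionary and serialize it. The variable names are illustrative.

```python
import json

# The filter pattern in plain JSON form, as shown above.
pattern = {"value": {"device_ID": [{"prefix": "AB"}]}}

# Pattern must be a string, so serialize the dictionary once here.
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}

# Serializing the whole object again produces the escaped quotes (\")
# seen in the FilterCriteria example.
print(json.dumps(filter_criteria, indent=2))
```

Loading the Pattern string back with json.loads should return the original dictionary, which is a quick way to check the escaping before you attach the filter.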
You can add your filter using the console, AWS CLI or an AWS SAM template.
With self-managed Apache Kafka, you can also filter records where the message is a plain string. Suppose you want to ignore those messages where the string is "error". The FilterCriteria object would look as follows.
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
For added clarity, here is the value of the filter's Pattern expanded in plain JSON.
{ "value": [ { "anything-but": [ "error" ] } ] }
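The effect of this pattern on plain-string messages can be approximated locally. The Python sketch below is only an illustration of the anything-but rule for this one pattern shape; it is not how Lambda evaluates filters internally, and the function name matches_anything_but is hypothetical.

```python
import json


def matches_anything_but(message: str, pattern_json: str) -> bool:
    """Approximate the anything-but rule for a plain-string value (illustrative only)."""
    pattern = json.loads(pattern_json)
    # Assumes the exact pattern shape shown above: one rule under "value".
    excluded = pattern["value"][0]["anything-but"]
    return message not in excluded


pattern_json = json.dumps({"value": [{"anything-but": ["error"]}]})

for message in ["error", "device started"]:
    outcome = "delivered" if matches_anything_but(message, pattern_json) else "dropped"
    print(f"{message!r}: {outcome}")
```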
You can add your filter using the console, AWS CLI or an AWS SAM template.
Self-managed Apache Kafka messages must be UTF-8 encoded strings, either plain strings or in JSON format. That's because Lambda decodes Kafka byte arrays into UTF-8 before applying filter criteria. If your messages use another encoding, such as UTF-16 or ASCII, or if the message format doesn't match the FilterCriteria format, Lambda processes metadata filters only. The following table summarizes the specific behavior:
Incoming message format | Filter pattern format for message properties | Resulting action |
---|---|---|
Plain string | Plain string | Lambda filters based on your filter criteria. |
Plain string | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
Plain string | Valid JSON | Lambda filters (on the other metadata properties only) based on your filter criteria. |
Valid JSON | Plain string | Lambda filters (on the other metadata properties only) based on your filter criteria. |
Valid JSON | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
Valid JSON | Valid JSON | Lambda filters based on your filter criteria. |
Non-UTF-8 encoded string | JSON, plain string, or no pattern | Lambda filters (on the other metadata properties only) based on your filter criteria. |
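
The table can be read as a simple rule: the filter on the message data applies only when the message decodes as UTF-8 and its format matches the format of the pattern. The Python sketch below encodes that reading so you can check sample messages locally. It is an approximation under stated assumptions, not Lambda's implementation: the labels "json", "string", and "none" are hypothetical, and treating only JSON objects as "valid JSON" is an assumption of this sketch.

```python
import json


def classify_message(raw: bytes) -> str:
    """Return 'non-utf8', 'json', or 'string' for a raw Kafka message value."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return "non-utf8"
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return "string"
    # Assumption: only JSON objects count as "valid JSON" messages here.
    return "json" if isinstance(parsed, dict) else "string"


def data_filter_applies(raw: bytes, pattern_kind: str) -> bool:
    """True if the data filter would apply; False means metadata filters only.

    pattern_kind is 'json', 'string', or 'none' (hypothetical labels).
    """
    return pattern_kind != "none" and classify_message(raw) == pattern_kind


samples = [
    (b'{"device_ID": "AB1234"}', "json"),
    (b"error", "string"),
    (b"error", "json"),
    ("error".encode("utf-16"), "string"),
]
for raw, kind in samples:
    print(classify_message(raw), kind, data_filter_applies(raw, kind))
```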