Using event filtering with a Kinesis event source
You can use event filtering to control which records from a stream or queue Lambda sends to your function. For general information about how event filtering works, see Control which events Lambda sends to your function.
This section focuses on event filtering for Kinesis event sources.
Kinesis event filtering basics
Suppose a producer is putting JSON-formatted data into your Kinesis data stream. An example record would look like the following, with the JSON data converted to a Base64-encoded string in the data field.
{
  "kinesis": {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
    "approximateArrivalTimestamp": 1545084650.987
  },
  "eventSource": "aws:kinesis",
  "eventVersion": "1.0",
  "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
  "eventName": "aws:kinesis:record",
  "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
  "awsRegion": "us-east-2",
  "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
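To see what the producer actually wrote, you can decode the data field yourself. The following Python sketch uses only the standard library. The helper name decode_kinesis_data is illustrative and is not part of any AWS SDK. Note that this particular example payload has a different shape from the order records used in the filtering example below.

```python
import base64
import json

# The "data" value copied from the example Kinesis record above.
EXAMPLE_DATA = "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0="


def decode_kinesis_data(data_b64: str) -> dict:
    """Base64-decode a Kinesis record's data field and parse the bytes as JSON."""
    return json.loads(base64.b64decode(data_b64))


print(decode_kinesis_data(EXAMPLE_DATA))
# {'RecordNumber': '0001', 'TimeStamp': 'yyyy-mm-ddThh:mm:ss', 'RequestCode': 'AAAA'}
```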
As long as the data the producer puts into the stream is valid JSON, you can use event filtering to filter records on the contents of the data key. Your filter pattern refers to fields of the decoded JSON payload, not to the Base64-encoded string. Suppose a producer is putting records into your Kinesis stream in the following JSON format.
{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }
To filter only those records where the order type is "buy", the FilterCriteria object would be as follows.
{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }
For added clarity, here is the value of the filter's Pattern expanded in plain JSON.
{ "data": { "order": { "type": [ "buy" ] } } }
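Because each Pattern is a JSON document stored as a string, it is easy to get the escaping wrong by hand. A minimal Python sketch, assuming you build the criteria programmatically: write the pattern as an ordinary dict and let json.dumps produce the escaped string. The helper name build_filter_criteria is hypothetical. The resulting string differs from the one above only in whitespace, which does not change its meaning as JSON.

```python
import json

# The expanded pattern from above, written as a regular Python dict.
pattern = {"data": {"order": {"type": ["buy"]}}}


def build_filter_criteria(*patterns: dict) -> dict:
    """Wrap one or more patterns in the FilterCriteria shape.

    Each Pattern value must be a JSON string, so json.dumps serializes the
    dict; the quote escaping appears when the whole object is serialized.
    """
    return {"Filters": [{"Pattern": json.dumps(p)} for p in patterns]}


criteria = build_filter_criteria(pattern)
print(json.dumps(criteria))
# {"Filters": [{"Pattern": "{\"data\": {\"order\": {\"type\": [\"buy\"]}}}"}]}
```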
You can add your filter using the console, the AWS CLI, or an AWS SAM template.
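If you script the setup instead, the same FilterCriteria object can be passed to the Lambda API. The following sketch assumes the AWS SDK for Python (boto3) and its create_event_source_mapping call. The helper add_buy_filter and the function name "my-function" in the usage line are hypothetical, and StartingPosition="LATEST" is only one valid choice. The client is passed in rather than created inside the helper, so any configured boto3 Lambda client works.

```python
import json


def add_buy_filter(lambda_client, function_name: str, stream_arn: str) -> dict:
    """Create a Kinesis event source mapping that only delivers "buy" orders.

    lambda_client is expected to be a boto3 Lambda client, for example
    boto3.client("lambda"). Returns the API response unchanged.
    """
    pattern = {"data": {"order": {"type": ["buy"]}}}
    return lambda_client.create_event_source_mapping(
        FunctionName=function_name,
        EventSourceArn=stream_arn,
        StartingPosition="LATEST",  # stream sources require a starting position
        FilterCriteria={"Filters": [{"Pattern": json.dumps(pattern)}]},
    )


# Usage (requires boto3 and AWS credentials):
# import boto3
# add_buy_filter(boto3.client("lambda"), "my-function",
#                "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream")
```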
To filter events from Kinesis sources correctly, both the data field and your filter criteria for the data field must be valid JSON. If either one isn't valid JSON, Lambda either drops the record or throws an exception, depending on which one is invalid. The following table summarizes the specific behavior:
| Incoming data format | Filter pattern format for data properties | Resulting action |
|---|---|---|
| Valid JSON | Valid JSON | Lambda filters based on your filter criteria. |
| Valid JSON | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Valid JSON | Non-JSON | Lambda throws an exception at the time of the event source mapping creation or update. The filter pattern for data properties must be in a valid JSON format. |
| Non-JSON | Valid JSON | Lambda drops the record. |
| Non-JSON | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Non-JSON | Non-JSON | Lambda throws an exception at the time of the event source mapping creation or update. The filter pattern for data properties must be in a valid JSON format. |
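To make the table concrete, here is a simplified, local Python model of its data-related rows. It is not Lambda's implementation: the matcher only understands nested objects and lists of allowed exact values (no prefix, numeric, or exists operators, and no matching inside arrays), and the names parse_pattern, matches, and data_outcome are hypothetical.

```python
import base64
import json
from typing import Any


def parse_pattern(pattern_str: str) -> dict:
    """Reject a non-JSON pattern up front, like Lambda does at mapping creation or update."""
    return json.loads(pattern_str)  # raises json.JSONDecodeError, a ValueError subclass


def matches(pattern: dict, value: Any) -> bool:
    """Simplified matcher: nested objects plus lists of allowed exact values only."""
    if not isinstance(value, dict):
        return False
    for key, expected in pattern.items():
        if key not in value:
            return False
        if isinstance(expected, dict):
            if not matches(expected, value[key]):
                return False
        elif value[key] not in expected:
            return False
    return True


def data_outcome(data_b64: str, pattern: dict) -> str:
    """Approximate the table's outcome for one record's data field."""
    if "data" not in pattern:
        # No data pattern: only metadata filters apply, so the payload is never parsed.
        return "metadata only"
    try:
        payload = json.loads(base64.b64decode(data_b64))
    except ValueError:
        # Non-JSON data combined with a data pattern: the record is dropped.
        return "drop"
    return "keep" if matches(pattern["data"], payload) else "drop"
```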
Filtering Kinesis aggregated records
With Kinesis, you can aggregate multiple records into a single Kinesis Data Streams record to increase your data throughput. Lambda can apply filter criteria to aggregated records only when you use Kinesis enhanced fan-out. Filtering aggregated records with standard (shared-throughput) Kinesis consumers isn't supported. When using enhanced fan-out, you configure a Kinesis dedicated-throughput consumer to act as the trigger for your Lambda function. Lambda then filters the aggregated records and passes only those records that meet your filter criteria.
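A rough sketch of that setup with boto3, assuming the Kinesis register_stream_consumer call and the Lambda create_event_source_mapping call. The helper attach_filtered_efo_trigger and its parameter names are hypothetical. A newly registered consumer starts in the CREATING state, so a real script would typically wait until describe_stream_consumer reports ACTIVE before creating the mapping; that wait is omitted here.

```python
import json


def attach_filtered_efo_trigger(kinesis_client, lambda_client, stream_arn: str,
                                consumer_name: str, function_name: str) -> dict:
    """Register an enhanced fan-out consumer and use it as a filtered Lambda trigger.

    kinesis_client and lambda_client are expected to be boto3 clients,
    for example boto3.client("kinesis") and boto3.client("lambda").
    """
    # 1. Register a dedicated-throughput (enhanced fan-out) consumer on the stream.
    consumer = kinesis_client.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name
    )["Consumer"]
    # (Omitted: poll describe_stream_consumer until ConsumerStatus is ACTIVE.)

    # 2. Use the consumer ARN, not the stream ARN, as the event source.
    pattern = {"data": {"order": {"type": ["buy"]}}}
    return lambda_client.create_event_source_mapping(
        FunctionName=function_name,
        EventSourceArn=consumer["ConsumerARN"],
        StartingPosition="LATEST",
        FilterCriteria={"Filters": [{"Pattern": json.dumps(pattern)}]},
    )
```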
To learn more about Kinesis record aggregation, refer to the Aggregation section on the Kinesis Producer Library (KPL) Key Concepts page. To learn more about using Lambda with Kinesis enhanced fan-out, see Increasing real-time stream processing performance with Amazon Kinesis Data Streams enhanced fan-out and AWS Lambda.