Using event filtering with a Kinesis event source - AWS Lambda

Using event filtering with a Kinesis event source

You can use event filtering to control which records from a stream or queue Lambda sends to your function. For general information about how event filtering works, see Control which events Lambda sends to your function.

This section focuses on event filtering for Kinesis event sources.

Kinesis event filtering basics

Suppose a producer is putting JSON formatted data into your Kinesis data stream. An example record would look like the following, with the JSON data converted to a Base64 encoded string in the data field.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

As long as the data the producer puts into the stream is valid JSON, you can use event filtering to filter records using the data key. Suppose a producer is putting records into your Kinesis stream in the following JSON format.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

To filter only those records where the order type is “buy,” the FilterCriteria object would be as follows.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

For added clarity, here is the value of the filter's Pattern expanded in plain JSON.

{ "data": { "order": { "type": [ "buy" ] } } }

You can add your filter using the console, AWS CLI or an AWS SAM template.

Console

To add this filter using the console, follow the instructions in Attaching filter criteria to an event source mapping (console) and enter the following string for the Filter criteria.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

To create a new event source mapping with these filter criteria using the AWS Command Line Interface (AWS CLI), run the following command.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

To add these filter criteria to an existing event source mapping, run the following command.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

To add this filter using AWS SAM, add the following snippet to the YAML template for your event source.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

To properly filter events from Kinesis sources, both the data field and your filter criteria for the data field must be in valid JSON format. If either field isn't in a valid JSON format, Lambda drops the message or throws an exception. The following table summarizes the specific behavior:

Incoming data format Filter pattern format for data properties Resulting action

Valid JSON

Valid JSON

Lambda filters based on your filter criteria.

Valid JSON

No filter pattern for data properties

Lambda filters (on the other metadata properties only) based on your filter criteria.

Valid JSON

Non-JSON

Lambda throws an exception at the time of the event source mapping creation or update. The filter pattern for data properties must be in a valid JSON format.

Non-JSON

Valid JSON

Lambda drops the record.

Non-JSON

No filter pattern for data properties

Lambda filters (on the other metadata properties only) based on your filter criteria.

Non-JSON

Non-JSON

Lambda throws an exception at the time of the event source mapping creation or update. The filter pattern for data properties must be in a valid JSON format.

Filtering Kinesis aggregated records

With Kinesis, you can aggregate multiple records into a single Kinesis Data Streams record to increase your data throughput. Lambda can only apply filter criteria to aggregated records when you use Kinesis enhanced fan-out. Filtering aggregated records with standard Kinesis isn't supported. When using enhanced fan-out, you configure a Kinesis dedicated-throughput consumer to act as the trigger for your Lambda function. Lambda then filters the aggregated records and passes only those records that meet your filter criteria.

To learn more about Kinesis record aggregation, refer to the Aggregation section on the Kinesis Producer Library (KPL) Key Concepts page. To Learn more about using Lambda with Kinesis enhanced fan-out, see Increasing real-time stream processing performance with Amazon Kinesis Data Streams enhanced fan-out and AWS Lambda on the AWS compute blog.