AWS Lambda
Developer Guide

Using AWS Lambda with Kinesis

You can use an AWS Lambda function to process records in an Amazon Kinesis data stream. With Kinesis, you can collect data from many sources and process it with multiple consumers. Lambda supports standard data stream iterators.

Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records. Lambda reads records in batches and invokes your function to process records from the batch.

Example Kinesis Record Event

{
    "Records": [
        {
            "kinesis": {
                "partitionKey": "partitionKey-3",
                "kinesisSchemaVersion": "1.0",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
                "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
            },
            "eventSource": "aws:kinesis",
            "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
            "invokeIdentityArn": "arn:aws:iam::account-id:role/testLEBRole",
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
            "awsRegion": "us-west-2"
        }
    ]
}
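The data field of each record is base64-encoded. As a minimal sketch, a Python handler could decode each record in the batch as follows. The handler name and the assumption that payloads are UTF-8 text are illustrative, not part of the event format.

```python
import base64


def handler(event, context):
    """Decode and return the payload of every Kinesis record in the batch."""
    payloads = []
    for record in event["Records"]:
        # Kinesis record data arrives base64-encoded inside the event.
        raw = base64.b64decode(record["kinesis"]["data"])
        # Assumption: the producer wrote UTF-8 text.
        payloads.append(raw.decode("utf-8"))
    return payloads
```

For the example event above, this handler returns a list with the single string "Hello, this is a test 123."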

Lambda polls each shard in your Kinesis stream for records at a base rate of once per second. When more records are available, Lambda keeps processing batches until it receives a batch that is smaller than the configured maximum batch size.
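The batching rule described above can be illustrated with a toy model. This is not how Lambda is implemented. It is only a sketch of the stated behavior, and the function and parameter names are hypothetical.

```python
def drain_shard(pending, max_batch_size):
    """Toy model: yield batches until one is smaller than max_batch_size."""
    position = 0
    while True:
        batch = pending[position:position + max_batch_size]
        position += len(batch)
        if batch:
            # Each non-empty batch corresponds to one function invocation.
            yield batch
        if len(batch) < max_batch_size:
            # A short (or empty) batch means the shard is caught up,
            # so the model stops until the next poll.
            break
```

With 250 pending records and a maximum batch size of 100, the model yields batches of 100, 100, and 50 records and then stops.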

If your function returns an error, Lambda retries the batch until processing succeeds or the data expires. Until the issue is resolved, no data in the shard is processed. Handle and record processing errors in your code to avoid stalled shards and potential data loss.
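One way to follow this advice is to catch errors for each record, log them, and let the invocation succeed. The following is a sketch under that assumption. The process function is a hypothetical stand-in for your own logic, and the return value is only for illustration.

```python
import base64
import logging

logger = logging.getLogger(__name__)


def process(payload):
    """Hypothetical business logic; rejects empty payloads."""
    if not payload:
        raise ValueError("empty payload")


def handler(event, context):
    """Process each record, recording failures instead of raising."""
    failed = []
    for record in event["Records"]:
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            process(base64.b64decode(record["kinesis"]["data"]))
        except Exception:
            # Log and continue so that one bad record does not stall the shard.
            logger.exception("Failed to process record %s", sequence_number)
            failed.append(sequence_number)
    return {"failed": failed}
```

Because the function no longer returns an error, Lambda does not retry the failed records. Record enough detail (here, the sequence number) to investigate or reprocess them later.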

Configuring Your Data Stream and Function

Your Lambda function is a consumer application for your data stream. It processes one batch of records at a time from each shard. A function shares throughput with other consumers, which can be other Lambda functions or applications running on other services.

To increase throughput, add shards to your data stream. Lambda processes records in each shard in order, and stops processing additional records in a shard if your function returns an error. With more shards, more batches are processed at once, which lowers the impact of errors on concurrency.

Note

If the total size of the records in a batch exceeds the payload limit, Lambda splits it into smaller batches for processing.

If your function can't scale up to handle one concurrent execution per shard, request a limit increase or reserve concurrency for your function. The concurrency available to your function should match or exceed the number of shards in your Kinesis data stream.
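As a sketch of how you might match reserved concurrency to the shard count with the AWS SDK for Python, the following helpers take already-constructed clients, for example boto3.client("kinesis") and boto3.client("lambda"). The helper names are hypothetical. Note that ListShards can also return closed shards after a reshard, so the count may be higher than the number of open shards.

```python
def count_shards(kinesis_client, stream_name):
    """Count the shards in a stream, following pagination tokens."""
    count = 0
    kwargs = {"StreamName": stream_name}
    while True:
        response = kinesis_client.list_shards(**kwargs)
        count += len(response["Shards"])
        token = response.get("NextToken")
        if not token:
            return count
        # ListShards accepts either StreamName or NextToken, not both.
        kwargs = {"NextToken": token}


def reserve_concurrency_for_stream(lambda_client, kinesis_client,
                                   function_name, stream_name):
    """Reserve one concurrent execution per shard for the function."""
    shards = count_shards(kinesis_client, stream_name)
    lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=shards,
    )
    return shards
```

Reserving concurrency also caps the function at that value, so choose a higher number if the function has other event sources.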

Creating an Event Source Mapping

Create an event source mapping to tell Lambda to send records from your data stream to a Lambda function. To configure your function to read from Kinesis in the Lambda console, create a Kinesis trigger.

To create a trigger

  1. Open the Lambda console Functions page.

  2. Choose a function.

  3. Under Designer, choose a trigger type to add a trigger to your function.

  4. Under Configure triggers, configure the required options and then choose Add.

  5. Choose Save.

Event Source Options

  • Kinesis stream – The Kinesis stream from which to read records.

  • Batch size – The number of records to read from a shard in each batch, up to 10,000. Lambda passes all of the records in the batch to the function in a single call, as long as the total size of the events doesn't exceed the payload limit of 6 MB.

  • Starting position – Process only new records, all existing records, or records created after a certain date.

    • Latest – Process new records added to the stream.

    • Trim horizon – Process all records in the stream.

    • At timestamp – Process records starting from a specific time.

    After processing any existing records, the function is caught up and continues to process new records.

  • Enabled – Disable the event source to stop processing records. Lambda keeps track of the last record processed and resumes processing from that point when re-enabled.

To manage the event source configuration later, choose the trigger in the designer.

Event Source Mapping API

To create the event source mapping with the AWS Command Line Interface (AWS CLI), use the create-event-source-mapping command, which calls the CreateEventSourceMapping API. The following example maps a function named my-function to a Kinesis data stream specified by its Amazon Resource Name (ARN), with a batch size of 500, starting from a timestamp given in Unix time.

$ aws lambda create-event-source-mapping --function-name my-function --no-enabled \
--batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-kinesis-stream
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "BatchSize": 500,
    "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-kinesis-stream",
    "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
    "LastModified": 1541139209.351,
    "LastProcessingResult": "No records processed",
    "State": "Creating",
    "StateTransitionReason": "User action"
}

The --no-enabled option creates the event source mapping without enabling it. To enable it, use update-event-source-mapping.

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 --enabled
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "BatchSize": 500,
    "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-kinesis-stream",
    "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
    "LastModified": 1541190239.996,
    "LastProcessingResult": "No records processed",
    "State": "Enabling",
    "StateTransitionReason": "User action"
}

To delete the event source mapping, use delete-event-source-mapping.

$ aws lambda delete-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "BatchSize": 500,
    "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-kinesis-stream",
    "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
    "LastModified": 1541190240.0,
    "LastProcessingResult": "No records processed",
    "State": "Deleting",
    "StateTransitionReason": "User action"
}
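The same operations are available in the AWS SDKs. As a sketch with the AWS SDK for Python, the following builds the parameters for a disabled mapping that starts at a timestamp, and passes them to a client that you create yourself with boto3.client("lambda"). The helper names are hypothetical. The parameter names are those of the CreateEventSourceMapping API.

```python
from datetime import datetime, timezone


def build_mapping_params(function_name, stream_arn, batch_size, start_epoch_seconds):
    """Build CreateEventSourceMapping parameters for a disabled mapping."""
    return {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,
        "Enabled": False,  # same effect as --no-enabled in the CLI
        "BatchSize": batch_size,
        "StartingPosition": "AT_TIMESTAMP",
        # The SDK expects a datetime, not Unix seconds.
        "StartingPositionTimestamp": datetime.fromtimestamp(
            start_epoch_seconds, tz=timezone.utc
        ),
    }


def create_disabled_mapping(lambda_client, function_name, stream_arn,
                            batch_size, start_epoch_seconds):
    """Create the mapping and return its UUID for later update or delete calls."""
    params = build_mapping_params(
        function_name, stream_arn, batch_size, start_epoch_seconds
    )
    response = lambda_client.create_event_source_mapping(**params)
    return response["UUID"]
```

With the returned UUID, lambda_client.update_event_source_mapping(UUID=uuid, Enabled=True) enables the mapping and lambda_client.delete_event_source_mapping(UUID=uuid) deletes it.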

Execution Role Permissions

Lambda needs the following permissions to manage resources related to your Kinesis stream. Add them to your function's execution role.

  • kinesis:DescribeStream

  • kinesis:GetRecords

  • kinesis:GetShardIterator

  • kinesis:ListStreams

The AWSLambdaKinesisExecutionRole managed policy includes these permissions. For more information, see Manage Permissions: Using an IAM Role (Execution Role).
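If you prefer a policy scoped to a single stream over the managed policy, a sketch of such a policy document follows. It assumes the role needs the kinesis:DescribeStream, kinesis:GetRecords, kinesis:GetShardIterator, and kinesis:ListStreams actions. Treat the managed policy as the authoritative list. The function name is hypothetical.

```python
import json


def kinesis_read_policy(stream_arn):
    """Build an IAM policy document for reading one Kinesis stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Stream-level actions can be limited to a single stream ARN.
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                ],
                "Resource": stream_arn,
            },
            {
                # ListStreams is not tied to one stream, so it uses "*".
                "Effect": "Allow",
                "Action": "kinesis:ListStreams",
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    arn = "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-kinesis-stream"
    print(json.dumps(kinesis_read_policy(arn), indent=4))
```

The printed JSON can be attached to the execution role as an inline policy. The function also needs permission to write logs, which this sketch does not include.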

Amazon CloudWatch Metrics

Lambda emits the IteratorAge metric when your function finishes processing a batch of records. The metric indicates how old the last record in the batch was when processing finished. If your function is processing new events, you can use the iterator age to estimate the latency between when a record is added and when the function processes it.

An increasing trend in iterator age can indicate issues with your function. For more information, see Using Amazon CloudWatch.
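One simple way to check for an increasing trend is to fit a line to recent IteratorAge samples and look at its slope. The following sketch assumes that you have already retrieved evenly spaced samples, in milliseconds, from CloudWatch. The function name is hypothetical, and any alerting threshold is yours to choose.

```python
def iterator_age_slope(ages_ms):
    """Least-squares slope of iterator age per sample; positive means growing."""
    n = len(ages_ms)
    if n < 2:
        # A trend needs at least two samples.
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(ages_ms) / n
    numerator = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(ages_ms))
    denominator = sum((x - mean_x) ** 2 for x in range(n))
    return numerator / denominator
```

For example, the samples 1000, 2000, 3000, and 4000 give a slope of 1000.0, meaning that the function falls a further second behind with each sample. A slope near zero suggests that the function is keeping up.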