Kinesis examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Kinesis examples using SDK for Python (Boto3)

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Kinesis.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Actions

The following code example shows how to use CreateStream.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def create(self, name, wait_until_exists=True): """ Creates a stream. :param name: The name of the stream. :param wait_until_exists: When True, waits until the service reports that the stream exists, then queries for its metadata. """ try: self.kinesis_client.create_stream(StreamName=name, ShardCount=1) self.name = name logger.info("Created stream %s.", name) if wait_until_exists: logger.info("Waiting until exists.") self.stream_exists_waiter.wait(StreamName=name) self.describe(name) except ClientError: logger.exception("Couldn't create stream %s.", name) raise
  • For API details, see CreateStream in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteStream.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def delete(self): """ Deletes a stream. """ try: self.kinesis_client.delete_stream(StreamName=self.name) self._clear() logger.info("Deleted stream %s.", self.name) except ClientError: logger.exception("Couldn't delete stream %s.", self.name) raise
  • For API details, see DeleteStream in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeStream.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def describe(self, name): """ Gets metadata about a stream. :param name: The name of the stream. :return: Metadata about the stream. """ try: response = self.kinesis_client.describe_stream(StreamName=name) self.name = name self.details = response["StreamDescription"] logger.info("Got stream %s.", name) except ClientError: logger.exception("Couldn't get %s.", name) raise else: return self.details
  • For API details, see DescribeStream in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetRecords.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def get_records(self, max_records): """ Gets records from the stream. This function is a generator that first gets a shard iterator for the stream, then uses the shard iterator to get records in batches from the stream. The shard iterator can be accessed through the 'details' property, which is populated using the 'describe' function of this class. Each batch of records is yielded back to the caller until the specified maximum number of records has been retrieved. :param max_records: The maximum number of records to retrieve. :return: Yields the current batch of retrieved records. """ try: response = self.kinesis_client.get_shard_iterator( StreamName=self.name, ShardId=self.details["Shards"][0]["ShardId"], ShardIteratorType="LATEST", ) shard_iter = response["ShardIterator"] record_count = 0 while record_count < max_records: response = self.kinesis_client.get_records( ShardIterator=shard_iter, Limit=10 ) shard_iter = response["NextShardIterator"] records = response["Records"] logger.info("Got %s records.", len(records)) record_count += len(records) yield records except ClientError: logger.exception("Couldn't get records from stream %s.", self.name) raise def describe(self, name): """ Gets metadata about a stream. :param name: The name of the stream. :return: Metadata about the stream. """ try: response = self.kinesis_client.describe_stream(StreamName=name) self.name = name self.details = response["StreamDescription"] logger.info("Got stream %s.", name) except ClientError: logger.exception("Couldn't get %s.", name) raise else: return self.details
  • For API details, see GetRecords in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutRecord.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def put_record(self, data, partition_key): """ Puts data into the stream. The data is formatted as JSON before it is passed to the stream. :param data: The data to put in the stream. :param partition_key: The partition key to use for the data. :return: Metadata about the record, including its shard ID and sequence number. """ try: response = self.kinesis_client.put_record( StreamName=self.name, Data=json.dumps(data), PartitionKey=partition_key ) logger.info("Put record in stream %s.", self.name) except ClientError: logger.exception("Couldn't put record in stream %s.", self.name) raise else: return response
  • For API details, see PutRecord in AWS SDK for Python (Boto3) API Reference.

Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by receiving records from a Kinesis stream. The function retrieves the Kinesis payload, decodes from Base64, and logs the record contents.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming a Kinesis event with Lambda using Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")

The following code example shows how to implement partial batch response for Lambda functions that receive events from a Kinesis stream. The function reports the batch item failures in the response, signaling to Lambda to retry those messages later.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Reporting Kinesis batch item failures with Lambda using Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}