Weitere AWS-SDK-Beispiele sind im GitHub-Repository Beispiele für AWS Doc SDKs
Kinesis-Beispiele unter Verwendung von SDK für Python (Boto3)
Die folgenden Codebeispiele zeigen, wie Sie Aktionen durchführen und gängige Szenarien implementieren, indem Sie AWS SDK für Python (Boto3) mit Kinesis nutzen.
Aktionen sind Codeauszüge aus größeren Programmen und müssen im Kontext ausgeführt werden. Während Aktionen Ihnen zeigen, wie Sie einzelne Servicefunktionen aufrufen, können Sie Aktionen im Kontext der zugehörigen Szenarien anzeigen.
Jedes Beispiel enthält einen Link zum vollständigen Quellcode, wo Sie Anweisungen zum Einrichten und Ausführen des Codes im Kodex finden.
Aktionen
Die folgenden Codebeispiele zeigen, wie CreateStream verwendet wird.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository
einrichten und ausführen. class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def create(self, name, wait_until_exists=True): """ Creates a stream. :param name: The name of the stream. :param wait_until_exists: When True, waits until the service reports that the stream exists, then queries for its metadata. """ try: self.kinesis_client.create_stream(StreamName=name, ShardCount=1) self.name = name logger.info("Created stream %s.", name) if wait_until_exists: logger.info("Waiting until exists.") self.stream_exists_waiter.wait(StreamName=name) self.describe(name) except ClientError: logger.exception("Couldn't create stream %s.", name) raise-
Weitere API-Informationen finden Sie unter CreateStream in der API-Referenz zum AWS-SDK für Python (Boto3).
-
Die folgenden Codebeispiele zeigen, wie DeleteStream verwendet wird.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository
einrichten und ausführen. class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def delete(self): """ Deletes a stream. """ try: self.kinesis_client.delete_stream(StreamName=self.name) self._clear() logger.info("Deleted stream %s.", self.name) except ClientError: logger.exception("Couldn't delete stream %s.", self.name) raise-
Weitere API-Informationen finden Sie unter DeleteStream in der API-Referenz zum AWS-SDK für Python (Boto3).
-
Die folgenden Codebeispiele zeigen, wie DescribeStream verwendet wird.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository
einrichten und ausführen. class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def describe(self, name): """ Gets metadata about a stream. :param name: The name of the stream. :return: Metadata about the stream. """ try: response = self.kinesis_client.describe_stream(StreamName=name) self.name = name self.details = response["StreamDescription"] logger.info("Got stream %s.", name) except ClientError: logger.exception("Couldn't get %s.", name) raise else: return self.details-
Weitere API-Informationen finden Sie unter DescribeStream in der API-Referenz zum AWS-SDK für Python (Boto3).
-
Die folgenden Codebeispiele zeigen, wie GetRecords verwendet wird.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository
einrichten und ausführen. class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def get_records(self, max_records): """ Gets records from the stream. This function is a generator that first gets a shard iterator for the stream, then uses the shard iterator to get records in batches from the stream. The shard iterator can be accessed through the 'details' property, which is populated using the 'describe' function of this class. Each batch of records is yielded back to the caller until the specified maximum number of records has been retrieved. :param max_records: The maximum number of records to retrieve. :return: Yields the current batch of retrieved records. """ try: response = self.kinesis_client.get_shard_iterator( StreamName=self.name, ShardId=self.details["Shards"][0]["ShardId"], ShardIteratorType="LATEST", ) shard_iter = response["ShardIterator"] record_count = 0 while record_count < max_records: response = self.kinesis_client.get_records( ShardIterator=shard_iter, Limit=10 ) shard_iter = response["NextShardIterator"] records = response["Records"] logger.info("Got %s records.", len(records)) record_count += len(records) yield records except ClientError: logger.exception("Couldn't get records from stream %s.", self.name) raise def describe(self, name): """ Gets metadata about a stream. :param name: The name of the stream. :return: Metadata about the stream. """ try: response = self.kinesis_client.describe_stream(StreamName=name) self.name = name self.details = response["StreamDescription"] logger.info("Got stream %s.", name) except ClientError: logger.exception("Couldn't get %s.", name) raise else: return self.details-
Weitere API-Informationen finden Sie unter GetRecords in der API-Referenz zum AWS-SDK für Python (Boto3).
-
Die folgenden Codebeispiele zeigen, wie PutRecord verwendet wird.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository
einrichten und ausführen. class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def put_record(self, data, partition_key): """ Puts data into the stream. The data is formatted as JSON before it is passed to the stream. :param data: The data to put in the stream. :param partition_key: The partition key to use for the data. :return: Metadata about the record, including its shard ID and sequence number. """ try: response = self.kinesis_client.put_record( StreamName=self.name, Data=json.dumps(data), PartitionKey=partition_key ) logger.info("Put record in stream %s.", self.name) except ClientError: logger.exception("Couldn't put record in stream %s.", self.name) raise else: return response-
Weitere API-Informationen finden Sie unter PutRecord in der API-Referenz zum AWS-SDK für Python (Boto3).
-
Serverless-Beispiele
Das folgende Codebeispiel veranschaulicht, wie eine Lambda-Funktion implementiert wird, die ein durch den Empfang von Datensätzen aus einem Kinesis-Stream ausgelöstes Ereignis empfängt. Die Funktion ruft die Kinesis-Nutzlast ab, dekodiert von Base64 und protokolliert den Datensatzinhalt.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen
. Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Das folgende Codebeispiel zeigt, wie eine teilweise Batch-Antwort für Lambda-Funktionen implementiert wird, die Ereignisse von einem Kinesis-Stream empfangen. Die Funktion meldet die Batch-Elementfehler in der Antwort und signalisiert Lambda, diese Nachrichten später erneut zu versuchen.
- SDK für Python (Boto3)
-
Anmerkung
Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen
. Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}