Desarrollo de un consumidor de Kinesis Client Library en Python - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrollo de un consumidor de Kinesis Client Library en Python

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

La KCL es una biblioteca de Java; el soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada. MultiLangDaemon Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del MultiLangDaemon proyecto KCL.

Para descargar la KCL de Python GitHub, vaya a la biblioteca de clientes de Kinesis (Python). Para descargar un código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del proyecto de ejemplo de KCL para Python en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

Implemente los métodos de clase RecordProcessor

La clase RecordProcess debe ampliar la RecordProcessorBase para implementar los siguientes métodos:

initialize process_records shutdown_requested

Este ejemplo proporciona implementaciones que puede utilizar como punto de partida.

#!/usr/bin/env python # Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Amazon Software License (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # http://aws.amazon.com/asl/ # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import print_function import sys import time from amazon_kclpy import kcl from amazon_kclpy.v3 import processor class RecordProcessor(processor.RecordProcessorBase): """ A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern: * initialize will be called once * process_records will be called zero or more times * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due a scaling change. """ def __init__(self): self._SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq = (None, None) self._largest_sub_seq = None self._last_checkpoint_time = None def log(self, message): sys.stderr.write(message) def initialize(self, initialize_input): """ Called once by a KCLProcess before any calls to process_records :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record processor has been assigned. """ self._largest_seq = (None, None) self._last_checkpoint_time = time.time() def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): """ Checkpoints with retries on retryable exceptions. :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records or shutdown :param str or None sequence_number: the sequence number to checkpoint at. :param int or None sub_sequence_number: the sub sequence number to checkpoint at. """ for n in range(0, self._CHECKPOINT_RETRIES): try: checkpointer.checkpoint(sequence_number, sub_sequence_number) return except kcl.CheckpointError as e: if 'ShutdownException' == e.value: # # A ShutdownException indicates that this record processor should be shutdown. This is due to # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard. # print('Encountered shutdown exception, skipping checkpoint') return elif 'ThrottlingException' == e.value: # # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many # dynamo writes. We will sleep temporarily to let it recover. # if self._CHECKPOINT_RETRIES - 1 == n: sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n)) return else: print('Was throttled while checkpointing, will attempt again in {s} seconds' .format(s=self._SLEEP_SECONDS)) elif 'InvalidStateException' == e.value: sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n') else: # Some other error sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e)) time.sleep(self._SLEEP_SECONDS) def process_record(self, data, partition_key, sequence_number, sub_sequence_number): """ Called for each record that is passed to process_records. :param str data: The blob of data that was contained in the record. :param str partition_key: The key associated with this recod. :param int sequence_number: The sequence number associated with this record. :param int sub_sequence_number: the sub sequence number associated with this record. """ #################################### # Insert your processing logic here #################################### self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}" .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data))) def should_update_sequence(self, sequence_number, sub_sequence_number): """ Determines whether a new larger sequence number is available :param int sequence_number: the sequence number from the current record :param int sub_sequence_number: the sub sequence number from the current record :return boolean: true if the largest sequence should be updated, false otherwise """ return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \ (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) def process_records(self, process_records_input): """ Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers from the records to indicate where in the stream to checkpoint. :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the records. """ try: for record in process_records_input.records: data = record.binary_data seq = int(record.sequence_number) sub_seq = record.sub_sequence_number key = record.partition_key self.process_record(data, key, seq, sub_seq) if self.should_update_sequence(seq, sub_seq): self._largest_seq = (seq, sub_seq) # # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds # if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) self._last_checkpoint_time = time.time() except Exception as e: self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) def lease_lost(self, lease_lost_input): self.log("Lease has been lost") def shard_ended(self, shard_ended_input): self.log("Shard has ended checkpointing") shard_ended_input.checkpointer.checkpoint() def shutdown_requested(self, shutdown_requested_input): self.log("Shutdown has been requested, checkpointing.") shutdown_requested_input.checkpointer.checkpoint() if __name__ == "__main__": kcl_process = kcl.KCLProcess(RecordProcessor()) kcl_process.run()

Modificación de las propiedades de configuración

En el ejemplo se proporcionan valores predeterminados para las propiedades de configuración, como los que se muestran en el siguiente script. Puede sobrescribir cualquiera de estas propiedades con sus propios valores.

# The script that abides by the multi-language protocol. This script will # be executed by the MultiLangDaemon, which will communicate with this script # over STDIN and STDOUT according to the multi-language protocol. executableName = sample_kclpy_app.py # The name of an Amazon Kinesis stream to process. streamName = words # Used by the KCL as the name of this application. Will be used as the name # of an Amazon DynamoDB table which will store the lease and checkpoint # information for workers with this application name applicationName = PythonKCLSample # Users can change the credentials provider the KCL will use to retrieve credentials. # The DefaultAWSCredentialsProviderChain checks several other providers, which is # described here: # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Appended to the user agent of the KCL. Does not impact the functionality of the # KCL in any other way. processingLanguage = python/2.7 # Valid options at TRIM_HORIZON or LATEST. # See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax initialPositionInStream = TRIM_HORIZON # The following properties are also available for configuring the KCL Worker that is created # by the MultiLangDaemon. # The KCL defaults to us-east-1 #regionName = us-east-1 # Fail over time in milliseconds. A worker which does not renew it's lease within this time interval # will be regarded as having problems and it's shards will be assigned to other workers. # For applications that have a large number of shards, this msy be set to a higher number to reduce # the number of DynamoDB IOPS required for tracking leases #failoverTimeMillis = 10000 # A worker id that uniquely identifies this worker among all workers using the same applicationName # If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. #workerId = # Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. #shardSyncIntervalMillis = 60000 # Max records to fetch from Kinesis in a single GetRecords call. #maxRecords = 10000 # Idle time between record reads in milliseconds. #idleTimeBetweenReadsInMillis = 1000 # Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) #callProcessRecordsEvenForEmptyRecordList = false # Interval in milliseconds between polling to check for parent shard completion. # Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on # completion of parent shards). #parentShardPollIntervalMillis = 10000 # Cleanup leases upon shards completion (don't wait until they expire in Kinesis). # Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try # to delete the ones we don't need any longer. #cleanupLeasesUponShardCompletion = true # Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). #taskBackoffTimeMillis = 500 # Buffer metrics for at most this long before publishing to CloudWatch. #metricsBufferTimeMillis = 10000 # Buffer at most this many metrics before publishing to CloudWatch. #metricsMaxQueueSize = 10000 # KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls # to RecordProcessorCheckpointer#checkpoint(String) by default. #validateSequenceNumberBeforeCheckpointing = true # The maximum number of active threads for the MultiLangDaemon to permit. # If a value is provided then a FixedThreadPool is used with the maximum # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. #maxActiveThreads = 0

Nombre de la aplicación

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:

  • Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse entre varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.

  • KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Utilice una tabla de arrendamientos para realizar un seguimiento de los fragmentos procesados por la aplicación de consumo KCL.

Credenciales

Debe poner sus credenciales de AWS a disposición de uno de los proveedores de credenciales en la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad AWSCredentialsProvider para configurar un proveedor de credenciales. Si ejecuta su aplicación de consumo en una instancia de Amazon EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.