Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Desarrollar consumidores con KCL en Java
Requisitos previos
Antes de comenzar con KCL 3.x, asegúrese de que dispone de lo siguiente:
-
Java Development Kit (JDK) 8 o posterior,
-
AWS SDK para Java 2.x
-
Maven o Gradle para la administración de dependencias.
KCL recopila métricas de uso de la CPU, como el uso de la CPU del host de cómputo en el que se ejecutan los procesos de trabajo para equilibrar la carga y lograr un nivel de uso de recursos uniforme entre ellos. Para permitir que KCL recopile métricas de uso de CPU desde los procesos de trabajo, debe cumplir los siguientes requisitos previos:
Amazon Elastic Compute Cloud(Amazon EC2)
-
El sistema operativo debe ser Linux.
-
Debe habilitarlo IMDSv2en su EC2 instancia.
Amazon Elastic Container Service (Amazon ECS) en Amazon EC2
-
El sistema operativo debe ser Linux.
-
Debe habilitar la versión 4 del punto de conexión de metadatos de tareas de ECS.
-
La versión del agente de contenedor de Amazon ECS debe ser 1.39.0 o posterior.
Amazon ECS en AWS Fargate
-
Debe habilitar la versión 4 del punto de conexión de metadatos de tareas de Fargate. Si utiliza la versión 1.4.0 o una posterior de la plataforma Fargate, se habilitará de forma predeterminada.
-
Versión de la plataforma de Fargate 1.4.0 o posterior.
Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2
-
El sistema operativo debe ser Linux.
Amazon EKS en AWS Fargate
-
Plataforma de Fargate 1.3.0 o posterior.
importante
Si KCL no puede recopilar las métricas de uso de la CPU de los procesos de trabajo, volverá a utilizar el rendimiento por proceso de trabajo para asignar los arrendamientos y equilibrar la carga entre los procesos de trabajo de la flota. Para obtener más información, consulte Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga.
Instale y agrege dependencias
Si está utilizando Maven, agregue la siguiente dependencia a su archivo pom.xml. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL.
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
Si está utilizando Gradle, agregue lo siguiente a su archivo build.gradle. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL.
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Puede buscar la versión más reciente del KCL en el Repositorio central de Maven
Implementar el consumidor
Una aplicación de consumidor de KCL consta de los siguientes componentes clave:
Componentes principales
RecordProcessor
RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define la forma en que procesa la aplicación los datos que recibe del flujo de Kinesis.
Responsabilidades principales:
-
Inicializar el procesamiento de una partición
-
Procesar lotes de registros del flujo de Kinesis
-
Cerrar el procesamiento de una partición (por ejemplo, cuando la partición se divide o fusiona, o cuando el arrendamiento se transfiere a otro host)
-
Controlar el registro de puntos de control para realizar un seguimiento del progreso
A continuación, se muestra un ejemplo de implementación:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
Ahora se explicará con detalle cada método utilizado en el ejemplo:
inicializar (InitializationInput) InitializationInput
-
Objetivo: configurar los recursos o estados necesarios para procesar los registros.
-
Cuándo se llama: una vez, cuando KCL asigna una partición a este procesador de registros.
-
Puntos clave:
-
initializationInput.shardId(): el ID de la partición que gestionará este procesador. -
initializationInput.extendedSequenceNumber(): el número de secuencia desde el que se iniciará el procesamiento.
-
ProcessRecords () ProcessRecordsInput processRecordsInput
-
Objetivo: procesar los registros entrantes y, de manera opcional, comprobar el progreso del punto de control.
-
Cuando se llama: repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento de la partición.
-
Puntos clave:
-
processRecordsInput.records(): lista de registros que se van a procesar. -
processRecordsInput.checkpointer(): se utiliza para comprobar el progreso. -
Asegúrese de haber gestionado cualquier excepción durante el procesamiento para evitar que la KCL falle.
-
Este método debe ser idempotente, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, por ejemplo, cuando los datos no se han registrado en puntos de control antes de que el proceso de trabajo fallara o reinicie de forma inesperada.
-
Vacíe siempre los datos almacenados en el búfer antes de registrar los puntos de control para garantizar la coherencia de datos.
-
Arrendamiento perdido () LeaseLostInput leaseLostInput
-
Objetivo: limpiar cualquier recurso específico para procesar esta partición.
-
Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de esta partición.
-
Puntos clave:
-
El registro de puntos de control no está permitido en este método.
-
Fragmentado () ShardEndedInput shardEndedInput
-
Objetivo: finalizar el procesamiento de esta partición y este punto de control.
-
Cuándo se llama: cuando la partición se divide o se fusiona, lo que indica que se han procesado todos los datos de esta partición.
-
Puntos clave:
-
shardEndedInput.checkpointer(): se utiliza para realizar el registro final de puntos de control. -
El registro de puntos de control de este método es obligatorio para completar el procesamiento.
-
Si no se vacían los datos y puntos de control, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir la partición.
-
Cierre solicitado () ShutdownRequestedInput shutdownRequestedInput
-
Objetivo: registrar un punto de control y limpiar los recursos cuando KCL se está cerrando.
-
Cuándo se llama: cuando KCL se cierra (por ejemplo, cuando la aplicación se cierra).
-
Puntos clave:
-
shutdownRequestedInput.checkpointer(): se utiliza para realizar el registro de puntos de control antes del cierre. -
Asegúrese de haber implementado el registro de puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.
-
Si no se vacían los datos y puntos de control, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.
-
importante
KCL 3.x garantiza un menor reprocesamiento de datos cuando el arrendamiento se transfiere de un proceso de trabajo a otro mediante puntos de control antes de que el proceso de trabajo anterior se cierre. Si no implementa la lógica de registro de puntos de control en el método shutdownRequested(), no verá este beneficio. Asegúrese de haber implementado una lógica de registro de puntos de control dentro del método shutdownRequested().
RecordProcessorFactory
RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCL usa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.
Responsabilidades principales:
-
Cree nuevas RecordProcessor instancias bajo demanda
-
Asegúrese de que cada una RecordProcessor esté inicializada correctamente
A continuación, se muestra un ejemplo de implementación:
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.
Programador
El programador es un componente de alto nivel que coordina todas las actividades de la aplicación KCL. Es responsable de la orquestación general del procesamiento de datos.
Responsabilidades principales:
-
Gestione el ciclo de vida de RecordProcessors
-
Gestionar la administración de los arrendamientos de particiones
-
Coordinar el registro de puntos de control
-
Equilibrar la carga de procesamiento de la partición entre varios procesos de trabajo de su aplicación
-
Gestionar correctamente las señales de cierre y cierre de las aplicaciones
Por lo general, el programar se crea en la aplicación principal y se inicia en ella. Puede consultar el ejemplo de implementación del programador en la siguiente sección, Aplicación de consumo principal
Aplicación de consumo principal
La aplicación de consumo principal une todos los componentes. Es responsable de configurar el consumo de KCL, crear los clientes necesarios, configurar el programador y administrar el ciclo de vida de la aplicación.
Responsabilidades principales:
-
Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch
-
Configurar la aplicación de KCL
-
Crear e iniciar el programador
-
Gestionar el apagado de aplicaciones
A continuación, se muestra un ejemplo de implementación:
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
Por defecto, KCL crea un consumidor de distribución ramificada mejorada (EFO) con un rendimiento dedicado. Para obtener más información sobre la distribución ramificada mejorada, consulte Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado. Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto del programador para utilizar consumidores de rendimiento compartido:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
El siguiente código es un ejemplo de cómo crear un objeto del programador que utiliza consumidores de rendimiento compartido:
Importaciones:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Código:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/