Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Desarrolle consumidores con KCL en Java

Modo de enfoque
Desarrolle consumidores con KCL en Java - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Requisitos previos

Antes de empezar a utilizar KCL 3.x, asegúrese de disponer de lo siguiente:

  • Kit de desarrollo de Java (JDK) 8 o posterior

  • AWS SDK for Java 2.x

  • Maven o Gradle para la gestión de dependencias

KCL recopila métricas de uso de la CPU, como el uso de la CPU del host de procesamiento en el que están trabajando los trabajadores, para equilibrar la carga y lograr un nivel de uso de recursos uniforme entre los trabajadores. Para que KCL pueda recopilar las métricas de uso de la CPU de los trabajadores, debe cumplir los siguientes requisitos previos:

Amazon Elastic Compute Cloud(Amazon EC2)

  • Su sistema operativo debe ser Linux.

  • Debe habilitarlo IMDSv2en su EC2 instancia.

Amazon Elastic Container Service (Amazon ECS) en Amazon EC2

Amazon ECS en AWS Fargate

Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2

  • Su sistema operativo debe ser Linux.

Amazon EKS en AWS Fargate

  • Plataforma Fargate 1.3.0 o posterior.

importante

Si KCL no puede recopilar las métricas de uso de la CPU de los trabajadores, utilizará el rendimiento por trabajador para asignar los arrendamientos y equilibrar la carga entre los trabajadores de la flota. Para obtener más información, consulte Cómo asigna KCL los arrendamientos a los trabajadores y equilibra la carga.

Instale y añada dependencias

Si usas Maven, agrega la siguiente dependencia a tu pom.xml archivo. Asegúrese de reemplazar la versión 3.x.x por la última versión de KCL.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Si usas Gradle, agrega lo siguiente a tu archivo. build.gradle Asegúrate de reemplazar la versión 3.x.x por la última versión de KCL.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Puede buscar la última versión del KCL en el repositorio central de Maven.

Implementar el consumidor

Una aplicación para consumidores de KCL consta de los siguientes componentes clave:

RecordProcessor

RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define el modo en que la aplicación procesa los datos que recibe de la transmisión de Kinesis.

Responsabilidades clave:

  • Inicialice el procesamiento de un fragmento

  • Procese lotes de registros de la transmisión de Kinesis

  • Cierre el procesamiento de un fragmento (por ejemplo, cuando el fragmento se divide o se fusiona, o cuando la concesión se transfiere a otro host)

  • Controle los puntos de control para realizar un seguimiento del progreso

A continuación se muestra un ejemplo de implementación:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

La siguiente es una explicación detallada de cada método utilizado en el ejemplo:

initialize (InitializationInputInitializationInput)

  • Propósito: configurar los recursos o estados necesarios para procesar los registros.

  • Cuándo se llama: una vez, cuando KCL asigna un fragmento a este procesador de registros.

  • Puntos clave:

    • initializationInput.shardId(): El ID del fragmento que gestionará este procesador.

    • initializationInput.extendedSequenceNumber(): El número de secuencia desde el que se iniciará el procesamiento.

ProcessRecords () ProcessRecordsInput processRecordsInput

  • Objetivo: procesar los registros entrantes y, opcionalmente, comprobar el progreso de los puntos.

  • Cuando se llama: Repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento del fragmento.

  • Puntos clave:

    • processRecordsInput.records(): Lista de registros que se van a procesar.

    • processRecordsInput.checkpointer(): Se utiliza para comprobar el progreso.

    • Asegúrese de haber gestionado cualquier excepción durante el procesamiento para evitar que la KCL falle.

    • Este método debe ser idempotente, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, como los datos que no han sido objeto de controles antes de que un trabajador se bloquee o se reinicie inesperadamente.

    • Vacíe siempre los datos almacenados en el búfer antes de realizar los controles para garantizar la coherencia de los datos.

LeaseLostInput leaseLostInputLeaseLost ()

  • Propósito: Limpiar cualquier recurso específico para procesar este fragmento.

  • Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de este fragmento.

  • Puntos clave:

    • Los puntos de control no están permitidos en este método.

sharEnded () ShardEndedInput shardEndedInput

  • Propósito: Finalizar el procesamiento de este fragmento y punto de control.

  • Cuándo se llama: cuando el fragmento se divide o se fusiona, lo que indica que se han procesado todos los datos de este fragmento.

  • Puntos clave:

    • shardEndedInput.checkpointer(): Se utiliza para realizar el punto de control final.

    • Los puntos de control de este método son obligatorios para completar el procesamiento.

    • Si no se vacían los datos y se comprueba aquí, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir el fragmento.

ShutdownRequestedInput shutdownRequestedInputShutdownRequested ()

  • Propósito: Controlar y limpiar los recursos cuando KCL se está apagando.

  • Cuándo se llama: cuando KCL se cierra (por ejemplo, cuando la aplicación se cierra).

  • Puntos clave:

    • shutdownRequestedInput.checkpointer(): Se utiliza para realizar puntos de control antes del cierre.

    • Asegúrese de haber implementado los puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.

    • Si no se vacían los datos y el punto de control aquí, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.

importante

KCL 3.x garantiza un menor reprocesamiento de datos cuando el contrato de arrendamiento se transfiere de un trabajador a otro mediante un punto de control antes de cerrar al trabajador anterior. Si no implementa la lógica de puntos de control en el shutdownRequested() método, no verá este beneficio. Asegúrese de haber implementado una lógica de puntos de control dentro del shutdownRequested() método.

RecordProcessorFactory

RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCL usa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.

Responsabilidades clave:

  • Cree nuevas RecordProcessor instancias bajo demanda

  • Asegúrese de que cada una RecordProcessor esté inicializada correctamente

El siguiente es un ejemplo de implementación:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.

Programador

El programador es un componente de alto nivel que coordina todas las actividades de la aplicación KCL. Es responsable de la organización general del procesamiento de datos.

Responsabilidades clave:

  • Gestione el ciclo de vida de RecordProcessors

  • Gestione la gestión de arrendamientos de fragmentos

  • Coordine los puntos de control

  • Equilibre la carga de procesamiento de fragmentos entre varios trabajadores de su aplicación

  • Gestione correctamente las señales de cierre y cierre de aplicaciones

El programador normalmente se crea e inicia en la aplicación principal. Puede consultar el ejemplo de implementación de Scheduler en la siguiente sección, Aplicación principal para el consumidor.

Aplicación principal para el consumidor

La aplicación principal para el consumidor une todos los componentes. Es responsable de configurar el consumidor de KCL, crear los clientes necesarios, configurar el programador y gestionar el ciclo de vida de la aplicación.

Responsabilidades clave:

  • Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch

  • Configure la aplicación KCL

  • Cree e inicie el Scheduler

  • Gestione el cierre de la aplicación

El siguiente es un ejemplo de implementación:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

De forma predeterminada, KCL crea un consumidor con ventilación mejorada (EFO) con un rendimiento dedicado. Para obtener más información sobre el Fan-out mejorado, consulte. Desarrolle consumidores con una distribución mejorada con un rendimiento dedicado Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto programador para utilizar consumidores de rendimiento compartido:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

El siguiente código es un ejemplo de cómo crear un objeto planificador que utiliza consumidores de rendimiento compartido:

Importaciones:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

Código:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/
PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.