Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Requisitos previos
Antes de empezar a utilizar KCL 3.x, asegúrese de disponer de lo siguiente:
-
Kit de desarrollo de Java (JDK) 8 o posterior
-
AWS SDK for Java 2.x
-
Maven o Gradle para la gestión de dependencias
KCL recopila métricas de uso de la CPU, como el uso de la CPU del host de procesamiento en el que están trabajando los trabajadores, para equilibrar la carga y lograr un nivel de uso de recursos uniforme entre los trabajadores. Para que KCL pueda recopilar las métricas de uso de la CPU de los trabajadores, debe cumplir los siguientes requisitos previos:
Amazon Elastic Compute Cloud(Amazon EC2)
-
Su sistema operativo debe ser Linux.
-
Debe habilitarlo IMDSv2en su EC2 instancia.
Amazon Elastic Container Service (Amazon ECS) en Amazon EC2
-
Su sistema operativo debe ser Linux.
-
Debe habilitar la versión 4 del punto final de metadatos de tareas de ECS.
-
La versión del agente de contenedores de Amazon ECS debe ser 1.39.0 o posterior.
Amazon ECS en AWS Fargate
-
Debe habilitar la versión 4 del punto final de metadatos de tareas de Fargate. Si usa la versión 1.4.0 o posterior de la plataforma Fargate, está habilitada de forma predeterminada.
-
Plataforma Fargate, versión 1.4.0 o posterior.
Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2
-
Su sistema operativo debe ser Linux.
Amazon EKS en AWS Fargate
-
Plataforma Fargate 1.3.0 o posterior.
importante
Si KCL no puede recopilar las métricas de uso de la CPU de los trabajadores, utilizará el rendimiento por trabajador para asignar los arrendamientos y equilibrar la carga entre los trabajadores de la flota. Para obtener más información, consulte Cómo asigna KCL los arrendamientos a los trabajadores y equilibra la carga.
Instale y añada dependencias
Si usas Maven, agrega la siguiente dependencia a tu pom.xml
archivo. Asegúrese de reemplazar la versión 3.x.x por la última versión de KCL.
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>3.x.x</version> <!-- Use the latest version -->
</dependency>
Si usas Gradle, agrega lo siguiente a tu archivo. build.gradle
Asegúrate de reemplazar la versión 3.x.x por la última versión de KCL.
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Puede buscar la última versión del KCL en el repositorio central de Maven.
Implementar el consumidor
Una aplicación para consumidores de KCL consta de los siguientes componentes clave:
Componentes principales
RecordProcessor
RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define el modo en que la aplicación procesa los datos que recibe de la transmisión de Kinesis.
Responsabilidades clave:
-
Inicialice el procesamiento de un fragmento
-
Procese lotes de registros de la transmisión de Kinesis
-
Cierre el procesamiento de un fragmento (por ejemplo, cuando el fragmento se divide o se fusiona, o cuando la concesión se transfiere a otro host)
-
Controle los puntos de control para realizar un seguimiento del progreso
A continuación se muestra un ejemplo de implementación:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;
public class SampleRecordProcessor implements ShardRecordProcessor {
private static final String SHARD_ID_MDC_KEY = "ShardId";
private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
private String shardId;
@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.shardId();
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Processing {} record(s)", processRecordsInput.records().size());
processRecordsInput.records().forEach(r ->
log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
);
// Checkpoint periodically
processRecordsInput.checkpointer().checkpoint();
} catch (Throwable t) {
log.error("Caught throwable while processing records. Aborting.", t);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Lost lease, so terminating.");
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Reached shard end checkpointing.");
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at shard end. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Scheduler is shutting down, checkpointing.");
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
}
}
La siguiente es una explicación detallada de cada método utilizado en el ejemplo:
initialize (InitializationInputInitializationInput)
-
Propósito: configurar los recursos o estados necesarios para procesar los registros.
-
Cuándo se llama: una vez, cuando KCL asigna un fragmento a este procesador de registros.
-
Puntos clave:
-
initializationInput.shardId()
: El ID del fragmento que gestionará este procesador. -
initializationInput.extendedSequenceNumber()
: El número de secuencia desde el que se iniciará el procesamiento.
-
ProcessRecords () ProcessRecordsInput processRecordsInput
-
Objetivo: procesar los registros entrantes y, opcionalmente, comprobar el progreso de los puntos.
-
Cuando se llama: Repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento del fragmento.
-
Puntos clave:
-
processRecordsInput.records()
: Lista de registros que se van a procesar. -
processRecordsInput.checkpointer()
: Se utiliza para comprobar el progreso. -
Asegúrese de haber gestionado cualquier excepción durante el procesamiento para evitar que la KCL falle.
-
Este método debe ser idempotente, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, como los datos que no han sido objeto de controles antes de que un trabajador se bloquee o se reinicie inesperadamente.
-
Vacíe siempre los datos almacenados en el búfer antes de realizar los controles para garantizar la coherencia de los datos.
-
LeaseLostInput leaseLostInputLeaseLost ()
-
Propósito: Limpiar cualquier recurso específico para procesar este fragmento.
-
Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de este fragmento.
-
Puntos clave:
-
Los puntos de control no están permitidos en este método.
-
sharEnded () ShardEndedInput shardEndedInput
-
Propósito: Finalizar el procesamiento de este fragmento y punto de control.
-
Cuándo se llama: cuando el fragmento se divide o se fusiona, lo que indica que se han procesado todos los datos de este fragmento.
-
Puntos clave:
-
shardEndedInput.checkpointer()
: Se utiliza para realizar el punto de control final. -
Los puntos de control de este método son obligatorios para completar el procesamiento.
-
Si no se vacían los datos y se comprueba aquí, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir el fragmento.
-
ShutdownRequestedInput shutdownRequestedInputShutdownRequested ()
-
Propósito: Controlar y limpiar los recursos cuando KCL se está apagando.
-
Cuándo se llama: cuando KCL se cierra (por ejemplo, cuando la aplicación se cierra).
-
Puntos clave:
-
shutdownRequestedInput.checkpointer()
: Se utiliza para realizar puntos de control antes del cierre. -
Asegúrese de haber implementado los puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.
-
Si no se vacían los datos y el punto de control aquí, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.
-
importante
KCL 3.x garantiza un menor reprocesamiento de datos cuando el contrato de arrendamiento se transfiere de un trabajador a otro mediante un punto de control antes de cerrar al trabajador anterior. Si no implementa la lógica de puntos de control en el shutdownRequested()
método, no verá este beneficio. Asegúrese de haber implementado una lógica de puntos de control dentro del shutdownRequested()
método.
RecordProcessorFactory
RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCL usa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.
Responsabilidades clave:
-
Cree nuevas RecordProcessor instancias bajo demanda
-
Asegúrese de que cada una RecordProcessor esté inicializada correctamente
El siguiente es un ejemplo de implementación:
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new SampleRecordProcessor();
}
}
En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.
Programador
El programador es un componente de alto nivel que coordina todas las actividades de la aplicación KCL. Es responsable de la organización general del procesamiento de datos.
Responsabilidades clave:
-
Gestione el ciclo de vida de RecordProcessors
-
Gestione la gestión de arrendamientos de fragmentos
-
Coordine los puntos de control
-
Equilibre la carga de procesamiento de fragmentos entre varios trabajadores de su aplicación
-
Gestione correctamente las señales de cierre y cierre de aplicaciones
El programador normalmente se crea e inicia en la aplicación principal. Puede consultar el ejemplo de implementación de Scheduler en la siguiente sección, Aplicación principal para el consumidor.
Aplicación principal para el consumidor
La aplicación principal para el consumidor une todos los componentes. Es responsable de configurar el consumidor de KCL, crear los clientes necesarios, configurar el programador y gestionar el ciclo de vida de la aplicación.
Responsabilidades clave:
-
Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch
-
Configure la aplicación KCL
-
Cree e inicie el Scheduler
-
Gestione el cierre de la aplicación
El siguiente es un ejemplo de implementación:
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;
public class SampleConsumer {
private final String streamName;
private final Region region;
private final KinesisAsyncClient kinesisClient;
public SampleConsumer(String streamName, Region region) {
this.streamName = streamName;
this.region = region;
this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
}
public void run() {
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName,
streamName,
kinesisClient,
dynamoDbAsyncClient,
cloudWatchClient,
UUID.randomUUID().toString(),
new SampleRecordProcessorFactory()
);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
public static void main(String[] args) {
String streamName = "your-stream-name"; // replace with your stream name
Region region = Region.US_EAST_1; // replace with your region
new SampleConsumer(streamName, region).run();
}
}
De forma predeterminada, KCL crea un consumidor con ventilación mejorada (EFO) con un rendimiento dedicado. Para obtener más información sobre el Fan-out mejorado, consulte. Desarrolle consumidores con una distribución mejorada con un rendimiento dedicado Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto programador para utilizar consumidores de rendimiento compartido:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
El siguiente código es un ejemplo de cómo crear un objeto planificador que utiliza consumidores de rendimiento compartido:
Importaciones:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Código:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
);/