Desenvolver consumidores com a KCL em Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver consumidores com a KCL em Java

Pré-requisitos

Antes de começar a usar a KCL 3.x, certifique-se de ter os pré-requisitos apresentados a seguir:

  • Java Development Kit (JDK) 8 ou posterior

  • AWS SDK para Java 2. x

  • Maven ou Gradle para gerenciamento de dependências

A KCL coleta métricas de utilização da CPU, como a utilização da CPU, do host de computação executado pelos operadores para equilibrar a carga e alcançar um nível uniforme de utilização de recursos entre os operadores. Para permitir que a KCL colete métricas de utilização da CPU dos operadores, você deve atender aos seguintes pré-requisitos:

Amazon Elastic Compute Cloud(Amazon EC2)

  • Seu sistema operacional deve ser Linux OS.

  • Você deve habilitar IMDSv2em sua EC2 instância.

Amazon Elastic Container Service (Amazon ECS) na Amazon EC2

Amazon ECS em AWS Fargate

Amazon Elastic Kubernetes Service (Amazon EKS) na Amazon EC2

  • Seu sistema operacional deve ser Linux OS.

Amazon EKS em AWS Fargate

  • Plataforma 1.3.0 ou posterior do Fargate.

Importante

Se a KCL não conseguir coletar dos operadores as métricas de utilização da CPU, a KCL voltará a usar a throughput por operador para atribuir concessões e equilibrar a carga entre os operadores da frota. Para obter mais informações, consulte Como a KCL atribui concessões aos operadores e equilibra a carga.

Instalar e adicionar dependências

Se estiver usando Maven, adicione a dependência a seguir ao seu arquivo pom.xml. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Se você estiver usando Gradle, adicione o seguinte ao seu arquivo build.gradle. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

A versão mais recente da KCL pode ser obtida no Repositório central do Maven.

Implementar o consumidor

Uma aplicação de consumo da KCL consiste nos seguintes componentes principais:

RecordProcessor

RecordProcessor é o componente principal em que reside sua lógica de negócios para processar registros de stream de dados do Kinesis. Ele define como sua aplicação processa os dados que recebe do fluxo do Kinesis.

Principais responsabilidades:

  • Inicializar o processamento de um fragmento

  • Processar lotes de registros do fluxo do Kinesis

  • Encerrar o processamento de um fragmento (por exemplo, quando o fragmento é dividido ou mesclado ou ainda quando a concessão é entregue a outro host)

  • Tratar do ponto de verificação para acompanhar o progresso

A seguir, um exemplo de implementação:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

Veja a seguir uma explicação detalhada de cada método usado no exemplo:

inicializar (InitializationInputInitializationInput)

  • Objetivo: configurar todos os recursos ou estados necessários para processar registros.

  • Quando é chamado: uma vez, quando a KCL atribui um fragmento a esse processador de registros.

  • Principais pontos:

    • initializationInput.shardId(): a ID do fragmento que esse processador manipulará.

    • initializationInput.extendedSequenceNumber(): o número sequencial a partir do qual o processamento será iniciado.

processRecords () ProcessRecordsInput processRecordsInput

  • Objetivo: processar os registros recebidos e, opcionalmente, verificar o progresso.

  • Quando é chamado: repetidamente, desde que o processador de registros mantenha a concessão do fragmento.

  • Principais pontos:

    • processRecordsInput.records(): lista de registros a serem processados.

    • processRecordsInput.checkpointer(): usado para verificar o progresso.

    • Verifique se todas as exceções foram tratadas durante o processamento para evitar que a KCL falhe.

    • Esse método deve ser idempotente, pois o mesmo registro pode ser processado mais de uma vez em alguns cenários, como dados que não foram verificados antes de falhas ou reinicializações inesperadas do operador.

    • Sempre limpe todos os dados armazenados em buffer antes de verificar para garantir a consistência de dados.

Locação perdida () LeaseLostInput leaseLostInput

  • Objetivo: limpar todos os recursos específicos para o processamento desse fragmento.

  • Quando é chamado: quando outro Scheduler assume a concessão desse fragmento.

  • Principais pontos:

    • A verificação não é permitida neste método.

Encerrado () ShardEndedInput shardEndedInput

  • Objetivo: concluir o processamento desse fragmento e verificar.

  • Quando é chamado: quando o fragmento é dividido ou mesclado, indicando que todos os dados desse fragmento foram processados.

  • Principais pontos:

    • shardEndedInput.checkpointer(): usado para realizar a verificação final.

    • A verificação nesse método é obrigatória para concluir o processamento.

    • Deixar de liberar os dados e fazer a verificação aqui pode resultar na perda de dados ou no processamento duplicado quando o fragmento for reaberto.

Desligamento solicitado () ShutdownRequestedInput shutdownRequestedInput

  • Objetivo: verifique e limpe os recursos quando a KCL estiver desligada.

  • Quando é chamado: quando a KCL está sendo encerrada. Por exemplo, quando a aplicação está sendo encerrada).

  • Principais pontos:

    • shutdownRequestedInput.checkpointer(): usado para realizar o ponto de verificação antes do desligamento.

    • Implemente o ponto de verificação no método para que o andamento seja salvo antes que a aplicação pare.

    • A falha na liberação dos dados e na implementação do ponto de verificação aqui pode resultar na perda de dados ou no reprocessamento de registros quando a aplicação for reiniciada.

Importante

A KCL 3.x garante menos reprocessamento de dados quando a concessão é passada de um operador para outro por meio de um ponto de verificação antes que o operador anterior seja desligado. Se não implementar a lógica do ponto de verificação no método shutdownRequested(), você não obterá esse benefício. Implemente uma lógica de ponto de verificação dentro do método shutdownRequested().

RecordProcessorFactory

RecordProcessorFactory é responsável pela criação de novas RecordProcessor instâncias. A KCL usa essa fábrica para criar um novo RecordProcessor para cada fragmento que o aplicativo precisa processar.

Principais responsabilidades:

  • Crie novas RecordProcessor instâncias sob demanda

  • Certifique-se de que cada um RecordProcessor esteja inicializado corretamente

A seguir, um exemplo de implementação:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

Neste exemplo, a fábrica cria um novo SampleRecordProcessor cada vez que shardRecordProcessor () é chamado. Isso pode ser estendido para incluir qualquer lógica de inicialização necessária.

Scheduler

O Scheduler é um componente de alto nível que coordena todas as atividades da aplicação KCL. Ele é responsável pela orquestração geral do processamento de dados.

Principais responsabilidades:

  • Gerencie o ciclo de vida do RecordProcessors

  • Gerenciar o gerenciamento de concessão para fragmentos

  • Coordenar os pontos de verificação

  • Equilibrar a carga de processamento de fragmentos entre vários operadores da sua aplicação

  • Gerenciar os sinais de desligamento normal e encerramento da aplicação

Normalmente, o Agendador é criado e iniciado na aplicação principal. Você pode verificar o exemplo de implementação do Agendador na seção a seguir: Aplicação de consumo principal.

Aplicação de consumo principal

A aplicação de consumo principal une todos os componentes. Ela é responsável por configurar o consumidor da KCL, criar os clientes necessários, configurar o Agendador e gerenciar o ciclo de vida da aplicação.

Principais responsabilidades:

  • Configurar clientes AWS de serviço (Kinesis, DynamoDB,) CloudWatch

  • Configurar a aplicação KCL

  • Criar e iniciar o Agendador

  • Controlar o desligamento da aplicação

A seguir, um exemplo de implementação:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

A KCL cria um consumidor de distribuição avançada (Enhanced Fan-out, EFO) com throughput dedicada por padrão. Para obter mais informações sobre distribuição avançada, consulte Desenvolver consumidores de distribuição avançada com throughput dedicada. Se você tiver menos de 2 consumidores ou não precisar de atrasos de propagação de leitura abaixo de 200 ms, defina a seguinte configuração no objeto do agendador para usar consumidores de throughput compartilhada:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

O código a seguir é um exemplo de criação de um objeto do agendador que usa consumidores de throughput compartilhada:

Importações:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

Código:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/