Migrar consumidores do KCL 1.x para o KCL 2.x - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Migrar consumidores do KCL 1.x para o KCL 2.x

Este tópico explica as diferenças entre as versões 1.x e 2.x da Kinesis Client Library (KCL). Ele também mostra como migrar o consumidor da versão 1.x para a versão 2.x da KCL. Depois que você migrar o cliente, ele iniciará o processamento de registros a partir do local verificado pela última vez.

A versão 2.0 da KCL apresenta as seguintes alterações de interface:

Alterações de interface da KCL
Interface KCL 1.x Interface KCL 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Compactada em software.amazon.kinesis.processor.ShardRecordProcessor

Migrar o processador de registros

Este exemplo mostra um processador de registros implementado para a KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Como migrar a classe de processador de registro
  1. Altere as interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware para software.amazon.kinesis.processor.ShardRecordProcessor, da seguinte forma:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Atualize as instruções import para os métodos initialize e processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Substitua o método shutdown pelos novos métodos a seguir: leaseLost, shardEnded, e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Veja a seguir a versão atualizada da classe de processador de registro.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrar a Fábrica do Processador de Registros

A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja a seguir um exemplo de uma fábrica da KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Migrar a fábrica do processador de registros
  1. Altere a interface implementada de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory para software.amazon.kinesis.processor.ShardRecordProcessorFactory, da seguinte forma:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Alterar a assinatura de retorno para createProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Veja a seguir um exemplo da fábrica do processador de registros em 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migração do operador

Na versão 2.0 da KCL, uma nova classe, chamada Scheduler, substitui a classe Worker. Veja a seguir um exemplo de um operador da KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Para migrar o operador
  1. Altere a instrução import para a classe Worker para as instruções de importação para as classes Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Crie o ConfigsBuilder e um Scheduler conforme mostrado no exemplo a seguir.

    Recomendamos que você use KinesisClientUtil para criar KinesisAsyncClient e configurar maxConcurrency no KinesisAsyncClient.

    Importante

    O Amazon Kinesis Client pode ter latência significativamente maior, a menos que você configure KinesisAsyncClient para ter um maxConcurrency alto o suficiente para permitir todas as concessões e usos adicionais do KinesisAsyncClient.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Configurar o cliente do Amazon Kinesis

Com a versão 2.0 da Kinesis Client Library, a configuração do cliente passou de uma única classe de configuração (KinesisClientLibConfiguration) para seis classes. A tabela a seguir descreve a migração.

Campos de Configuração e suas Novas Classes
Campo Original Nova classe de configuração Descrição
applicationName ConfigsBuilder O nome da aplicação da KCL. Usado como padrão para o tableName e o consumerName.
tableName ConfigsBuilder Permite substituir o nome usado para a tabela de concessão do Amazon DynamoDB.
streamName ConfigsBuilder O nome do stream a partir do qual esse aplicativo processa registros.
kinesisEndpoint ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
dynamoDBEndpoint ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
initialPositionInStreamExtended RetrievalConfig A localização no estilhaço a partir da qual a KCL começa a obter registros, começando com a execução inicial do aplicativo.
kinesisCredentialsProvider ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
dynamoDBCredentialsProvider ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
cloudWatchCredentialsProvider ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
failoverTimeMillis ConfigdeGerenciamentodeConcessão O número de milissegundos que devem passar antes que se considere uma falha do proprietário da concessão.
workerIdentifier ConfigsBuilder Um identificador exclusivo que representa a instanciação do processador do aplicativo. Isso deve ser exclusivo.
shardSyncIntervalMillis ConfigdeGerenciamentodeConcessão O tempo entre as chamadas de sincronização de estilhaços.
maxRecords PollingConfig Permite definir o número máximo de registros que o Kinesis retorna.
idleTimeBetweenReadsInMillis CoordinatorConfig Essa opção não existe mais. Consulte a remoção do tempo ocioso.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Quando definido, o processador de registros é chamado mesmo quando o Kinesis não fornece nenhum registro.
parentShardPollIntervalMillis CoordinatorConfig Com que frequência um processador de registros deve sondar a conclusão de estilhaços pai.
cleanupLeasesUponShardCompletion LeaseManagementConfig Quando definidas, as concessões são removidas assim que as concessões filho iniciam o processamento.
ignoreUnexpectedChildShards LeaseManagementConfig Quando definidos, estilhaços filho que possuem um estilhaço aberto são ignorados. Essa configuração destina-se principalmente a fluxos do DynamoDB.
kinesisClientConfig ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
dynamoDBClientConfig ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
cloudWatchClientConfig ConfigsBuilder Essa opção não existe mais. Consulte remoções de configuração de cliente.
taskBackoffTimeMillis LifecycleConfig O tempo de espera para repetir tarefas com falha.
metricsBufferTimeMillis MetricsConfig Controla a publicação de métricas do CloudWatch.
metricsMaxQueueSize MetricsConfig Controla a publicação de métricas do CloudWatch.
metricsLevel MetricsConfig Controla a publicação de métricas do CloudWatch.
metricsEnabledDimensions MetricsConfig Controla a publicação de métricas do CloudWatch.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Essa opção não existe mais. Consulte a validação do número de sequência do ponto de verificação.
regionName ConfigsBuilder Essa opção não existe mais. Consulte remoção de configuração de cliente.
maxLeasesForWorker LeaseManagementConfig O número máximo de concessões que uma única instância do aplicativo deve aceitar.
maxLeasesToStealAtOneTime LeaseManagementConfig O número máximo de concessões que um aplicativo deve tentar roubar de uma só vez.
initialLeaseTableReadCapacity LeaseManagementConfig As IOPS de leitura do DynamoDB que serão usadas se a Kinesis Client Library precisar criar uma nova tabela de concessão do DynamoDB.
initialLeaseTableWriteCapacity LeaseManagementConfig As IOPS de leitura do DynamoDB que serão usadas se a Kinesis Client Library precisar criar uma nova tabela de concessão do DynamoDB.
initialPositionInStreamExtended ConfigdeGerenciamentodeConcessão A posição inicial do aplicativo no stream. Isso é usado somente durante a criação da concessão inicial.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Desative a sincronização de dados de estilhaço se a tabela de concessão contiver concessões existentes. TODO: KinesisEco-438
shardPrioritization CoordinatorConfig A priorização de estilhaços a ser usada.
shutdownGraceMillis N/D Essa opção não existe mais. Consulte as remoções MultiLang.
timeoutInSeconds N/D Essa opção não existe mais. Consulte as remoções MultiLang.
retryGetRecordsInSeconds PollingConfig Configura o atraso entre as tentativas GetRecords para falhas.
maxGetRecordsThreadPool PollingConfig O tamanho do grupo de threads usado para GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Controla o tamanho do grupo de threads de renovação de concessão. Quanto mais concessões seu aplicativo aceitar, maior esse grupo deve ser.
recordsFetcherFactory PollingConfig Permite substituir a fábrica usada para criar extratores que recuperam dados dos streams.
logWarningForTaskAfterMillis LifecycleConfig Quanto tempo esperar antes de um aviso ser registrado caso uma tarefa não seja concluída.
listShardsBackoffTimeInMillis RetrievalConfig O número de milissegundos de espera entre as chamadas para ListShards em caso de falha.
maxListShardsRetryAttempts RetrievalConfig O número máximo de novas tentativas de ListShards antes de desistir.

Remoção do tempo ocioso

Na versão 1.x da KCL, idleTimeBetweenReadsInMillis corresponde a duas quantidades:

  • A quantidade de tempo entre as verificações de envio de tarefas. Agora você pode configurar esse tempo entre tarefas, definindo CoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • A quantidade de tempo inativo quando nenhum registro é retornado do Kinesis Data Streams. Na versão 2.0, em distribuição avançada registros são enviados a partir de sua respectiva recuperação. Atividade no do consumidor estilhaço só ocorre quando uma solicitação é enviada.

Remoções de configuração de cliente

Na versão 2.0, a KCL não cria mais clientes. Ela depende do fornecimento de um cliente válido pelo usuário. Com essa alteração, todos os parâmetros de configuração que controlavam a criação do cliente foram removidos. Se precisar desses parâmetros, você pode configurá-los nos clientes antes de fornecê-los ao ConfigsBuilder.

Campo removido Configuração equivalente
kinesisEndpoint Configure o SDK KinesisAsyncClient com o endpoint de sua preferência: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Configure o SDK DynamoDbAsyncClient com o endpoint de sua preferência: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig Configure o SDK KinesisAsyncClient com a configuração necessária: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Configure o SDK DynamoDbAsyncClient com a configuração necessária: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Configure o SDK CloudWatchAsyncClient com a configuração necessária: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Configure o SDK com a Região de sua preferência. Ela é a mesma para todos os clientes do SDK. Por exemplo, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().