As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Migrar consumidores do KCL 1.x para o KCL 2.x
Este tópico explica as diferenças entre as versões 1.x e 2.x da Kinesis Client Library (KCL). Ele também mostra como migrar o consumidor da versão 1.x para a versão 2.x da KCL. Depois que você migrar o cliente, ele iniciará o processamento de registros a partir do local verificado pela última vez.
A versão 2.0 da KCL apresenta as seguintes alterações de interface:
Alterações de interface da KCL | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Interface KCL 1.x | Interface KCL 2.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Compactada em software.amazon.kinesis.processor.ShardRecordProcessor |
Tópicos
Migrar o processador de registros
Este exemplo mostra um processador de registros implementado para a KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Como migrar a classe de processador de registro
-
Altere as interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ecom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
parasoftware.amazon.kinesis.processor.ShardRecordProcessor
, da seguinte forma:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Atualize as instruções
import
para os métodosinitialize
eprocessRecords
.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Substitua o método
shutdown
pelos novos métodos a seguir:leaseLost
,shardEnded
, eshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Veja a seguir a versão atualizada da classe de processador de registro.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Migrar a Fábrica do Processador de Registros
A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja a seguir um exemplo de uma fábrica da KCL 1.x.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Migrar a fábrica do processador de registros
-
Altere a interface implementada de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
parasoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, da seguinte forma:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Alterar a assinatura de retorno para
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Veja a seguir um exemplo da fábrica do processador de registros em 2.0:
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
Migração do operador
Na versão 2.0 da KCL, uma nova classe, chamada Scheduler
, substitui a classe Worker
. Veja a seguir um exemplo de um operador da KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Para migrar o operador
-
Altere a instrução
import
para a classeWorker
para as instruções de importação para as classesScheduler
eConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Crie o
ConfigsBuilder
e umScheduler
conforme mostrado no exemplo a seguir.Recomendamos que você use
KinesisClientUtil
para criarKinesisAsyncClient
e configurarmaxConcurrency
noKinesisAsyncClient
.Importante
O Amazon Kinesis Client pode ter latência significativamente maior, a menos que você configure
KinesisAsyncClient
para ter ummaxConcurrency
alto o suficiente para permitir todas as concessões e usos adicionais doKinesisAsyncClient
.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Configurar o cliente do Amazon Kinesis
Com a versão 2.0 da Kinesis Client Library, a configuração do cliente passou de uma única classe de configuração (KinesisClientLibConfiguration
) para seis classes. A tabela a seguir descreve a migração.
Campos de Configuração e suas Novas Classes | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Campo Original | Nova classe de configuração | Descrição | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
applicationName |
ConfigsBuilder |
O nome da aplicação da KCL. Usado como padrão para o tableName e o consumerName . |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tableName |
ConfigsBuilder |
Permite substituir o nome usado para a tabela de concessão do Amazon DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamName |
ConfigsBuilder |
O nome do stream a partir do qual esse aplicativo processa registros. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisEndpoint |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBEndpoint |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
RetrievalConfig |
A localização no estilhaço a partir da qual a KCL começa a obter registros, começando com a execução inicial do aplicativo. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisCredentialsProvider |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBCredentialsProvider |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchCredentialsProvider |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
failoverTimeMillis |
ConfigdeGerenciamentodeConcessão | O número de milissegundos que devem passar antes que se considere uma falha do proprietário da concessão. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workerIdentifier |
ConfigsBuilder |
Um identificador exclusivo que representa a instanciação do processador do aplicativo. Isso deve ser exclusivo. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardSyncIntervalMillis |
ConfigdeGerenciamentodeConcessão | O tempo entre as chamadas de sincronização de estilhaços. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxRecords |
PollingConfig |
Permite definir o número máximo de registros que o Kinesis retorna. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Essa opção não existe mais. Consulte a remoção do tempo ocioso. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Quando definido, o processador de registros é chamado mesmo quando o Kinesis não fornece nenhum registro. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parentShardPollIntervalMillis |
CoordinatorConfig |
Com que frequência um processador de registros deve sondar a conclusão de estilhaços pai. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Quando definidas, as concessões são removidas assim que as concessões filho iniciam o processamento. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Quando definidos, estilhaços filho que possuem um estilhaço aberto são ignorados. Essa configuração destina-se principalmente a fluxos do DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisClientConfig |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBClientConfig |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchClientConfig |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoções de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
taskBackoffTimeMillis |
LifecycleConfig |
O tempo de espera para repetir tarefas com falha. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsBufferTimeMillis |
MetricsConfig |
Controla a publicação de métricas do CloudWatch. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsMaxQueueSize |
MetricsConfig |
Controla a publicação de métricas do CloudWatch. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsLevel |
MetricsConfig |
Controla a publicação de métricas do CloudWatch. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsEnabledDimensions |
MetricsConfig |
Controla a publicação de métricas do CloudWatch. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Essa opção não existe mais. Consulte a validação do número de sequência do ponto de verificação. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
regionName |
ConfigsBuilder |
Essa opção não existe mais. Consulte remoção de configuração de cliente. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesForWorker |
LeaseManagementConfig |
O número máximo de concessões que uma única instância do aplicativo deve aceitar. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
O número máximo de concessões que um aplicativo deve tentar roubar de uma só vez. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableReadCapacity |
LeaseManagementConfig |
As IOPS de leitura do DynamoDB que serão usadas se a Kinesis Client Library precisar criar uma nova tabela de concessão do DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
As IOPS de leitura do DynamoDB que serão usadas se a Kinesis Client Library precisar criar uma nova tabela de concessão do DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
ConfigdeGerenciamentodeConcessão | A posição inicial do aplicativo no stream. Isso é usado somente durante a criação da concessão inicial. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Desative a sincronização de dados de estilhaço se a tabela de concessão contiver concessões existentes. TODO: KinesisEco-438 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardPrioritization |
CoordinatorConfig |
A priorização de estilhaços a ser usada. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdownGraceMillis |
N/D | Essa opção não existe mais. Consulte as remoções MultiLang. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
timeoutInSeconds |
N/D | Essa opção não existe mais. Consulte as remoções MultiLang. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
retryGetRecordsInSeconds |
PollingConfig |
Configura o atraso entre as tentativas GetRecords para falhas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxGetRecordsThreadPool |
PollingConfig |
O tamanho do grupo de threads usado para GetRecords. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeaseRenewalThreads |
LeaseManagementConfig |
Controla o tamanho do grupo de threads de renovação de concessão. Quanto mais concessões seu aplicativo aceitar, maior esse grupo deve ser. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
recordsFetcherFactory |
PollingConfig |
Permite substituir a fábrica usada para criar extratores que recuperam dados dos streams. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logWarningForTaskAfterMillis |
LifecycleConfig |
Quanto tempo esperar antes de um aviso ser registrado caso uma tarefa não seja concluída. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listShardsBackoffTimeInMillis |
RetrievalConfig |
O número de milissegundos de espera entre as chamadas para ListShards em caso de falha. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxListShardsRetryAttempts |
RetrievalConfig |
O número máximo de novas tentativas de ListShards antes de desistir. |
Remoção do tempo ocioso
Na versão 1.x da KCL, idleTimeBetweenReadsInMillis
corresponde a duas quantidades:
-
A quantidade de tempo entre as verificações de envio de tarefas. Agora você pode configurar esse tempo entre tarefas, definindo
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
A quantidade de tempo inativo quando nenhum registro é retornado do Kinesis Data Streams. Na versão 2.0, em distribuição avançada registros são enviados a partir de sua respectiva recuperação. Atividade no do consumidor estilhaço só ocorre quando uma solicitação é enviada.
Remoções de configuração de cliente
Na versão 2.0, a KCL não cria mais clientes. Ela depende do fornecimento de um cliente válido pelo usuário. Com essa alteração, todos os parâmetros de configuração que controlavam a criação do cliente foram removidos. Se precisar desses parâmetros, você pode configurá-los nos clientes antes de fornecê-los ao ConfigsBuilder
.
Campo removido | Configuração equivalente |
---|---|
kinesisEndpoint |
Configure o SDK KinesisAsyncClient com o endpoint de sua preferência: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . |
dynamoDBEndpoint |
Configure o SDK DynamoDbAsyncClient com o endpoint de sua preferência: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() . |
kinesisClientConfig |
Configure o SDK KinesisAsyncClient com a configuração necessária: KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Configure o SDK DynamoDbAsyncClient com a configuração necessária: DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Configure o SDK CloudWatchAsyncClient com a configuração necessária: CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Configure o SDK com a Região de sua preferência. Ela é a mesma para todos os clientes do SDK. Por exemplo, KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |