Migrazione dei consumatori da KCL 1.x a KCL 2.x - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione dei consumatori da KCL 1.x a KCL 2.x

Questo argomento descrive le differenze tra le versioni 1.x e 2.x della Kinesis Client Library (KCL). Viene inoltre illustrato come migrare l'applicazione consumer dalla versione 1.x alla versione 2.x della KCL. Dopo la migrazione, il client avvierà l'elaborazione dei record dall'ultimo punto di controllo verificato.

La versione 2.0 della KCL introduce le seguenti modifiche di interfaccia:

Modifiche di interfaccia KCL
Interfaccia KCL 1.x Interfaccia KCL 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Piegata in software.amazon.kinesis.processor.ShardRecordProcessor

Migrazione dell'elaboratore di record

L'esempio seguente mostra un elaboratore di record implementato per &KCL; 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe dell'elaboratore di record
  1. Modifica le interfacce da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware verso software.amazon.kinesis.processor.ShardRecordProcessor, come segue:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Aggiorna le istruzioni import per i metodi initialize e processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Sostituisci il metodo shutdown con i seguenti nuovi metodi: leaseLost, shardEnded e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Segue la versione aggiornata della classe dell'elaboratore di record.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrazione della fabbrica dell'elaboratore di record

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di una fabbrica &KCL; 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Per migrare la fabbrica dell'elaboratore di record
  1. Modifica l'interfaccia implementata da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory a software.amazon.kinesis.processor.ShardRecordProcessorFactory, come segue.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Modifica la firma di ritorno per createProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Di seguito è riportato un esempio di fabbrica di elaboratore di record in 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migrazione del lavoratore

Nella versione 2.0 della KCL, una nuova classe, denominata Scheduler, sostituisce la classe Worker. Di seguito è illustrato un esempio di un worker di KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Per migrare il lavoratore
  1. Modifica la dichiarazione import per la classe Worker nelle dichiarazioni di importazione delle classi Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Crea ConfigsBuilder e Scheduler come mostrato nell'esempio seguente.

    Si consiglia di utilizzare KinesisClientUtil per creare KinesisAsyncClient e configurare maxConcurrency in KinesisAsyncClient.

    Importante

    Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato KinesisAsyncClient per avere una maxConcurrency sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di KinesisAsyncClient.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Configurazione del client Amazon Kinesis

Con il rilascio 2.0 della Kinesis Client Library, la configurazione del client si è spostata da una singola classe di configurazione (KinesisClientLibConfiguration) a sei classi di configurazione. La tabella seguente descrive la migrazione.

Campi di configurazione e relative nuove classi
Campo originale Nuova classe di configurazione Descrizione
applicationName ConfigsBuilder Il nome per l'applicazione della KCL. Utilizzato come predefinito per tableName e consumerName.
tableName ConfigsBuilder Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB.
streamName ConfigsBuilder Il nome del flusso dal quale l'applicazione elabora i record.
kinesisEndpoint ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBEndpoint ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
initialPositionInStreamExtended RetrievalConfig La posizione nello shard da cui il KCL inizia a recuperare i record, a partire dall'esecuzione iniziare all'applicazione.
kinesisCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
cloudWatchCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
failoverTimeMillis LeaseManagementConfig Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito.
workerIdentifier ConfigsBuilder Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco.
shardSyncIntervalMillis LeaseManagementConfig Il periodo di tempo tra le chiamate di sincronizzazione dello shard.
maxRecords PollingConfig Consente di impostare il numero massimo di record restituiti da Kinesis.
idleTimeBetweenReadsInMillis CoordinatorConfig Questa opzione è stata eliminata. Vedi rimozione tempo di inattività.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da .
parentShardPollIntervalMillis CoordinatorConfig Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato.
cleanupLeasesUponShardCompletion LeaseManagementConfig Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione.
ignoreUnexpectedChildShards LeaseManagementConfig Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
cloudWatchClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
taskBackoffTimeMillis LifecycleConfig Il tempo di attesa per riprovare operazioni non riuscite.
metricsBufferTimeMillis MetricsConfig Controlli di pubblicazione di parametri CloudWatch.
metricsMaxQueueSize MetricsConfig Controlli di pubblicazione di parametri CloudWatch.
metricsLevel MetricsConfig Controlli di pubblicazione di parametri CloudWatch.
metricsEnabledDimensions MetricsConfig Controlli di pubblicazione di parametri CloudWatch.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Questa opzione è stata eliminata. Vedi la convalida del numero di sequenza del checkpoint.
regionName ConfigsBuilder Questa opzione è stata eliminata. Vedi la rimozione della configurazione client.
maxLeasesForWorker LeaseManagementConfig Il numero massimo di lease che una singola istanza dell'applicazione deve accettare.
maxLeasesToStealAtOneTime LeaseManagementConfig Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente.
initialLeaseTableReadCapacity LeaseManagementConfig IOPS di lettura DynamoDB utilizzato se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB.
initialLeaseTableWriteCapacity LeaseManagementConfig IOPS di lettura DynamoDB utilizzato se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB.
initialPositionInStreamExtended LeaseManagementConfig La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti. TODO: KinesisEco-438
shardPrioritization CoordinatorConfig Quale prioritizzazione shard utilizzare.
shutdownGraceMillis N/D Questa opzione è stata eliminata. Vedi rimozioni MultiLang.
timeoutInSeconds N/D Questa opzione è stata eliminata. Vedi rimozioni MultiLang.
retryGetRecordsInSeconds PollingConfig Configura il ritardo tra i tentativi GetRecords per gli errori.
maxGetRecordsThreadPool PollingConfig La dimensione del pool di thread utilizzato per GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool.
recordsFetcherFactory PollingConfig Consente di sostituire la factory utilizzata per creare fetcher che recuperano dai flussi.
logWarningForTaskAfterMillis LifecycleConfig Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata.
listShardsBackoffTimeInMillis RetrievalConfig Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori.
maxListShardsRetryAttempts RetrievalConfig Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere.

Rimozione tempo di inattività

Nella versione 1.x di &KCL;, idleTimeBetweenReadsInMillis corrispondeva a due quantità:

  • La quantità di tempo tra controlli dell'attività. È ora possibile configurare questo periodo tra attività impostando CoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • La quantità di tempo di sospensione quando non è stato restituito alcun record da . Nella versione 2.0, nel fan-out ottimizzato, i record vengono inviati da chi li ha recuperati. L'attività sul consumo di shard avviene solo quando arriva una richiesta di push.

Rimozioni configurazione client

Nella versione 2.0, la KCL non crea più client. Spetta all'utente fornire un client valido. Con questa modifica, tutti i parametri di configurazione che controllavano la creazione del client sono stati rimossi. Se hai bisogno di questi parametri, puoi impostarli sui client prima di fornire i client a ConfigsBuilder.

Campo rimosso Configurazione equivalente
kinesisEndpoint Configura SDK KinesisAsyncClient con l'endpoint preferito: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Configura SDK DynamoDbAsyncClient con l'endpoint preferito: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig Configura SDK KinesisAsyncClient con la configurazione necessaria: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Configura SDK DynamoDbAsyncClient con la configurazione necessaria: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Configura SDK CloudWatchAsyncClient con la configurazione necessaria: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Configura SDK con la regione preferita. Questo è uguale per tutti i client SDK. Ad esempio, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().