Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Implémenter une application client KCL pour les flux CDC d'Amazon Keyspaces
Cette rubrique fournit un step-by-step guide pour implémenter une application client KCL pour traiter les flux CDC d'Amazon Keyspaces.
-
Conditions préalables : Avant de commencer, assurez-vous d'avoir :
-
Une table Amazon Keyspaces avec un flux CDC
-
Autorisations IAM requises pour que le principal IAM puisse accéder au flux CDC d'Amazon Keyspaces, créer et accéder aux tables DynamoDB pour le traitement des flux KCL, et autorisations pour publier des métriques sur. CloudWatch Pour plus d'informations et un exemple de stratégie, consultezAutorisations pour traiter les flux CDC d'Amazon Keyspaces avec la Kinesis Client Library (KCL).
Assurez-vous que des AWS informations d'identification valides sont configurées dans votre configuration locale. Pour de plus amples informations, veuillez consulter Stocker les clés d'accès pour un accès programmatique.
-
Kit de développement Java (JDK) 8 ou version ultérieure
-
Les exigences sont répertoriées dans le fichier Readme
sur Github.
-
-
Au cours de cette étape, vous ajoutez la dépendance KCL à votre projet. Pour Maven, ajoutez ce qui suit à votre fichier pom.xml :
<dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>Note
Vérifiez toujours la dernière version de KCL dans le référentiel KCL GitHub
. -
Créez une classe d'usine qui produit des instances de processeur d'enregistrement :
import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } } -
Créez une fabrique de disques comme indiqué dans l'exemple suivant.
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } } -
Au cours de cette étape, vous créez la classe de base à configurer KCLv3 et l'adaptateur Amazon Keyspaces.
import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } } -
Au cours de cette étape, vous implémentez la classe de processeur d'enregistrements pour que votre application commence à traiter les événements de modification.
import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }
Bonnes pratiques
Suivez ces bonnes pratiques lorsque vous utilisez KCL avec les flux CDC d'Amazon Keyspaces :
- Gestion des erreurs
-
Implémentez une gestion robuste des erreurs dans votre processeur d'enregistrements afin de gérer les exceptions avec élégance. Envisagez d'implémenter une logique de nouvelle tentative pour les échecs transitoires.
- Fréquence de pointage
-
Équilibrez la fréquence des points de contrôle afin de minimiser le double traitement tout en garantissant un suivi raisonnable des progrès. Des points de contrôle trop fréquents peuvent avoir un impact sur les performances, tandis que des points de contrôle trop peu fréquents peuvent entraîner un retraitement supplémentaire en cas de défaillance d'un travailleur.
- Dimensionnement des effectifs
-
Dimensionnez le nombre de travailleurs en fonction du nombre de fragments dans votre flux CDC. Un bon point de départ est d'avoir un travailleur par partition, mais vous devrez peut-être procéder à des ajustements en fonction de vos exigences de traitement.
- Surveillance
-
Utilisez CloudWatch les indicateurs fournis par KCL pour surveiller l'état et les performances de votre application grand public. Les indicateurs clés incluent la latence de traitement, l'âge des points de contrôle et le nombre de baux.
- Test
-
Testez votre application grand public de manière approfondie, notamment dans le cas de scénarios tels que les défaillances d'un opérateur, le repartage de flux et les conditions de charge variables.
Utilisation de KCL avec des langages autres que Java
Bien que KCL soit principalement une bibliothèque Java, vous pouvez l'utiliser avec d'autres langages de programmation via le MultiLangDaemon. MultiLangDaemon Il s'agit d'un démon basé sur Java qui gère l'interaction entre votre processeur d'enregistrement non Java et le KCL.
KCL fournit un support pour les langues suivantes :
-
Python
-
Ruby
-
Node.js
-
.NET
Pour plus d'informations sur l'utilisation de KCL avec des langages autres que Java, consultez la documentation MultiLangDaemon KCL
Résolution des problèmes
Cette section fournit des solutions aux problèmes courants que vous pouvez rencontrer lors de l'utilisation de KCL avec les flux CDC d'Amazon Keyspaces.
- Traitement lent
-
Si votre application client traite les dossiers lentement, pensez à :
-
Augmenter le nombre d'instances de travailleurs
-
Optimisation de votre logique de traitement des dossiers
-
Vérification de l'absence de goulots d'étranglement dans les systèmes en aval
-
- Traitement des doublons
-
Si vous constatez un traitement dupliqué des enregistrements, vérifiez votre logique de pointage des points de contrôle. Assurez-vous de vérifier après avoir traité les enregistrements avec succès.
- Défaillances des travailleurs
-
Si les travailleurs échouent fréquemment, vérifiez :
-
Contraintes de ressources (processeur, mémoire)
-
Problèmes de connectivité réseau
-
Problèmes d’autorisations
-
- Problèmes liés aux tables de location
-
Si vous rencontrez des problèmes avec le tableau des baux KCL :
-
Vérifiez que votre application dispose des autorisations appropriées pour accéder au tableau Amazon Keyspaces
-
Vérifiez que le débit provisionné de la table est suffisant
-