Implementieren Sie eine KCL-Consumer-Anwendung für Amazon Keyspaces CDC-Streams - Amazon Keyspaces (für Apache Cassandra)

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Implementieren Sie eine KCL-Consumer-Anwendung für Amazon Keyspaces CDC-Streams

Dieses Thema enthält eine step-by-step Anleitung zur Implementierung einer KCL-Consumer-Anwendung zur Verarbeitung von Amazon Keyspaces CDC-Streams.

  1. Voraussetzungen: Bevor Sie beginnen, stellen Sie sicher, dass Sie über Folgendes verfügen:

  2. In diesem Schritt fügen Sie Ihrem Projekt die KCL-Abhängigkeit hinzu. Für Maven fügen Sie Ihrer pom.xml Folgendes hinzu:

    <dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>
    Anmerkung

    Suchen Sie im KCL-Repository immer nach der neuesten Version von GitHub KCL.

  3. Erstellen Sie eine Factory-Klasse, die Datensatzprozessor-Instanzen erzeugt:

    import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } }
  4. Erstellen Sie eine Record Factory, wie im folgenden Beispiel gezeigt.

    import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } }
  5. In diesem Schritt erstellen Sie die zu konfigurierende Basisklasse KCLv3 und den Amazon Keyspaces-Adapter.

    import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } }
  6. In diesem Schritt implementieren Sie die Datensatzprozessorklasse für Ihre Anwendung, um mit der Verarbeitung von Änderungsereignissen zu beginnen.

    import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }

Best Practices

Folgen Sie diesen bewährten Methoden, wenn Sie KCL mit Amazon Keyspaces CDC-Streams verwenden:

Fehlerbehandlung

Implementieren Sie eine robuste Fehlerbehandlung in Ihrem Datensatzprozessor, um Ausnahmen ordnungsgemäß zu behandeln. Erwägen Sie die Implementierung einer Wiederholungslogik für vorübergehende Fehler.

Häufigkeit des Checkpoints

Sorgen Sie für eine ausgewogene Häufigkeit der Checkpoints, um doppelte Bearbeitungen zu minimieren und gleichzeitig eine angemessene Fortschrittsverfolgung zu gewährleisten. Zu häufiges Checkpointing kann sich negativ auf die Leistung auswirken, während ein zu seltenes Checkpointing zu mehr Wiederholungen führen kann, wenn ein Mitarbeiter ausfällt.

Skalierung der Mitarbeiter

Skalieren Sie die Anzahl der Worker basierend auf der Anzahl der Shards in Ihrem CDC-Stream. Ein guter Ausgangspunkt ist es, einen Worker pro Shard zu haben. Möglicherweise müssen Sie dies jedoch an Ihre Verarbeitungsanforderungen anpassen.

Überwachung

Verwenden Sie die von KCL bereitgestellten CloudWatch Kennzahlen, um den Zustand und die Leistung Ihrer Verbraucheranwendung zu überwachen. Zu den wichtigsten Kennzahlen gehören die Verarbeitungslatenz, das Alter der Checkpoints und die Anzahl der Leasingverträge.

Testen

Testen Sie Ihre Verbraucheranwendung gründlich, einschließlich Szenarien wie Worker-Ausfällen, Stream-Resharding und unterschiedlichen Lastbedingungen.

Verwenden von KCL mit Nicht-Java-Sprachen

KCL ist zwar in erster Linie eine Java-Bibliothek, Sie können sie jedoch mit anderen Programmiersprachen über die verwenden. MultiLangDaemon Der MultiLangDaemon ist ein Java-basierter Daemon, der die Interaktion zwischen Ihrem Nicht-Java-Recordprozessor und der KCL verwaltet.

KCL bietet Unterstützung für die folgenden Sprachen:

  • Python

  • Ruby

  • Node.js

  • .NET

Weitere Informationen zur Verwendung von KCL mit Nicht-Java-Sprachen finden Sie in der KCL-Dokumentation. MultiLangDaemon

Fehlerbehebung

Dieser Abschnitt bietet Lösungen für häufig auftretende Probleme, die bei der Verwendung von KCL mit Amazon Keyspaces CDC-Streams auftreten können.

Langsame Verarbeitung

Wenn Ihre Verbraucheranwendung Daten langsam verarbeitet, sollten Sie Folgendes berücksichtigen:

  • Erhöhung der Anzahl der Worker-Instanzen

  • Optimieren Sie Ihre Logik zur Verarbeitung von Datensätzen

  • Überprüfung auf Engpässe in nachgeschalteten Systemen

Doppelte Verarbeitung

Wenn Sie feststellen, dass Datensätze doppelt verarbeitet werden, überprüfen Sie Ihre Checkpoint-Logik. Stellen Sie sicher, dass Sie nach erfolgreicher Verarbeitung von Datensätzen ein Checkpoint ausführen.

Ausfälle von Arbeitern

Wenn Mitarbeiter häufig ausfallen, überprüfen Sie Folgendes:

  • Ressourceneinschränkungen (CPU, Arbeitsspeicher)

  • Probleme mit der Netzwerkkonnektivität

  • Probleme mit den Berechtigungen

Probleme mit der Leasetabelle

Wenn Sie Probleme mit der KCL-Leasetabelle haben:

  • Überprüfen Sie, ob Ihre Anwendung über die entsprechenden Berechtigungen für den Zugriff auf die Amazon Keyspaces-Tabelle verfügt

  • Stellen Sie sicher, dass die Tabelle über einen ausreichenden bereitgestellten Durchsatz verfügt