Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Implementieren Sie eine KCL-Consumer-Anwendung für Amazon Keyspaces CDC-Streams
Dieses Thema enthält eine step-by-step Anleitung zur Implementierung einer KCL-Consumer-Anwendung zur Verarbeitung von Amazon Keyspaces CDC-Streams.
-
Voraussetzungen: Bevor Sie beginnen, stellen Sie sicher, dass Sie über Folgendes verfügen:
-
Eine Amazon Keyspaces-Tabelle mit einem CDC-Stream
-
Erforderliche IAM-Berechtigungen für den IAM-Principal, um auf den Amazon Keyspaces CDC-Stream zuzugreifen, DynamoDB-Tabellen für die KCL-Stream-Verarbeitung zu erstellen und darauf zuzugreifen, sowie Berechtigungen zum Veröffentlichen von Metriken in. CloudWatch Weitere Informationen und ein Richtlinienbeispiel finden Sie unter. Berechtigungen zur Verarbeitung von Amazon Keyspaces CDC-Streams mit der Kinesis Client Library (KCL)
Stellen Sie sicher, dass in Ihrer lokalen Konfiguration gültige AWS Anmeldeinformationen eingerichtet sind. Weitere Informationen finden Sie unter Speichern Sie die Zugriffstasten für den programmatischen Zugriff.
-
Java Development Kit (JDK) 8 oder höher
-
Die Anforderungen sind in der Readme-Datei
auf Github aufgeführt.
-
-
In diesem Schritt fügen Sie Ihrem Projekt die KCL-Abhängigkeit hinzu. Für Maven fügen Sie Ihrer pom.xml Folgendes hinzu:
<dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>Anmerkung
Suchen Sie im KCL-Repository immer nach der neuesten Version von GitHub KCL
. -
Erstellen Sie eine Factory-Klasse, die Datensatzprozessor-Instanzen erzeugt:
import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } } -
Erstellen Sie eine Record Factory, wie im folgenden Beispiel gezeigt.
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } } -
In diesem Schritt erstellen Sie die zu konfigurierende Basisklasse KCLv3 und den Amazon Keyspaces-Adapter.
import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } } -
In diesem Schritt implementieren Sie die Datensatzprozessorklasse für Ihre Anwendung, um mit der Verarbeitung von Änderungsereignissen zu beginnen.
import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }
Best Practices
Folgen Sie diesen bewährten Methoden, wenn Sie KCL mit Amazon Keyspaces CDC-Streams verwenden:
- Fehlerbehandlung
-
Implementieren Sie eine robuste Fehlerbehandlung in Ihrem Datensatzprozessor, um Ausnahmen ordnungsgemäß zu behandeln. Erwägen Sie die Implementierung einer Wiederholungslogik für vorübergehende Fehler.
- Häufigkeit des Checkpoints
-
Sorgen Sie für eine ausgewogene Häufigkeit der Checkpoints, um doppelte Bearbeitungen zu minimieren und gleichzeitig eine angemessene Fortschrittsverfolgung zu gewährleisten. Zu häufiges Checkpointing kann sich negativ auf die Leistung auswirken, während ein zu seltenes Checkpointing zu mehr Wiederholungen führen kann, wenn ein Mitarbeiter ausfällt.
- Skalierung der Mitarbeiter
-
Skalieren Sie die Anzahl der Worker basierend auf der Anzahl der Shards in Ihrem CDC-Stream. Ein guter Ausgangspunkt ist es, einen Worker pro Shard zu haben. Möglicherweise müssen Sie dies jedoch an Ihre Verarbeitungsanforderungen anpassen.
- Überwachung
-
Verwenden Sie die von KCL bereitgestellten CloudWatch Kennzahlen, um den Zustand und die Leistung Ihrer Verbraucheranwendung zu überwachen. Zu den wichtigsten Kennzahlen gehören die Verarbeitungslatenz, das Alter der Checkpoints und die Anzahl der Leasingverträge.
- Testen
-
Testen Sie Ihre Verbraucheranwendung gründlich, einschließlich Szenarien wie Worker-Ausfällen, Stream-Resharding und unterschiedlichen Lastbedingungen.
Verwenden von KCL mit Nicht-Java-Sprachen
KCL ist zwar in erster Linie eine Java-Bibliothek, Sie können sie jedoch mit anderen Programmiersprachen über die verwenden. MultiLangDaemon Der MultiLangDaemon ist ein Java-basierter Daemon, der die Interaktion zwischen Ihrem Nicht-Java-Recordprozessor und der KCL verwaltet.
KCL bietet Unterstützung für die folgenden Sprachen:
-
Python
-
Ruby
-
Node.js
-
.NET
Fehlerbehebung
Dieser Abschnitt bietet Lösungen für häufig auftretende Probleme, die bei der Verwendung von KCL mit Amazon Keyspaces CDC-Streams auftreten können.
- Langsame Verarbeitung
-
Wenn Ihre Verbraucheranwendung Daten langsam verarbeitet, sollten Sie Folgendes berücksichtigen:
-
Erhöhung der Anzahl der Worker-Instanzen
-
Optimieren Sie Ihre Logik zur Verarbeitung von Datensätzen
-
Überprüfung auf Engpässe in nachgeschalteten Systemen
-
- Doppelte Verarbeitung
-
Wenn Sie feststellen, dass Datensätze doppelt verarbeitet werden, überprüfen Sie Ihre Checkpoint-Logik. Stellen Sie sicher, dass Sie nach erfolgreicher Verarbeitung von Datensätzen ein Checkpoint ausführen.
- Ausfälle von Arbeitern
-
Wenn Mitarbeiter häufig ausfallen, überprüfen Sie Folgendes:
-
Ressourceneinschränkungen (CPU, Arbeitsspeicher)
-
Probleme mit der Netzwerkkonnektivität
-
Probleme mit den Berechtigungen
-
- Probleme mit der Leasetabelle
-
Wenn Sie Probleme mit der KCL-Leasetabelle haben:
-
Überprüfen Sie, ob Ihre Anwendung über die entsprechenden Berechtigungen für den Zugriff auf die Amazon Keyspaces-Tabelle verfügt
-
Stellen Sie sicher, dass die Tabelle über einen ausreichenden bereitgestellten Durchsatz verfügt
-