Amazon Keyspaces CDC ストリーム用の KCL コンシューマーアプリケーションの実装 - Amazon Keyspaces (Apache Cassandra 向け)

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Keyspaces CDC ストリーム用の KCL コンシューマーアプリケーションの実装

このトピックでは、Amazon Keyspaces CDC ストリームを処理する KCL コンシューマーアプリケーションを実装するためのstep-by-stepガイドを提供します。

  1. 前提条件: 開始する前に、以下を確認してください。

  2. このステップでは、KCL 依存関係をプロジェクトに追加します。Maven の場合は、pom.xml に以下を追加します。

    <dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>
    注記

    常に KCL の最新バージョンを KCL GitHub リポジトリで確認してください。

  3. レコードプロセッサインスタンスを生成するファクトリクラスを作成します。

    import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } }
  4. 次の例に示すように、レコードファクトリを作成します。

    import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } }
  5. このステップでは、KCLv3 と Amazon Keyspaces アダプターを設定するベースクラスを作成します。

    import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } }
  6. このステップでは、アプリケーションのレコードプロセッサクラスを実装して、変更イベントの処理を開始します。

    import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }

ベストプラクティス

Amazon Keyspaces CDC ストリームで KCL を使用する場合は、次のベストプラクティスに従ってください。

エラー処理

レコードプロセッサに堅牢なエラー処理を実装して、例外を適切に処理します。一時的な障害に対して再試行ロジックを実装することを検討してください。

チェックポイントの頻度

チェックポイントの頻度を調整して、適切な進行状況の追跡を確保しながら、重複処理を最小限に抑えます。チェックポイントが頻繁すぎるとパフォーマンスに影響する可能性がありますが、チェックポイントが頻繁すぎると、ワーカーが失敗した場合に再処理が増える可能性があります。

ワーカースケーリング

CDC ストリーム内のシャードの数に基づいてワーカーの数をスケールします。まず、シャードごとにワーカーを 1 人配置することをお勧めしますが、処理要件に基づいて調整する必要がある場合があります。

モニタリング

KCL が提供する CloudWatch メトリクスを使用して、コンシューマーアプリケーションのヘルスとパフォーマンスをモニタリングします。主なメトリクスには、処理レイテンシー、チェックポイント経過時間、リース数が含まれます。

テスト

ワーカーの障害、ストリームのリシャーディング、さまざまな負荷条件などのシナリオを含め、コンシューマーアプリケーションを徹底的にテストします。

Java 以外の言語で KCL を使用する

KCL は主に Java ライブラリですが、MultiLangDaemon を通じて他のプログラミング言語で使用できます。MultiLangDaemon は、非 Java レコードプロセッサと KCL 間のインタラクションを管理する Java ベースのデーモンです。

KCL では、次の言語がサポートされています。

  • Python

  • Ruby

  • Node.js

  • .NET

Java 以外の言語で KCL を使用する方法の詳細については、KCL MultiLangDaemon ドキュメントを参照してください。

トラブルシューティング

このセクションでは、Amazon Keyspaces CDC ストリームで KCL を使用する際に発生する可能性がある一般的な問題の解決策を示します。

処理が遅い

コンシューマーアプリケーションでレコードの処理が遅い場合は、次の点を考慮してください。

  • ワーカーインスタンスの数を増やす

  • レコード処理ロジックの最適化

  • ダウンストリームシステムのボトルネックの確認

重複処理

レコードの処理が重複している場合は、チェックポイントロジックを確認してください。レコードを正常に処理した後、チェックポイントに入っていることを確認します。

ワーカーの障害

ワーカーが頻繁に失敗する場合は、以下を確認してください。

  • リソース制約 (CPU、メモリ)

  • ネットワーク接続の問題

  • アクセス許可の問題

リーステーブルの問題

KCL リーステーブルで問題が発生した場合:

  • アプリケーションに Amazon Keyspaces テーブルにアクセスするための適切なアクセス許可があることを確認します。

  • テーブルに十分なプロビジョニングされたスループットがあることを確認する