

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 为 Amazon Keyspaces CDC 直播实施 KCL 消费者应用程序
<a name="cdc-kcl-implementation"></a>

本主题提供了实施 KCL 消费者应用程序以处理 Amazon Keyspaces CDC 流的分步指南。

1. 先决条件：在开始之前，请确保您具备：
   + 带有 CDC 直播的 Amazon Keyspaces 表
   + IAM 委托人必须拥有 IAM 权限，才能访问 Amazon Keyspaces CDC 流、创建和访问用于 KCL 流处理的 DynamoDB 表以及向其发布指标的权限。 CloudWatch有关更多信息和策略示例，请参阅[使用 Kinesis 客户端库 (KCL) 处理 Amazon Keyspaces CDC 直播的权限](configure-cdc-permissions.md#cdc-permissions-kcl)。
   + 确保在本地配置中设置了有效的 AWS 凭证。有关更多信息，请参阅 [存储用于通过编程方式进行访问的访问密钥](aws.credentials.manage.md)。
   + Java Development Kit（JDK）8 或更高版本
   + 要求列在 Github 上的[自述文件](https://github.com/aws/keyspaces-streams-kinesis-adapter)中。

1. <a name="cdc-kcl-add-dependencies"></a>在此步骤中，您将 KCL 依赖项添加到您的项目中。对于 Maven，请将以下内容添加到你的 pom.xml 中：

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**注意**  
请务必在 KCL 存储库中查看最新版本的 [KCL GitHub ](https://github.com/awslabs/amazon-kinesis-client)。

1. <a name="cdc-kcl-factory"></a>创建一个生成记录处理器实例的工厂类：

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>创建记录工厂，如以下示例所示。

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>在此步骤中，您将创建用于配置 KCLv3 和 Amazon Keyspaces 适配器的基类。

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>在此步骤中，您将为应用程序实现记录处理器类以开始处理更改事件。

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## 最佳实践
<a name="cdc-kcl-best-practices"></a>

在 Amazon Keyspaces CDC 直播中使用 KCL 时，请遵循以下最佳实践：

**错误处理**  
在记录处理器中实现强大的错误处理，以优雅地处理异常。考虑为暂时失败实现重试逻辑。

**检查点频率**  
平衡检查点频率，最大限度地减少重复处理，同时确保合理的进度跟踪。过于频繁的检查点操作会影响性能，而如果工作人员出现故障，检查点操作频率过低可能会导致更多的重新处理。

**工作人员扩展**  
根据您的 CDC 数据流中的分片数量扩展工作人员的数量。一个好的起点是每个分片配置一个工作程序，但您可能需要根据自己的处理要求进行调整。

**监控**  
使用 KCL 提供的 CloudWatch 指标来监控您的消费者应用程序的运行状况和性能。关键指标包括处理延迟、检查点时间和租赁计数。

**测试**  
彻底测试您的消费者应用程序，包括工作程序故障、流重新分片和不同的负载条件等场景。

## 在非 Java 语言中使用 KCL
<a name="cdc-kcl-non-java"></a>

虽然 KCL 主要是一个 Java 库，但您可以通过将其与其他编程语言一起使用。 MultiLangDaemon MultiLangDaemon 是一个 Java-based 守护程序，用于管理您的非 Java 记录处理器与 KCL 之间的交互。

KCL 为以下语言提供支持：
+ Python
+ Ruby
+ Node.js
+ .NET

有关在非 Java 语言中使用 KCL 的更多信息，请参阅 [K MultiLangDaemon ](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang) CL 文档。

## 问题排查
<a name="cdc-kcl-troubleshooting"></a>

本节提供了在将 KCL 与 Amazon Keyspaces CDC 流配合使用时可能遇到的常见问题的解决方案。

**处理速度慢**  
如果您的消费者应用程序处理记录的速度很慢，请考虑：  
+ 增加工作器实例的数量
+ 优化您的记录处理逻辑
+ 检查下游系统中是否存在瓶颈

**重复处理**  
如果您看到重复处理记录，请检查检查点逻辑。确保在成功处理记录后进行检查点检查。

**工作人员失败**  
如果员工经常失败，请检查：  
+ 资源限制（CPU、内存）
+ 网络连接问题
+ 权限问题

**租赁表问题**  
如果您在使用 KCL 租赁表时遇到问题：  
+ 检查您的应用程序是否具有访问 Amazon Keyspaces 表的相应权限
+ 验证表是否有足够的预配置吞吐量