使用 Java 通过 KCL 开发消费端 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Java 通过 KCL 开发消费端

先决条件

开始使用 KCL 3.x 之前,请确保已具备以下条件:

  • Java Development Kit (JDK) 8 或更高版本

  • 适用于 Java 的 AWS SDK 2.x

  • 用于依赖项管理的 Maven 或 Gradle

KCL 从运行工作程序的计算主机上收集 CPU 利用率指标(例如 CPU 利用率)来平衡负载,从而在各工作程序之间实现均衡的资源利用率水平。要让 KCL 能够从工作程序收集 CPU 使用率指标,必须满足以下先决条件:

Amazon Elastic Compute Cloud(亚马逊 EC2)

  • 操作系统必须是 Linux 操作系统。

  • 您必须在您的 EC2 实例IMDSv2中启用。

亚马逊上的亚马逊弹性容器服务 (Amazon ECS) Container Service EC2

Amazon ECS 已开启 AWS Fargate

  • 必须启用 Fargate 任务元数据端点版本 4。如果使用的是 Fargate 平台版本 1.4.0 或更高版本,则默认启用此功能。

  • Fargate(平台版本 1.4.0 或更高版本)。

亚马逊上的亚马逊 Elastic Kubernetes Service(亚马逊 EKS) EC2

  • 操作系统必须是 Linux 操作系统。

亚马逊 EKS 开启 AWS Fargate

  • Fargate(平台版本 1.3.0 或更高版本)。

重要

如果 KCL 无法从工作程序收集 CPU 利用率指标,KCL 将回退到使用每个工作程序的吞吐量来分配租约,并在队列中的工作程序之间平衡负载。有关更多信息,请参阅 KCL 如何向工作程序分配租约并平衡负载

安装并添加依赖项

如果您使用的是 Maven,请将以下依赖项添加到您的 pom.xml 文件中。确保将 3.x.x 替换为最新的 KCL 版本。

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

如果使用 Gradle,请在 build.gradle 文件中添加以下信息。确保将 3.x.x 替换为最新的 KCL 版本。

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

可以在 Maven Central 存储库中查看最新版本的 KCL。

实现消费端

KCL 消费端应用程序包含以下关键组件:

RecordProcessor

RecordProcessor 是处理 Kinesis 数据流记录的业务逻辑所在的核心组件。它定义了应用程序如何处理从 Kinesis 流接收的数据。

主要职责:

  • 初始化分片的处理

  • 处理来自 Kinesis 流的批量记录

  • 关闭分片的处理(例如,在分片拆分或合并,或者将租约移交给另一台主机时)

  • 处理检查点操作以跟踪进度

以下示例演示如何实施。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

下面详细说明了本例中使用的各种方法:

初始化(初InitializationInput始化输入)

  • 目的:为处理记录设置任何必要的资源或状态。

  • 调用时间:当 KCL 为该记录处理器分配分片时,调用一次。

  • 关键点:

    • initializationInput.shardId():此处理器将要处理的分片的 ID。

    • initializationInput.extendedSequenceNumber():开始处理的序列号。

流程记录 () ProcessRecordsInput processRecordsInput

  • 目的:处理传入的记录,可选择处理检查点进度。

  • 调用时间:只要记录处理器持有分片的租约时,就反复调用。

  • 关键点:

    • processRecordsInput.records():要处理的记录列表。

    • processRecordsInput.checkpointer():用于进度的检查点操作。

    • 确保在处理过程中处理了所有异常,以防止 KCL 出现故障。

    • 该方法应该具有幂等性,因为在某些情况下,同一条记录可能会处理多次,例如在工作程序意外崩溃或重启之前尚未进行检查点操作的数据。

    • 在进行检查点操作之前,务必刷新任何缓存数据,以确保数据一致性。

LeaseLost () LeaseLostInput leaseLostInput

  • 目的:清理用于处理此分片的所有特定资源。

  • 调用时间:当其他调度器接管此分片的租约时。

  • 关键点:

    • 该方法不允许进行检查点操作。

ShardEnded () ShardEndedInput shardEndedInput

  • 目的:完成此分片的处理并进行检查点操作。

  • 调用时间:当分片拆分或合并时,表示该分片的所有数据都已处理完毕。

  • 关键点:

    • shardEndedInput.checkpointer():用于执行最终的检查点操作。

    • 该方法必须进行检查点操作才能完成处理。

    • 若此处未刷新数据和进行检查点操作,可能会导致分片重新打开时出现数据丢失或重复处理。

已请求关机 () ShutdownRequestedInput shutdownRequestedInput

  • 目的:在 KCL 关闭时进行检查点操作并清理资源。

  • 调用时间:当 KCL 关闭时,例如,在应用程序终止时。

  • 关键点:

    • shutdownRequestedInput.checkpointer():用于在关闭前执行检查点操作。

    • 确保在该方法中进行了检查点操作,以便在应用程序停止之前保存进度。

    • 若此处未刷新数据和进行检查点操作,可能会导致在应用程序重新启动时出现数据丢失或重新处理记录。

重要

KCL 3.x 通过在前一个工作程序关闭前进行检查点操作,确保在租约从一个工作程序移交给另一个工作程序时减少数据重复处理。如果未在 shutdownRequested() 方法中实现检查点操作逻辑,就无法体验到这一好处。请确保已在 shutdownRequested() 方法中实现了检查点操作逻辑。

RecordProcessorFactory

RecordProcessorFactory 负责创建新 RecordProcessor实例。KCL 使用此工厂 RecordProcessor 为应用程序需要处理的每个分片创建一个新分片。

主要职责:

  • 按需创建新 RecordProcessor 实例

  • 确保每个都已 RecordProcessor 正确初始化

以下是一个实施示例:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

在此示例中, SampleRecordProcessor 每次调用 shardRecordProcessor () 时,工厂都会创建一个新的。您可以进行扩展以添加任何必要的初始化逻辑。

调度器

调度器是一个协调 KCL 应用程序所有活动的高级组件。调度器负责数据处理的总体编排。

主要职责:

  • 管理生命周期 RecordProcessors

  • 处理分片的租约管理

  • 协调检查点操作

  • 在应用程序的多个工作程序之间平衡分片处理负载

  • 处理正常关闭和应用程序终止信号

调度器通常在主应用程序中创建和启动。您可以在下一节“主消费端应用程序”中查看调度器的实现示例。

主消费端应用程序

主消费端应用程序将所有组件联系在一起。它负责设置 KCL 消费端、创建必要的客户端、配置调度器和管理应用程序的生命周期。

主要职责:

  • 设置 AWS 服务客户端(Kinesis、DynamoDB 等) CloudWatch

  • 配置 KCL 应用程序

  • 创建并启动调度器

  • 处理应用程序关闭

以下是一个实施示例:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

默认情况下,KCL 会创建一个具有专用吞吐量的增强型扇出型 (EFO) 消费端。有关增强扇出功能的更多信息,请参阅开发具有专用吞吐量的增强扇出型消费端。如果消费端少于 2 个,或者不需要低于 200 毫秒的读取传播延迟,则必须在调度器对象中设置以下配置来使用共享吞吐量消费端:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

以下代码是一个创建使用共享吞吐量消费端的调度器对象的示例:

进口

import software.amazon.kinesis.retrieval.polling.PollingConfig;

代码

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/