在中开发 Kinesis 客户端库使用者。 NET - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在中开发 Kinesis 客户端库使用者。 NET

您可以使用 Kinesis 客户端库 (KCL) 来构建用于处理来自 Kinesis 数据流的数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题讨论。 NET。

KCL是一个 Java 库;对 Java 以外其他语言的支持是使用名为的多语言接口提供的MultiLangDaemon。此守护程序基于 Java,当您使用 Java 以外的KCL语言时,它会在后台运行。因此,如果你安装了 fo KCL r. NET然后完全用它来编写你的消费者应用程序。 NET,您仍然需要在系统上安装 Java,因为 MultiLangDaemon。此外 MultiLangDaemon ,您可能需要根据自己的用例自定义一些默认设置,例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub,请转到KCL MultiLangDaemon 项目页面。

要下载. NETKCL从 GitHub,前往 Kinesis 客户端库 (. NET)。下载的示例代码. NETKCL消费者应用程序,请转到 f KCLor。 NET上的示例消费者项目页面 GitHub。

在中实现使用KCL者应用程序时,必须完成以下任务。 NET:

实现IRecordProcessor类方法

消费端必须实现适用于 IRecordProcessor 的以下方法。示例消费端提供了可用作起点的实现(请参阅 SampleRecordProcessor 中的 SampleConsumer/AmazonKinesisSampleConsumer.cs 类)。

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Initialize

在实例化记录处理器时KCL调用此方法,并在input参数 () 中传递特定的分片 ID。input.ShardId此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅使用重新分片、缩放和并行处理来更改分片的数量

public void Initialize(InitializationInput input)
ProcessRecords

KCL调用此方法,在input参数 (input.Records) 中从该Initialize方法指定的分片中传递数据记录列表。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。

public void ProcessRecords(ProcessRecordsInput input)

除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。Record 类公开了以下代理来访问记录的数据、序号和分区键:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

在该示例中,方法 ProcessRecordsWithRetries 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。通过KCL将Checkpointer对象传递给 ProcessRecords (input.Checkpointer) 来为您处理此跟踪。记录处理器调用该Checkpointer.Checkpoint方法以告知其在处理分片中的记录方面取得了多大的进展。KCL如果工作程序失败,则KCL使用此信息在上次已知的已处理记录处重新开始处理分片。

对于拆分或合并操作,KCL只有原始分片的处理器发出原始分片的所有处理已完成的信号,才会开始处理新的分片。Checkpointer.Checkpoint

如果您不传递参数,则KCL假定对的调用Checkpointer.Checkpoint表示所有记录都已处理,直到传递给记录处理器的最后一条记录为止。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 Checkpointer.Checkpoint。记录处理器不需要在每次调用 Checkpointer.Checkpoint 时调用 ProcessRecords。例如,处理器在每第三次或第四次调用时调用 Checkpointer.Checkpoint。您可以选择性地将某个记录的确切序号指定为 Checkpointer.Checkpoint 的参数。在这种情况下,KCL假设只处理了该记录之前的记录。

在该示例中,私有方法 Checkpoint(Checkpointer checkpointer) 展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.Checkpoint 方法。

那个 KCL for。 NET处理异常与其他KCL语言库的不同之处在于,它不处理处理数据记录时出现的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。

关闭

在处理结束(关闭原因是TERMINATE)或工作器不再响应(关闭input.Reason值为ZOMBIE)时,都会KCL调用该Shutdown方法。

public void Shutdown(ShutdownInput input)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL还会将Checkpointer对象传递给shutdown。如果关闭原因为 TERMINATE,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint 方法。

修改配置属性

示例消费端提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 SampleConsumer/kcl.properties)。

应用程序名称

KCL需要一个在您的应用程序中以及同一区域的 Amazon DynamoDB 表中具有唯一性的应用程序。KCL 通过以下方法使用应用程序名称配置值:

  • 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果您运行相同应用程序代码的另一个实例,但使用不同的应用程序名称,则会将第二个实例KCL视为也在同一流上运行的完全独立的应用程序。

  • 使用应用程序名称KCL创建一个 DynamoDB 表,并使用该表维护应用程序的状态信息(例如检查点和工作分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪使用者应用程序处理的KCL分片

设置凭证

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您在EC2实例上运行使用者应用程序,我们建议您为该实例配置一个IAM角色。 AWS 反映与此IAM角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。对于在EC2实例上运行的使用者,这是管理凭证的最安全的方式。

该示例的属性文件配置KCL为使用中提供的记录处理器处理名为 “文字” 的 Kinesis 数据流。AmazonKinesisSampleConsumer.cs