本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在中开发 Kinesis 客户端库使用者。 NET
您可以使用 Kinesis 客户端库 (KCL) 来构建用于处理来自 Kinesis 数据流的数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题讨论。 NET。
KCL是一个 Java 库;对 Java 以外其他语言的支持是使用名为的多语言接口提供的MultiLangDaemon。此守护程序基于 Java,当您使用 Java 以外的KCL语言时,它会在后台运行。因此,如果你安装了 fo KCL r. NET然后完全用它来编写你的消费者应用程序。 NET,您仍然需要在系统上安装 Java,因为 MultiLangDaemon。此外 MultiLangDaemon ,您可能需要根据自己的用例自定义一些默认设置,例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub,请转到KCL MultiLangDaemon 项目
要下载. NETKCL从 GitHub,前往 Kinesis 客户端库 (. NET
在中实现使用KCL者应用程序时,必须完成以下任务。 NET:
实现IRecordProcessor类方法
消费端必须实现适用于 IRecordProcessor
的以下方法。示例消费端提供了可用作起点的实现(请参阅 SampleRecordProcessor
中的 SampleConsumer/AmazonKinesisSampleConsumer.cs
类)。
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
Initialize
在实例化记录处理器时KCL调用此方法,并在input
参数 () 中传递特定的分片 ID。input.ShardId
此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅使用重新分片、缩放和并行处理来更改分片的数量。
public void Initialize(InitializationInput input)
ProcessRecords
KCL调用此方法,在input
参数 (input.Records
) 中从该Initialize
方法指定的分片中传递数据记录列表。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。
public void ProcessRecords(ProcessRecordsInput input)
除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。Record
类公开了以下代理来访问记录的数据、序号和分区键:
byte[] Record.Data
string Record.SequenceNumber
string Record.PartitionKey
在该示例中,方法 ProcessRecordsWithRetries
具有显示工作程序如何访问记录的数据、序号和分区键的代码。
Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。通过KCL将Checkpointer
对象传递给 ProcessRecords
(input.Checkpointer
) 来为您处理此跟踪。记录处理器调用该Checkpointer.Checkpoint
方法以告知其在处理分片中的记录方面取得了多大的进展。KCL如果工作程序失败,则KCL使用此信息在上次已知的已处理记录处重新开始处理分片。
对于拆分或合并操作,KCL只有原始分片的处理器发出原始分片的所有处理已完成的信号,才会开始处理新的分片。Checkpointer.Checkpoint
如果您不传递参数,则KCL假定对的调用Checkpointer.Checkpoint
表示所有记录都已处理,直到传递给记录处理器的最后一条记录为止。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 Checkpointer.Checkpoint
。记录处理器不需要在每次调用 Checkpointer.Checkpoint
时调用 ProcessRecords
。例如,处理器在每第三次或第四次调用时调用 Checkpointer.Checkpoint
。您可以选择性地将某个记录的确切序号指定为 Checkpointer.Checkpoint
的参数。在这种情况下,KCL假设只处理了该记录之前的记录。
在该示例中,私有方法 Checkpoint(Checkpointer checkpointer)
展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.Checkpoint
方法。
那个 KCL for。 NET处理异常与其他KCL语言库的不同之处在于,它不处理处理数据记录时出现的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。
关闭
在处理结束(关闭原因是TERMINATE
)或工作器不再响应(关闭input.Reason
值为ZOMBIE
)时,都会KCL调用该Shutdown
方法。
public void Shutdown(ShutdownInput input)
处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。
KCL还会将Checkpointer
对象传递给shutdown
。如果关闭原因为 TERMINATE
,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint
方法。
修改配置属性
示例消费端提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 SampleConsumer/kcl.properties
)。
应用程序名称
KCL需要一个在您的应用程序中以及同一区域的 Amazon DynamoDB 表中具有唯一性的应用程序。KCL 通过以下方法使用应用程序名称配置值:
-
假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果您运行相同应用程序代码的另一个实例,但使用不同的应用程序名称,则会将第二个实例KCL视为也在同一流上运行的完全独立的应用程序。
-
使用应用程序名称KCL创建一个 DynamoDB 表,并使用该表维护应用程序的状态信息(例如检查点和工作分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪使用者应用程序处理的KCL分片。
设置凭证
您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 AWSCredentialsProvider
属性设置凭证提供程序。sample.properties
该示例的属性文件配置KCL为使用中提供的记录处理器处理名为 “文字” 的 Kinesis 数据流。AmazonKinesisSampleConsumer.cs