在 Python 中开发 Kinesis 客户端库使用者 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Python 中开发 Kinesis 客户端库使用者

您可以使用 Kinesis 客户端库 (KCL) 来构建用于处理来自 Kinesis 数据流的数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Python。

KCL是一个 Java 库;对 Java 以外其他语言的支持是使用名为的多语言接口提供的MultiLangDaemon。此守护程序基于 Java,当您使用 Java 以外的KCL语言时,它会在后台运行。因此,如果您安装KCL适用于 Python 的,并完全使用 Python 编写使用者应用程序,则仍然需要在系统上安装 Java,因为 MultiLangDaemon。此外 MultiLangDaemon ,您可能需要根据自己的用例自定义一些默认设置,例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub,请转到KCL MultiLangDaemon 项目页面。

要KCL从中下载 Python GitHub,请前往 Kinesis 客户端库 (Python)。要下载 Python 使用KCL者应用程序的示例代码,请转到上KCL的 Python 示例项目页面 GitHub。

在 Python 中实现使用KCL者应用程序时,必须完成以下任务:

实现 RecordProcessor 类方法

RecordProcess 类必须扩展 RecordProcessorBase 以实现以下方法。该示例提供了可用作起点的实现(请参阅 sample_kclpy_app.py)。

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
初始化

在实例化记录处理器时KCL调用该initialize方法,将特定的分片 ID 作为参数传递。此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息,请参阅使用重新分片、缩放和并行处理来更改分片的数量

def initialize(self, shard_id)
process_records

KCL调用此方法,从该initialize方法指定的分片中传递数据记录列表。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。

def process_records(self, records, checkpointer)

除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。record 词典公开了以下键-值对来访问记录的数据、序号和分区键:

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

请注意,数据是 Base64 编码的。

在该示例中,方法 process_records 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。通过KCL将Checkpointer对象传递给来为您处理此跟踪process_records。记录处理器在此对象上调用该checkpoint方法,以告知其在处理分片中的记录方面取得了多大的进展。KCL如果工作程序失败,则KCL使用此信息在上次已知的已处理记录处重新开始处理分片。

对于拆分或合并操作,KCL只有原始分片的处理器发出原始分片的所有处理已完成的信号,才会开始处理新的分片。checkpoint

如果您不传递参数,则KCL假定调用checkpoint表示所有记录都已处理,直到传递给记录处理器的最后一条记录为止。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 checkpoint。记录处理器不需要在每次调用 checkpoint 时调用 process_records。例如,处理器可在每第三次调用时调用 checkpoint。您可以选择性地将某个记录的确切序号指定为 checkpoint 的参数。在这种情况下,KCL假设所有记录都已处理完毕,直到该记录为止。

在该示例中,私有方法 checkpoint 展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.checkpoint 方法。

KCL依赖process_records于处理数据记录时出现的任何异常。如果引发了异常process_records,则会KCL跳过在异常process_records之前传递到的数据记录。也就是说,这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。

shutdown

在处理结束(关闭原因是TERMINATE)或工作器不再响应(关闭是)时,都会KCL调用该shutdown方法。reason ZOMBIE

def shutdown(self, checkpointer, reason)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL还会将Checkpointer对象传递给shutdown。如果关闭 reasonTERMINATE,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint 方法。

修改配置属性

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 sample.properties)。

应用程序名称

KCL需要的应用程序名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中必须是唯一的。KCL 通过以下方法使用应用程序名称配置值:

  • 假定与此应用程序名称关联的所有工作线程在同一个流上一起运行。这些工作线程可分布在多个实例上。如果您运行相同应用程序代码的另一个实例,但使用不同的应用程序名称,则会将第二个实例KCL视为也在同一流上运行的完全独立的应用程序。

  • 使用应用程序名称KCL创建一个 DynamoDB 表,并使用该表维护应用程序的状态信息(例如检查点和工作分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪使用者应用程序处理的KCL分片

设置凭证

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行使用者应用程序,我们建议您为该实例配置一个IAM角色。 AWS 反映与此IAM角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。这是管理在EC2实例上运行的使用者应用程序的凭证的最安全的方式。

该示例的属性文件配置KCL为使用中提供的记录处理器处理名为 “文字” 的 Kinesis 数据流。sample_kclpy_app.py