连接器 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

连接器

连接器会持续将数据来源中的流数据复制到您的 Apache Kafka 集群,或者持续将数据从集群复制到数据接收器中,从而将外部系统和 Amazon 服务与 Apache Kafka 集群相集成。连接器还可以执行轻量级逻辑,例如在将数据传送到目标之前进行转换、格式转换或数据筛选。源连接器从数据来源提取数据,并将这些数据推送到集群中,而接收器连接器则从集群中提取数据,并将这些数据推送到数据接收器中。

下图显示了连接器的架构。工作程序是运行连接器逻辑的 Java 虚拟机(JVM)进程。每个工作程序都会创建一组任务,这些任务在并行线程中运行并执行复制数据的工作。任务不存储状态,因此可以随时启动、停止或重新启动,以提供弹性且可扩展的数据管道。

显示连接器集群架构的示意图。

连接器容量

连接器的总容量取决于该连接器拥有的工作程序数量,以及每个工作程序的 MSK Connect 单位(MCU)数量。每个 MCU 代表 1 个 vCPU 的计算能力和 4GiB 的内存。MCU 内存与工作程序实例的总内存有关,而不是正在使用的堆内存。

MSK Connect 工作人员使用客户提供的子网中的 IP 地址。每个工作人员使用客户提供的子网中的一个 IP 地址。您应确保在提供给 CreateConnector 请求的子网中有足够的可用的 IP 地址来考虑其指定容量,尤其是在自动缩放连接器时,工作人员数量可能会波动。

要创建连接器,必须选择以下两种容量模式之一。

  • 已预置 – 如果您知道连接器的容量要求,请选择此模式。指定两个值:

    • 工作程序数量。

    • 每个工作程序的 MCU 数量。

  • 自动扩缩 – 如果连接器的容量要求各不相同,或者您事先不知道连接器的容量要求,请选择此模式。当您使用自动扩缩模式时,Amazon MSK Connect 会覆盖连接器的 tasks.max 属性,其值与连接器中运行的工作程序数量和每个工作程序的 MCU 数量成正比。

    指定三组值:

    • 最小和最大工作程序数量。

    • CPU 利用率的横向缩减百分比和横向扩展百分比,该百分比由 CpuUtilization 指标确定。当连接器的 CpuUtilization 指标超过横向扩展百分比时,MSK Connect 会增加连接器中运行的工作程序数量。当 CpuUtilization 指标低于横向缩减百分比时,MSK Connect 会减少工作程序数量。工作程序的数量将始终保持在创建连接器时指定的最小和最大数量之间。

    • 每个工作程序的 MCU 数量。

有关工作程序的更多信息,请参阅 工作线程。要了解有关 MSK Connect 指标的信息,请参阅 监控 MSK Connect

创建连接器

使用创建连接器 AWS Management Console
  1. https://console.aws.amazon.com/msk/ 打开 Amazon MSK 控制台。

  2. 在左侧窗格的 MSK Connect 下,选择连接器

  3. 选择 Create connector (创建连接器)

  4. 您可以选择使用现有的自定义插件来创建连接器,也可以先创建新的自定义插件。有关自定义插件以及如何创建这些插件的信息,请参阅 插件。在此过程中,假设您有一个要使用的自定义插件。在自定义插件列表中,找到要使用的插件,选中其左侧的复选框,然后选择下一步

  5. 输入名称和描述(可选)。

  6. 选择您想要连接到的集群。

  7. 指定连接器配置。您需要指定的配置参数取决于要创建的连接器类型。但是,部分参数是所有连接器通用的参数,例如 connector.classtasks.max 参数。以下是 Confluent Amazon S3 Sink Connector 的配置示例。

    connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=2 topics=my-example-topic s3.region=us-east-1 s3.bucket.name=my-destination-bucket flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.json.JsonFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter schema.compatibility=NONE
  8. 接下来,配置您的连接器容量。您可以在两种容量模式之间选择:已预置和自动扩缩。有关这两个选项的信息,请参阅连接器容量

  9. 选择默认工作程序配置或自定义工作程序配置。有关创建自定义工作程序配置的信息,请参阅 工作线程

  10. 接下来,指定服务执行角色。这必须是 MSK Connect 可以担任的 IAM 角色,该角色向连接器授予访问必要 AWS 资源所需的所有权限。这些权限取决于连接器的逻辑。有关如何创建此角色的信息,请参阅 服务执行角色

  11. 选择下一步,查看安全信息,然后再次选择下一步

  12. 指定所需的日志记录选项,然后选择下一步。有关日志记录的信息,请参阅为 MSK Connect 进行日志记录

  13. 选择 Create connector (创建连接器)

要使用 MSK Connect API 创建连接器,请参阅CreateConnector