选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

使用自定义偏移存储主题

聚焦模式
使用自定义偏移存储主题 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

要在源连接器之间提供偏移连续性,您可以使用自己选择的偏移存储主题来代替默认主题。指定偏移存储主题可以帮助您完成创建源连接器之类的任务,该连接器可从上一个连接器的最后一个偏移恢复读取。

要指定偏移存储主题,请在创建连接器之前在工作程序配置中为 offset.storage.topic 属性提供一个值。如果要重复使用偏移存储主题来消耗先前创建的连接器的偏移,则必须为新连接器指定与旧连接器相同的名称。如果您创建自定义偏移存储主题,则必须在主题配置中将 cleanup.policy 设置为 compact

注意

如果您在创建接收器连接器时指定了偏移存储主题,MSKConnect 会创建该主题(如果该主题尚不存在)。但是,该主题不会用于存储连接器偏移,

而是使用 Kafka 使用器组协议来管理接收器连接器偏移。每个接收器连接器都会创建一个名为 connect-{CONNECTOR_NAME} 的组。只要使用器组存在,您创建的任何具有相同 CONNECTOR_NAME 值的连续接收器连接器都将从上次提交的偏移继续。

例 :指定偏移存储主题以使用更新后的配置重新创建源连接器

假设你有一个 change data capture (CDC) 连接器,并且你想在不失去在CDC直播中的位置的情况下修改连接器配置。您无法更新现有的连接器配置,但可以删除连接器并使用相同的名称创建新连接器。要告诉新连接器从何处开始读取CDC流,可以在工作器配置中指定旧连接器的偏移存储主题。以下步骤演示如何完成此任务。

  1. 在您的客户端计算机上,运行以下命令以查找连接器偏移存储主题的名称。将 <bootstrapBrokerString> 替换为集群的引导代理字符串。有关获取引导代理字符串的说明,请参阅获取 Amazon MSK 集群的引导代理

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    以下输出显示了所有集群主题的列表,包括所有默认的内部连接器主题。在此示例中,现有CDC连接器使用 C MSK onnect 创建的默认偏移存储主题。这就是偏移存储主题名为 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 的原因。

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. 打开 Amazon MSK 控制台,网址为https://console.aws.amazon.com/msk/

  3. 连接器列表中选择您的连接器。复制并保存连接器配置字段的内容,以便您可以对其进行修改并使用它来创建新连接器。

  4. 要删除连接器,请选择删除。然后在文本输入字段中输入连接器名称,以确认删除。

  5. 使用适合您场景的值创建自定义工作程序配置。有关说明,请参阅创建自定义工作程序配置

    在工作程序配置中,必须将之前检索到的偏移存储主题的名称指定为类似于以下配置中 offset.storage.topic 的值。

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. 重要

    必须为新连接器指定与旧连接器相同的名称。

    使用在上一步中设置的工作程序配置创建新连接器。有关说明,请参阅 创建连接器

隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。