本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
要在源连接器之间提供偏移连续性,您可以使用自己选择的偏移存储主题来代替默认主题。指定偏移存储主题可以帮助您完成创建源连接器之类的任务,该连接器可从上一个连接器的最后一个偏移恢复读取。
要指定偏移存储主题,请在创建连接器之前在工作程序配置中为 offset.storage.topic
属性提供一个值。如果要重复使用偏移存储主题来消耗先前创建的连接器的偏移,则必须为新连接器指定与旧连接器相同的名称。如果您创建自定义偏移存储主题,则必须在主题配置中将 cleanup.policy
compact
。
注意
如果您在创建接收器连接器时指定了偏移存储主题,MSKConnect 会创建该主题(如果该主题尚不存在)。但是,该主题不会用于存储连接器偏移,
而是使用 Kafka 使用器组协议来管理接收器连接器偏移。每个接收器连接器都会创建一个名为 connect-{CONNECTOR_NAME}
的组。只要使用器组存在,您创建的任何具有相同 CONNECTOR_NAME
值的连续接收器连接器都将从上次提交的偏移继续。
例 :指定偏移存储主题以使用更新后的配置重新创建源连接器
假设你有一个 change data capture (CDC) 连接器,并且你想在不失去在CDC直播中的位置的情况下修改连接器配置。您无法更新现有的连接器配置,但可以删除连接器并使用相同的名称创建新连接器。要告诉新连接器从何处开始读取CDC流,可以在工作器配置中指定旧连接器的偏移存储主题。以下步骤演示如何完成此任务。
-
在您的客户端计算机上,运行以下命令以查找连接器偏移存储主题的名称。将
替换为集群的引导代理字符串。有关获取引导代理字符串的说明,请参阅获取 Amazon MSK 集群的引导代理。<bootstrapBrokerString>
<path-to-your-kafka-installation>
/bin/kafka-topics.sh --list --bootstrap-server<bootstrapBrokerString>
以下输出显示了所有集群主题的列表,包括所有默认的内部连接器主题。在此示例中,现有CDC连接器使用 C MSK onnect 创建的默认偏移存储主题。这就是偏移存储主题名为
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
的原因。__consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
-
打开 Amazon MSK 控制台,网址为https://console.aws.amazon.com/msk/
。 -
从连接器列表中选择您的连接器。复制并保存连接器配置字段的内容,以便您可以对其进行修改并使用它来创建新连接器。
-
要删除连接器,请选择删除。然后在文本输入字段中输入连接器名称,以确认删除。
-
使用适合您场景的值创建自定义工作程序配置。有关说明,请参阅创建自定义工作程序配置。
在工作程序配置中,必须将之前检索到的偏移存储主题的名称指定为类似于以下配置中
offset.storage.topic
的值。config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
-
重要
必须为新连接器指定与旧连接器相同的名称。
使用在上一步中设置的工作程序配置创建新连接器。有关说明,请参阅 创建连接器。