To provide offset continuity between source connectors, you can use an offset storage topic of your choice instead of the default topic. Specifying an offset storage topic helps you accomplish tasks like creating a source connector that resumes reading from the last offset of a previous connector.
To specify an offset storage topic, supply a value for the offset.storage.topic property in your worker configuration before you create a connector. If you want to reuse the offset storage topic to consume offsets from a previously created connector, you must give the new connector the same name as the old connector. If you create a custom offset storage topic, you must set cleanup.policy to compact in your topic configuration.
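As a minimal sketch of the last requirement, a custom offset storage topic could be created with the standard kafka-topics.sh tool and cleanup.policy=compact. The installation path and the helper name create_offsets_topic are hypothetical. The partition count (25) and replication factor (3) are assumptions that mirror Kafka Connect's usual defaults for offset topics; adjust them to your cluster. Depending on how your cluster authenticates clients, you may also need to pass client properties with --command-config.

```shell
# Path to kafka-topics.sh; an assumption, override for your installation.
KAFKA_TOPICS_BIN="${KAFKA_TOPICS_BIN:-/path/to/kafka/bin/kafka-topics.sh}"

# Hypothetical helper: create a compacted topic for connector offsets.
#   $1 = bootstrap broker string
#   $2 = name of the custom offset storage topic
create_offsets_topic() {
  "$KAFKA_TOPICS_BIN" --create \
    --bootstrap-server "$1" \
    --topic "$2" \
    --partitions 25 \
    --replication-factor 3 \
    --config cleanup.policy=compact
}

# Example call (placeholder values):
# create_offsets_topic "<bootstrapBrokerString>" "my-connector-offsets"
```

The topic name is then the value to use for offset.storage.topic in the worker configuration.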
Note
If you specify an offset storage topic when you create a sink connector, MSK Connect creates the topic if it does not already exist. However, the topic will not be used to store connector offsets.
Sink connector offsets are instead managed using the Kafka consumer group protocol. Each sink connector creates a group named connect-{CONNECTOR_NAME}. As long as the consumer group exists, any successive sink connectors that you create with the same CONNECTOR_NAME value will continue from the last committed offset.
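To check whether a sink connector's consumer group still exists and where its committed offsets stand, one option is the standard kafka-consumer-groups.sh tool. The following is a sketch only: the installation path and the helper names sink_group_name and describe_sink_group are hypothetical, and the group name simply follows the connect-{CONNECTOR_NAME} pattern described in the note.

```shell
# Path to kafka-consumer-groups.sh; an assumption, override for your installation.
KAFKA_GROUPS_BIN="${KAFKA_GROUPS_BIN:-/path/to/kafka/bin/kafka-consumer-groups.sh}"

# Derive the consumer group name for a sink connector.
#   $1 = connector name
sink_group_name() {
  printf 'connect-%s\n' "$1"
}

# Describe the group (members, committed offsets, lag).
#   $1 = bootstrap broker string
#   $2 = connector name
describe_sink_group() {
  "$KAFKA_GROUPS_BIN" --bootstrap-server "$1" --describe \
    --group "$(sink_group_name "$2")"
}

# Example call (placeholder values):
# describe_sink_group "<bootstrapBrokerString>" "my-sink-connector"
```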
Example: Specifying an offset storage topic to recreate a source connector with an updated configuration
Suppose you have a change data capture (CDC) connector and you want to modify the connector configuration without losing your place in the CDC stream. You can't update the existing connector configuration, but you can delete the connector and create a new one with the same name. To tell the new connector where to start reading in the CDC stream, you can specify the old connector's offset storage topic in your worker configuration. The following steps demonstrate how to accomplish this task.
-
On your client machine, run the following command to find the name of your connector's offset storage topic. Replace <bootstrapBrokerString> with your cluster's bootstrap broker string. For instructions on getting your bootstrap broker string, see Get the bootstrap brokers for an Amazon MSK cluster.
<path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
The following output shows a list of all cluster topics, including any default internal connector topics. In this example, the existing CDC connector uses the default offset storage topic created by MSK Connect. This is why the offset storage topic is called __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.
__consumer_offsets
__amazon_msk_canary
__amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
__amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
my-msk-topic-1
my-msk-topic-2
-
Open the Amazon MSK console at https://console.aws.amazon.com/msk/.
-
Choose your connector from the Connectors list. Copy and save the contents of the Connector configuration field so that you can modify it and use it to create the new connector.
-
Choose Delete to delete the connector. Then enter the connector name in the text input field to confirm deletion.
-
Create a custom worker configuration with values that fit your scenario. For instructions, see Create a custom worker configuration.
In your worker configuration, you must specify the name of the offset storage topic that you previously retrieved as the value for offset.storage.topic, as in the following configuration.
config.providers.secretManager.param.aws.region=eu-west-3
key.converter=<org.apache.kafka.connect.storage.StringConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
-
Important
You must give your new connector the same name as the old connector.
Create a new connector using the worker configuration that you set up in the previous step. For instructions, see Create a connector.
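The topic lookup in the first step and the offset.storage.topic line in the worker configuration step can be tied together with a small script. This is a sketch under stated assumptions: the helper names filter_offsets_topic and offset_storage_setting are hypothetical, and the __amazon_msk_connect_offsets_<connector-name>_ prefix is inferred from the sample output in this example rather than taken from a documented naming rule. If more than one topic matches (for example, left over from earlier connectors with the same name), you still need to choose the right one yourself.

```shell
# Hypothetical helper: read a topic list on stdin (one topic per line) and
# print only topics that start with the default MSK Connect offsets prefix
# for the given connector. Uses a literal prefix match, not a regex.
#   $1 = connector name
filter_offsets_topic() {
  awk -v prefix="__amazon_msk_connect_offsets_$1_" 'index($0, prefix) == 1'
}

# Hypothetical helper: print the worker configuration line that pins the
# offset storage topic.
#   $1 = offset storage topic name
offset_storage_setting() {
  printf 'offset.storage.topic=%s\n' "$1"
}

# Example usage (placeholder values; requires access to the cluster):
# topic="$(<path-to-your-kafka-installation>/bin/kafka-topics.sh --list \
#            --bootstrap-server "<bootstrapBrokerString>" |
#          filter_offsets_topic "my-mskc-connector")"
# offset_storage_setting "$topic" >> worker.properties
```

Capturing the topic name in a variable avoids copy-and-paste errors in the long generated name, which would otherwise cause the new connector to start from an empty offset topic.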