Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Procesos de trabajo
Un trabajador es un proceso de máquina virtual Java (JVM) que ejecuta la lógica del conector. Cada proceso de trabajo crea un conjunto de tareas que se ejecutan en subprocesos paralelos y realizan el trabajo de copiar los datos. Las tareas no almacenan el estado y, por lo tanto, se pueden iniciar, detener o reiniciar en cualquier momento para proporcionar una canalización de datos flexible y escalable. Los demás procesos de trabajo detectan automáticamente los cambios en el número de procesos de trabajo, ya sea debido a un problema de escalamiento o a fallos inesperados. Se coordinan para reequilibrar las tareas entre el conjunto de procesos de trabajo restantes. Los procesos de trabajo de Connect utilizan los grupos de consumidores de Apache Kafka para coordinarse y reequilibrarse.
Si los requisitos de capacidad de su conector son variables o difíciles de estimar, puede dejar que MSK Connect escale el número de trabajadores según sea necesario entre el límite inferior y el límite superior que especifique. Como alternativa, puede especificar el número exacto de procesos de trabajo en los que desea ejecutar la lógica del conector. Para obtener más información, consulte Capacidad de conector.
MSKConnect: los trabajadores consumen direcciones IP
MSKLos trabajadores de Connect consumen direcciones IP en las subredes proporcionadas por el cliente. Cada trabajador usa una dirección IP de una de las subredes proporcionadas por el cliente. Debe asegurarse de tener suficientes direcciones IP disponibles en las subredes proporcionadas en respuesta a una CreateConnector solicitud para tener en cuenta la capacidad especificada, especialmente cuando se escalan automáticamente los conectores, donde la cantidad de trabajadores puede fluctuar.
Temas
Configuración predeterminada del proceso de trabajo
MSKConnect proporciona la siguiente configuración de trabajo predeterminada:
key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
Propiedades de configuración de proceso de trabajo compatibles
MSKConnect proporciona una configuración de trabajo predeterminada. También tiene la opción de crear una configuración de proceso de trabajo personalizada para utilizarla con sus conectores. La siguiente lista incluye información sobre las propiedades de configuración de trabajo que Amazon MSK Connect admite o no admite.
-
Solo se necesitan las propiedades
key.converter
yvalue.converter
. -
MSKConnect admite las siguientes propiedades
producer.
de configuración.producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.linger.ms producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
-
MSKConnect admite las siguientes propiedades
consumer.
de configuración.consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
-
Se admiten todas las demás propiedades de configuración que no comiencen por los prefijos
producer.
oconsumer.
, excepto las siguientes propiedades.access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic
Para obtener más información sobre las propiedades de configuración de trabajo y lo que representan, consulte Kafka Connect Configs
Creación de una configuración de proceso de trabajo personalizada
Crear una configuración de trabajo personalizada mediante AWS Management Console
Abre la MSK consola de Amazon enhttps://console.aws.amazon.com/msk/
. -
En el panel izquierdo, en MSKConnect, elija Worker Configurations.
-
Elija Crear configuración de proceso de trabajo.
-
Introduzca un nombre y una descripción opcional y, a continuación, añada las propiedades y los valores en los que desee establecerlos.
-
Elija Crear configuración de proceso de trabajo.
Para usar MSK Connect API para crear una configuración de trabajo, consulte CreateWorkerConfiguration.
Gestión de desplazamientos de los conectores de origen mediante offset.storage.topic
En esta sección se proporciona información que le ayudará a gestionar los desplazamientos de los conectores de origen mediante el tema de almacenamiento de desplazamientos. El tema del almacenamiento de desplazamientos es un tema interno que Kafka Connect utiliza para almacenar los desplazamientos de configuración de conectores y tareas.
Uso del tema de almacenamiento de desplazamiento predeterminado
De forma predeterminada, Amazon MSK Connect genera un nuevo tema de almacenamiento offset en el clúster de Kafka para cada conector que cree. MSKconstruye el nombre del tema predeterminado a partir de partes del conector. ARN Por ejemplo, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
.
Especificación de su propio tema de almacenamiento de desplazamiento
Para proporcionar continuidad de desplazamiento entre los conectores de origen, puede utilizar un tema de almacenamiento de desplazamiento de su elección en lugar del tema predeterminado. Especificar un tema de almacenamiento de desplazamiento le ayuda a realizar tareas como crear un conector de origen que reanude la lectura desde el último desplazamiento de un conector anterior.
Para especificar un tema de almacenamiento de desplazamiento, debe proporcionar un valor para la propiedad offset.storage.topic
en su configuración de proceso de trabajo antes de crear un conector. Si desea reutilizar el tema de almacenamiento de desplazamientos para consumir los desplazamientos de un conector creado anteriormente, debe asignar al nuevo conector el mismo nombre que al conector anterior. Si crea un tema de almacenamiento de desplazamiento personalizado, debe definir cleanup.policy
compact
en la configuración del tema.
nota
Si especifica un tema de almacenamiento de offset al crear un conector colector, MSK Connect crea el tema si aún no existe. Sin embargo, el tema no se utilizará para almacenar los desplazamientos de los conectores.
En cambio, los desplazamientos de los conectores de recepción se gestionan mediante el protocolo de grupos de consumidores de Kafka. Cada conector de recepción crea un grupo denominado connect-{CONNECTOR_NAME}
. Mientras exista el grupo de consumidores, cualquier conector de recepción sucesivo que se cree con el mismo valor CONNECTOR_NAME
se mantendrá desde el último desplazamiento asignado.
ejemplo : especificación de un tema de almacenamiento de desplazamiento para recrear un conector de origen con una configuración actualizada
Supongamos que tiene un conector de captura de datos de cambio (CDC) y desea modificar la configuración del conector sin perder su lugar en la CDC transmisión. No puede actualizar la configuración de conector existente, pero puede eliminar el conector y crear uno nuevo con el mismo nombre. Para indicar al nuevo conector dónde debe empezar a leer en la CDC transmisión, puede especificar el tema de almacenamiento desplazado del conector anterior en su configuración de trabajo. En los siguientes pasos se muestra cómo realizar esta tarea.
-
En el equipo cliente, ejecute el siguiente comando para buscar el nombre del tema de almacenamiento de desplazamiento del conector. Sustituya
por la cadena de agente de arranque de su clúster. Para ver instrucciones sobre cómo obtener la cadena de su agente de arranque, consulte Obtener los agentes de arranque para un clúster de Amazon MSK.<bootstrapBrokerString>
<path-to-your-kafka-installation>
/bin/kafka-topics.sh --list --bootstrap-server<bootstrapBrokerString>
El siguiente resultado muestra una lista de todos los temas del clúster, incluidos los temas de conectores internos predeterminados. En este ejemplo, el CDC conector existente utiliza el tema de almacenamiento de offset predeterminado creado por MSK Connect. Por eso el tema del almacenamiento de desplazamiento se denomina
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
.__consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
-
Abre la MSK consola de Amazon en https://console.aws.amazon.com/msk/
. -
Elija el conector de la lista Conectores. Copie y guarde el contenido del campo de Configuración del conector para poder modificarlo y usarlo para crear el nuevo conector.
-
Para eliminar el conector, elija Eliminar. A continuación, ingrese el nombre del conector en el campo de entrada de texto para confirmar la eliminación.
-
Cree una configuración de proceso de trabajo personalizada con valores que se adapten a su caso de uso. Para obtener instrucciones, consulte Creación de una configuración de proceso de trabajo personalizada.
En su configuración de proceso de trabajo, debe especificar el nombre del tema de almacenamiento de desplazamiento que ha recuperado anteriormente como valor para
offset.storage.topic
como en la siguiente configuración.config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
-
importante
Debe asignar al conector nuevo el mismo nombre que al conector anterior.
Cree un conector nuevo con la configuración de proceso de trabajo que configuró en el paso anterior. Para obtener instrucciones, consulte Creación de un conector.
Consideraciones
Tenga en cuenta lo siguiente cuando gestione los desplazamientos del conector de origen.
-
Para especificar un tema de almacenamiento de desplazamientos, proporcione el nombre del tema de Kafka en el que se almacenan los desplazamientos de los conectores como valor
offset.storage.topic
en su configuración de proceso de trabajo. -
Tenga cuidado al realizar cambios en la configuración de un conector. El cambio de los valores de configuración puede provocar un comportamiento no deseado del conector si un conector de origen utiliza valores de la configuración para introducir registros de desplazamiento. Recomendamos que consulte la documentación de su complemento para obtener orientación.
-
Personalice el número predeterminado de particiones: además de personalizar la configuración de proceso de trabajo añadiendo
offset.storage.topic
, puede personalizar el número de particiones para los temas de compensación y almacenamiento de estado. Las particiones predeterminadas para los temas internos son las siguientes.-
config.storage.topic
: 1, no configurable, debe ser un tema de partición única -
offset.storage.topic
: 25, configurable proporcionandooffset.storage.partitions
-
status.storage.topic
: 5, configurable proporcionandostatus.storage.partitions
-
-
Eliminar temas manualmente: Amazon MSK Connect crea nuevos temas internos de Kafka Connect (el nombre del tema comienza por
__amazon_msk_connect
) en cada implementación de conectores. Los temas antiguos que se adjuntan a los conectores eliminados no se eliminan automáticamente, ya que los temas internos, comooffset.storage.topic
, se pueden reutilizar entre los conectores. Sin embargo, puede eliminar manualmente los temas internos no utilizados creados por MSK Connect. Los temas internos se nombran siguiendo el formato__amazon_msk_connect_<offsets|status|configs>_
.connector_name
_connector_id
La expresión regular
__amazon_msk_connect_<offsets|status|configs>_
se puede utilizar para eliminar los temas internos. No debe eliminar un tema interno que utiliza actualmente un conector en ejecución.connector_name
_connector_id
-
Utilizar el mismo nombre para los temas internos creados por MSK Connect: si desea reutilizar el tema de almacenamiento de desplazamientos para consumir los desplazamientos de un conector creado anteriormente, debe asignar al nuevo conector el mismo nombre que al conector anterior. La propiedad
offset.storage.topic
se puede establecer mediante la configuración del proceso de trabajo para asignar el mismo nombre al conectoroffset.storage.topic
y reutilizarla entre distintos conectores. Esta configuración se describe en Gestión de desplazamientos de los conectores. MSKConnect no permite que diferentes conectores compartanconfig.storage.topic
ystatus.storage.topic
. Estos temas se crean cada vez que se crea un conector nuevo enMSKC. Se les asigna un nombre automáticamente según el formato__amazon_msk_connect_<status|configs>_
y, por lo tanto, son diferentes en los distintos conectores que cree.connector_name
_connector_id