Procesos de trabajo - Transmisión gestionada de Amazon para Apache Kafka

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesos de trabajo

Un trabajador es un proceso de máquina virtual Java (JVM) que ejecuta la lógica del conector. Cada proceso de trabajo crea un conjunto de tareas que se ejecutan en subprocesos paralelos y realizan el trabajo de copiar los datos. Las tareas no almacenan el estado y, por lo tanto, se pueden iniciar, detener o reiniciar en cualquier momento para proporcionar una canalización de datos flexible y escalable. Los demás procesos de trabajo detectan automáticamente los cambios en el número de procesos de trabajo, ya sea debido a un problema de escalamiento o a fallos inesperados. Se coordinan para reequilibrar las tareas entre el conjunto de procesos de trabajo restantes. Los procesos de trabajo de Connect utilizan los grupos de consumidores de Apache Kafka para coordinarse y reequilibrarse.

Si los requisitos de capacidad de su conector son variables o difíciles de estimar, puede dejar que MSK Connect escale el número de trabajadores según sea necesario entre el límite inferior y el límite superior que especifique. Como alternativa, puede especificar el número exacto de procesos de trabajo en los que desea ejecutar la lógica del conector. Para obtener más información, consulte Capacidad de conector.

MSKConnect: los trabajadores consumen direcciones IP

MSKLos trabajadores de Connect consumen direcciones IP en las subredes proporcionadas por el cliente. Cada trabajador usa una dirección IP de una de las subredes proporcionadas por el cliente. Debe asegurarse de tener suficientes direcciones IP disponibles en las subredes proporcionadas en respuesta a una CreateConnector solicitud para tener en cuenta la capacidad especificada, especialmente cuando se escalan automáticamente los conectores, donde la cantidad de trabajadores puede fluctuar.

Configuración predeterminada del proceso de trabajo

MSKConnect proporciona la siguiente configuración de trabajo predeterminada:

key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter

Propiedades de configuración de proceso de trabajo compatibles

MSKConnect proporciona una configuración de trabajo predeterminada. También tiene la opción de crear una configuración de proceso de trabajo personalizada para utilizarla con sus conectores. La siguiente lista incluye información sobre las propiedades de configuración de trabajo que Amazon MSK Connect admite o no admite.

  • Solo se necesitan las propiedades key.converter y value.converter.

  • MSKConnect admite las siguientes propiedades producer. de configuración.

    producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.linger.ms producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
  • MSKConnect admite las siguientes propiedades consumer. de configuración.

    consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
  • Se admiten todas las demás propiedades de configuración que no comiencen por los prefijos producer. o consumer., excepto las siguientes propiedades.

    access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic

Para obtener más información sobre las propiedades de configuración de trabajo y lo que representan, consulte Kafka Connect Configs en la documentación de Apache Kafka.

Creación de una configuración de proceso de trabajo personalizada

Crear una configuración de trabajo personalizada mediante AWS Management Console
  1. Abre la MSK consola de Amazon enhttps://console.aws.amazon.com/msk/.

  2. En el panel izquierdo, en MSKConnect, elija Worker Configurations.

  3. Elija Crear configuración de proceso de trabajo.

  4. Introduzca un nombre y una descripción opcional y, a continuación, añada las propiedades y los valores en los que desee establecerlos.

  5. Elija Crear configuración de proceso de trabajo.

Para usar MSK Connect API para crear una configuración de trabajo, consulte CreateWorkerConfiguration.

Gestión de desplazamientos de los conectores de origen mediante offset.storage.topic

En esta sección se proporciona información que le ayudará a gestionar los desplazamientos de los conectores de origen mediante el tema de almacenamiento de desplazamientos. El tema del almacenamiento de desplazamientos es un tema interno que Kafka Connect utiliza para almacenar los desplazamientos de configuración de conectores y tareas.

Uso del tema de almacenamiento de desplazamiento predeterminado

De forma predeterminada, Amazon MSK Connect genera un nuevo tema de almacenamiento offset en el clúster de Kafka para cada conector que cree. MSKconstruye el nombre del tema predeterminado a partir de partes del conector. ARN Por ejemplo, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

Especificación de su propio tema de almacenamiento de desplazamiento

Para proporcionar continuidad de desplazamiento entre los conectores de origen, puede utilizar un tema de almacenamiento de desplazamiento de su elección en lugar del tema predeterminado. Especificar un tema de almacenamiento de desplazamiento le ayuda a realizar tareas como crear un conector de origen que reanude la lectura desde el último desplazamiento de un conector anterior.

Para especificar un tema de almacenamiento de desplazamiento, debe proporcionar un valor para la propiedad offset.storage.topic en su configuración de proceso de trabajo antes de crear un conector. Si desea reutilizar el tema de almacenamiento de desplazamientos para consumir los desplazamientos de un conector creado anteriormente, debe asignar al nuevo conector el mismo nombre que al conector anterior. Si crea un tema de almacenamiento de desplazamiento personalizado, debe definir cleanup.policy como compact en la configuración del tema.

nota

Si especifica un tema de almacenamiento de offset al crear un conector colector, MSK Connect crea el tema si aún no existe. Sin embargo, el tema no se utilizará para almacenar los desplazamientos de los conectores.

En cambio, los desplazamientos de los conectores de recepción se gestionan mediante el protocolo de grupos de consumidores de Kafka. Cada conector de recepción crea un grupo denominado connect-{CONNECTOR_NAME}. Mientras exista el grupo de consumidores, cualquier conector de recepción sucesivo que se cree con el mismo valor CONNECTOR_NAME se mantendrá desde el último desplazamiento asignado.

ejemplo : especificación de un tema de almacenamiento de desplazamiento para recrear un conector de origen con una configuración actualizada

Supongamos que tiene un conector de captura de datos de cambio (CDC) y desea modificar la configuración del conector sin perder su lugar en la CDC transmisión. No puede actualizar la configuración de conector existente, pero puede eliminar el conector y crear uno nuevo con el mismo nombre. Para indicar al nuevo conector dónde debe empezar a leer en la CDC transmisión, puede especificar el tema de almacenamiento desplazado del conector anterior en su configuración de trabajo. En los siguientes pasos se muestra cómo realizar esta tarea.

  1. En el equipo cliente, ejecute el siguiente comando para buscar el nombre del tema de almacenamiento de desplazamiento del conector. Sustituya <bootstrapBrokerString> por la cadena de agente de arranque de su clúster. Para ver instrucciones sobre cómo obtener la cadena de su agente de arranque, consulte Obtener los agentes de arranque para un clúster de Amazon MSK.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    El siguiente resultado muestra una lista de todos los temas del clúster, incluidos los temas de conectores internos predeterminados. En este ejemplo, el CDC conector existente utiliza el tema de almacenamiento de offset predeterminado creado por MSK Connect. Por eso el tema del almacenamiento de desplazamiento se denomina __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. Abre la MSK consola de Amazon en https://console.aws.amazon.com/msk/.

  3. Elija el conector de la lista Conectores. Copie y guarde el contenido del campo de Configuración del conector para poder modificarlo y usarlo para crear el nuevo conector.

  4. Para eliminar el conector, elija Eliminar. A continuación, ingrese el nombre del conector en el campo de entrada de texto para confirmar la eliminación.

  5. Cree una configuración de proceso de trabajo personalizada con valores que se adapten a su caso de uso. Para obtener instrucciones, consulte Creación de una configuración de proceso de trabajo personalizada.

    En su configuración de proceso de trabajo, debe especificar el nombre del tema de almacenamiento de desplazamiento que ha recuperado anteriormente como valor para offset.storage.topic como en la siguiente configuración.

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. importante

    Debe asignar al conector nuevo el mismo nombre que al conector anterior.

    Cree un conector nuevo con la configuración de proceso de trabajo que configuró en el paso anterior. Para obtener instrucciones, consulte Creación de un conector.

Consideraciones

Tenga en cuenta lo siguiente cuando gestione los desplazamientos del conector de origen.

  • Para especificar un tema de almacenamiento de desplazamientos, proporcione el nombre del tema de Kafka en el que se almacenan los desplazamientos de los conectores como valor offset.storage.topic en su configuración de proceso de trabajo.

  • Tenga cuidado al realizar cambios en la configuración de un conector. El cambio de los valores de configuración puede provocar un comportamiento no deseado del conector si un conector de origen utiliza valores de la configuración para introducir registros de desplazamiento. Recomendamos que consulte la documentación de su complemento para obtener orientación.

  • Personalice el número predeterminado de particiones: además de personalizar la configuración de proceso de trabajo añadiendo offset.storage.topic, puede personalizar el número de particiones para los temas de compensación y almacenamiento de estado. Las particiones predeterminadas para los temas internos son las siguientes.

    • config.storage.topic: 1, no configurable, debe ser un tema de partición única

    • offset.storage.topic: 25, configurable proporcionando offset.storage.partitions

    • status.storage.topic: 5, configurable proporcionando status.storage.partitions

  • Eliminar temas manualmente: Amazon MSK Connect crea nuevos temas internos de Kafka Connect (el nombre del tema comienza por__amazon_msk_connect) en cada implementación de conectores. Los temas antiguos que se adjuntan a los conectores eliminados no se eliminan automáticamente, ya que los temas internos, como offset.storage.topic, se pueden reutilizar entre los conectores. Sin embargo, puede eliminar manualmente los temas internos no utilizados creados por MSK Connect. Los temas internos se nombran siguiendo el formato __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id.

    La expresión regular __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id se puede utilizar para eliminar los temas internos. No debe eliminar un tema interno que utiliza actualmente un conector en ejecución.

  • Utilizar el mismo nombre para los temas internos creados por MSK Connect: si desea reutilizar el tema de almacenamiento de desplazamientos para consumir los desplazamientos de un conector creado anteriormente, debe asignar al nuevo conector el mismo nombre que al conector anterior. La propiedad offset.storage.topic se puede establecer mediante la configuración del proceso de trabajo para asignar el mismo nombre al conector offset.storage.topic y reutilizarla entre distintos conectores. Esta configuración se describe en Gestión de desplazamientos de los conectores. MSKConnect no permite que diferentes conectores compartan config.storage.topic ystatus.storage.topic. Estos temas se crean cada vez que se crea un conector nuevo enMSKC. Se les asigna un nombre automáticamente según el formato __amazon_msk_connect_<status|configs>_connector_name_connector_id y, por lo tanto, son diferentes en los distintos conectores que cree.