Operadores - Amazon Managed Streaming for Apache Kafka

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Operadores

Um operador é um processo de máquina virtual Java (JVM) que executa a lógica do conector. Cada operador cria um conjunto de tarefas que são executadas em threads paralelos e fazem o trabalho de copiar os dados. As tarefas não armazenam o estado e, portanto, podem ser iniciadas, interrompidas ou reiniciadas a qualquer momento para fornecer um pipeline de dados resiliente e escalável. Alterações no número de operadores, seja devido a um evento de escalonamento ou devido a falhas inesperadas, são detectadas automaticamente pelos demais operadores. Eles se organizam para reequilibrar as tarefas no conjunto de operadores restantes. Os operadores do Connect usam os grupos de consumidores do Apache Kafka para coordenar e reequilibrar.

Se os requisitos de capacidade do seu conector forem variáveis ou difíceis de estimar, você pode deixar o MSK Connect escalar o número de operadores conforme necessário entre um limite inferior e um limite superior que você determina. Como alternativa, você pode especificar o número exato de operadores que deseja executar em sua lógica de conector. Para ter mais informações, consulte Capacidade do conector.

Os trabalhadores do MSK Connect consomem endereços IP

Os funcionários do MSK Connect consomem endereços IP nas sub-redes fornecidas pelo cliente. Cada trabalhador usa um endereço IP de uma das sub-redes fornecidas pelo cliente. Você deve garantir que tenha endereços IP disponíveis suficientes nas sub-redes fornecidas a uma CreateConnector solicitação para considerar a capacidade especificada, especialmente ao escalar automaticamente conectores em que o número de trabalhadores pode flutuar.

Configuração padrão de operador

O MSK Connect fornece a seguinte configuração padrão de operador:

key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter

Propriedades de configuração de operador compatíveis

O MSK Connect fornece uma configuração padrão de operador. Também há a opção de criar uma configuração personalizada de operador para usar com os conectores. A lista a seguir inclui informações sobre as propriedades de configuração do operador compatíveis ou não com o Amazon MSK Connect.

  • As propriedades key.converter e value.converter são obrigatórias.

  • O MSK Connect é compatível com as seguintes propriedades de configuração de producer..

    producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
  • O MSK Connect é compatível com as seguintes propriedades de configuração de consumer..

    consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
  • Todas as outras propriedades de configuração que não comecem com os prefixos producer. ou consumer. são compatíveis, exceto as propriedades a seguir.

    access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic

Para obter mais informações sobre as propriedades de configuração do operador e o que elas representam, consulte Configurações do Kafka para o Connect na documentação do Apache Kafka.

Criação de uma configuração personalizada de operador

Criando uma configuração de trabalhador personalizada usando o AWS Management Console
  1. Abra o console do Amazon MSK em https://console.aws.amazon.com/msk/.

  2. No painel esquerdo, em MSK Connect, escolha Configurações do operador.

  3. Escolha Criar configuração de operador.

  4. Insira um nome e uma descrição opcional e, em seguida, adicione as propriedades e os valores para os quais você deseja defini-los.

  5. Escolha Criar configuração de operador.

Para usar a API MSK Connect para criar uma configuração de trabalho, consulte CreateWorkerConfiguration.

Gerenciamento de deslocamentos do conector de origem usando offset.storage.topic

Esta seção fornece informações para ajudar você a gerenciar os deslocamentos do conector de origem usando o tópico de deslocamento de armazenamento. O tópico de deslocamento de armazenamento é um tópico interno que o Kafka Connect usa para armazenar deslocamentos de configuração de conectores e tarefas.

Como usar o tópico padrão de deslocamento de armazenamento

Por padrão, o Amazon MSK Connect gera um novo tópico de deslocamento de armazenamento em seu cluster do Kafka para cada conector que você cria. O MSK estrutura o nome do tópico padrão usando partes do ARN do conector. Por exemplo, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

Como especificar seu próprio tópico de deslocamento de armazenamento

Para fornecer continuidade de deslocamento entre conectores de origem, você pode usar um tópico de deslocamento de armazenamento de sua escolha em vez do tópico padrão. Especificar um tópico de deslocamento de armazenamento ajuda você a realizar tarefas como criar um conector de origem que retoma a leitura desde o último deslocamento de um conector anterior.

Para especificar um tópico de deslocamento de armazenamento, você fornece um valor para a propriedade offset.storage.topic em sua configuração de operador antes de criar um conector. Se quiser reutilizar o tópico de deslocamento de armazenamento para consumir deslocamentos de um conector criado anteriormente, você deverá dar ao novo conector o mesmo nome do conector antigo. Se você criar um tópico personalizado de deslocamento de armazenamento, deverá definir cleanup.policy como compact na configuração do tópico.

nota

Se você especificar um tópico de deslocamento de armazenamento ao criar um conector de coletor, o MSK Connect criará o tópico se ele ainda não existir. No entanto, o tópico não será usado para armazenar deslocamentos de conectores.

Em vez disso, os deslocamentos do conector do coletor serão gerenciados usando o protocolo de grupo de consumidores Kafka. Cada conector de coletor cria um grupo chamado connect-{CONNECTOR_NAME}. Enquanto o grupo de consumidores existir, todos os conectores de coletor sucessivos que você criar com o mesmo valor CONNECTOR_NAME continuarão a partir do último deslocamento confirmado.

exemplo Especificar um tópico de deslocamento de armazenamento para recriar um conector de origem com uma configuração atualizada

Suponha que você tenha um conector de Change Data Capture (CDC – Captura de dados de alteração) e queira modificar a configuração do conector sem perder seu lugar no fluxo do CDC. Não é possível atualizar a configuração do conector existente, mas você pode excluir o conector e criar um novo com o mesmo nome. Para informar ao novo conector por onde começar a leitura no fluxo do CDC, você pode especificar o tópico de deslocamento de armazenamento do conector antigo em sua configuração de operador. As etapas a seguir demonstram como concluir essa tarefa.

  1. Em sua máquina cliente, execute o comando a seguir para encontrar o nome do tópico de deslocamento de armazenamento do seu conector. Substitua <bootstrapBrokerString> pela string do agente de bootstrap do seu cluster. Para obter instruções sobre como obter sua string de agente de bootstrap, consulte Como obter agentes de bootstrap para um cluster do Amazon MSK.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    A saída a seguir mostra uma lista de todos os tópicos do cluster, incluindo qualquer tópico de conector interno padrão. Neste exemplo, o conector CDC existente usa o tópico padrão de deslocamento de armazenamento criado pelo MSK Connect. É por isso que o tópico de deslocamento de armazenamento é chamado de __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. Abra o console do Amazon MSK em https://console.aws.amazon.com/msk/.

  3. Escolha seu conector na lista Conectores. Copie e salve o conteúdo do campo Configuração do conector para que você possa modificá-lo e usá-lo na criação do novo conector.

  4. Selecione Excluir para excluir o conector. Em seguida, insira o nome do conector no campo de entrada de texto para confirmar a exclusão.

  5. Crie uma configuração personalizada de operador com valores adequados ao seu cenário. Para obter instruções, consulte Criação de uma configuração personalizada de operador.

    Em sua configuração de operador, você deve especificar o nome do tópico de deslocamento de armazenamento que você recuperou anteriormente como o valor de offset.storage.topic, assim como na configuração a seguir.

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. Importante

    Você deve dar ao seu novo conector o mesmo nome do conector antigo.

    Crie um novo conector usando a configuração de operador que você definiu na etapa anterior. Para obter instruções, consulte Como criar um conector.

Considerações

Considere o seguinte ao gerenciar os deslocamentos do conector de origem.

  • Para especificar um tópico de deslocamento de armazenamento, forneça o nome do tópico do Kafka no qual os deslocamentos do conector são armazenados como o valor offset.storage.topic em sua configuração de operador.

  • Tenha cuidado ao fazer alterações na configuração de um conector. A alteração dos valores da configuração pode resultar em um comportamento não intencional do conector se um conector de origem usar valores da configuração para os principais registros de deslocamento. Recomendamos que você consulte a documentação do seu plug-in para obter orientação.

  • Personalize o número padrão de partições: além de personalizar a configuração do operador adicionando offset.storage.topic, você pode personalizar o número de partições para os tópicos de deslocamento e armazenamento de status. As partições padrão para tópicos internos são as seguintes.

    • config.storage.topic: 1, não configurável, deve ser tópico de partição única

    • offset.storage.topic: 25, configurável fornecendo offset.storage.partitions

    • status.storage.topic: 5, configurável fornecendo status.storage.partitions

  • Exclusão manual de tópicos: o Amazon MSK Connect cria novos tópicos internos do Kafka Connect (o nome do tópico começa com __amazon_msk_connect) em cada implantação de conectores. Tópicos antigos anexados a conectores excluídos não são removidos automaticamente porque tópicos internos, como offset.storage.topic, podem ser reutilizados entre conectores. No entanto, você pode excluir manualmente tópicos internos não utilizados criados pelo MSK Connect. Os tópicos internos são nomeados segundo o formato __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id.

    É possível usar a expressão regular __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id para excluir os tópicos internos. Você não deve excluir um tópico interno que esteja sendo usado atualmente por um conector em execução.

  • Usar o mesmo nome para os tópicos internos criados pelo MSK Connect: se quiser reutilizar o tópico de deslocamento de armazenamento para consumir deslocamentos de um conector criado anteriormente, você deverá dar ao novo conector o mesmo nome do conector antigo. A propriedade offset.storage.topic pode ser definida usando a configuração do operador para atribuir o mesmo nome ao offset.storage.topic e reutilizada entre conectores diferentes. Essa configuração é descrita em Gerenciamento de deslocamentos de conectores. O MSK Connect não permite que conectores diferentes compartilhem config.storage.topic e status.storage.topic. Esses tópicos são criados sempre que você cria um novo conector no MSKC. Eles são nomeados automaticamente de acordo com o formato __amazon_msk_connect_<status|configs>_connector_name_connector_id e, portanto, são diferentes nos diferentes conectores que você cria.