Usando o Apache Kafka como alvo para AWS Database Migration Service - AWS Database Migration Service

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando o Apache Kafka como alvo para AWS Database Migration Service

Você pode usar AWS DMS para migrar dados para um cluster Apache Kafka. O Apache Kafka é uma plataforma de streaming distribuída. É possível utilizar o Apache Kafka para a ingestão e o processamento de dados de streaming em tempo real.

AWS também oferece Amazon Managed Streaming for Apache Kafka (Amazon MSK) para uso como destino. AWS DMS O Amazon MSK é um serviço de streaming totalmente gerenciado do Apache Kafka que simplifica a implementação e o gerenciamento de instâncias do Apache Kafka. Ele funciona com versões de código aberto do Apache Kafka, e você acessa instâncias do Amazon MSK como AWS DMS destinos, exatamente como qualquer instância do Apache Kafka. Para obter mais informações, consulte O que é o Amazon MSK? no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Um cluster do Kafka armazena fluxos de registros em categorias chamadas tópicos que são divididos em partições. As partições são sequências de registros de dados identificadas exclusivamente (mensagens) em um tópico. As partições podem ser distribuídas entre vários agentes em um cluster para permitir o processamento paralelo dos registros de um tópico. Para obter mais informações sobre tópicos e partições e sua distribuição no Apache Kafka, consulte Tópicos e logs e Distribuição.

O cluster do Kafka pode ser uma instância do Amazon MSK, um cluster em execução em uma instância do Amazon EC2 ou um cluster on-premises. Uma instância do Amazon MSK ou um cluster em uma instância do Amazon EC2 pode estar na mesma VPC ou em uma diferente. Se o cluster estiver on-premises, será possível utilizar o seu próprio servidor de nomes on-premises para a instância de replicação para resolver o nome do host do cluster. Para obter informações sobre como configurar um servidor de nomes na instância de replicação, consulte Utilização do seu próprio servidor de nomes on-premises. Para obter mais informações sobre a configuração de uma rede, consulte Configurar uma rede para uma instância de replicação.

Ao utilizar um cluster do Amazon MSK, verifique se o grupo de segurança permite acesso da instância de replicação. Para obter informações sobre como alterar o grupo de segurança de um cluster do Amazon MSK, consulte Alterar o grupo de segurança de um cluster do Amazon MSK.

AWS Database Migration Service publica registros em um tópico do Kafka usando JSON. Durante a conversão, o AWS DMS serializa cada registro do banco de dados de origem em um par atributo-valor no formato JSON.

Para migrar os dados de uma fonte de dados compatível para um cluster de destino do Kafka, utilize o mapeamento de objetos. Com o mapeamento de objetos, você determina como estruturar os registros de dados no tópico de destino. Também é possível definir uma chave de partição para cada tabela, que será utilizada pelo Apache Kafka para agrupar os dados em suas partições.

Atualmente, AWS DMS oferece suporte a um único tópico por tarefa. Para uma única tarefa com várias tabelas, todas as mensagens vão para um único tópico. Cada mensagem inclui uma seção de metadados que identifica o esquema e a tabela de destino. AWS DMS as versões 3.4.6 e superiores oferecem suporte à replicação multitópica usando mapeamento de objetos. Para ter mais informações, consulte Replicação de multitópico utilizando o mapeamento de objetos.

Configurações do endpoint do Apache Kafka

Você pode especificar os detalhes da conexão por meio das configurações do endpoint no AWS DMS console ou da --kafka-settings opção na CLI. Os requisitos para cada configuração são os seguintes:

  • Broker: especifique a localização de um ou mais agentes no cluster do Kafka na forma de uma lista separada por vírgulas de cada broker-hostname:port. Um exemplo é "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Essa configuração pode especificar os locais de qualquer um ou de todos os agentes no cluster. Todos os agentes de cluster se comunicam para lidar com o particionamento de registros de dados migrados para o tópico.

  • Topic: (opcional) especifique o nome do tópico com um comprimento máximo de 255 letras e símbolos. É possível utilizar ponto (.), sublinhado (_) e sinal de subtração (-). Os nomes de tópicos com um ponto (.) ou sublinhado (_) podem colidir em estruturas de dados internas. Utilize qualquer um, mas não esses dois símbolos no nome do tópico. Se você não especificar um nome de tópico, AWS DMS use "kafka-default-topic" como tópico de migração.

    nota

    Para AWS DMS criar um tópico de migração especificado por você ou o tópico padrão, defina-o auto.create.topics.enable = true como parte da configuração do cluster do Kafka. Para obter mais informações, consulte Limitações ao usar o Apache Kafka como alvo para AWS Database Migration Service.

  • MessageFormat: o formato de saída dos registros criados no endpoint. O formato da mensagem é JSON (padrão) ou JSON_UNFORMATTED (uma única linha sem guia).

  • MessageMaxBytes: o tamanho máximo em bytes dos registros criados no endpoint. O padrão é 1.000.000.

    nota

    Você só pode usar o AWS CLI/SDK para mudar MessageMaxBytes para um valor não padrão. Por exemplo, para modificar o endpoint existente do Kafka e alterar MessageMaxBytes, utilize o comando a seguir.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: fornece informações detalhadas sobre transações do banco de dados de origem. Essas informações incluem um timestamp de confirmação, uma posição no log e valores para transaction_id, previous_transaction_id e transaction_record_id (o deslocamento de registro dentro de uma transação). O padrão é false.

  • IncludePartitionValue: mostra o valor da partição na saída da mensagem do Kafka, a menos que o tipo de partição seja schema-table-type. O padrão é false.

  • PartitionIncludeSchemaTable: prefixa os nomes de esquema e de tabela em valores de partições, quando o tipo de partição for primary-key-type. Isso aumenta a distribuição de dados entre partições do Kafka. Por exemplo, suponha que um esquema SysBench tenha milhares de tabelas, e cada tabela tenha apenas um intervalo limitado para uma chave primária. Nesse caso, a mesma chave primária é enviada de milhares de tabelas para a mesma partição, o que provoca o controle de utilização. O padrão é false.

  • IncludeTableAlterOperations: inclui todas as operações da linguagem de definição de dados (DDL) que alteram a tabela nos dados de controle, como rename-table, drop-table, add-column, drop-column e rename-column. O padrão é false.

  • IncludeControlDetails: mostra informações detalhadas de controle para definição de tabela, definição de coluna e alterações de tabela e coluna na saída de mensagem do Kafka. O padrão é false.

  • IncludeNullAndEmpty: inclui colunas NULL e vazias no destino. O padrão é false.

  • SecurityProtocol: define uma conexão segura a um endpoint de destino do Kafka utilizando Transport Layer Security (TLS). As opções incluem ssl-authentication, ssl-encryption e sasl-ssl. A utilização de sasl-ssl requer SaslUsername e SaslPassword.

  • SslEndpointIdentificationAlgorithm— Define a verificação do nome do host para o certificado. Essa configuração é suportada na AWS DMS versão 3.5.1 e posterior. As opções incluem o seguinte:

    • NONE: desative a verificação do nome do host do broker na conexão do cliente.

    • HTTPS: Habilite a verificação do nome do host do broker na conexão do cliente.

É possível utilizar configurações para ajudar a aumentar a velocidade da transferência. Para fazer isso, o AWS DMS é compatível com uma carga multithreaded completa para um cluster de destino do Apache Kafka. O AWS DMS é compatível com esse multithreading com configurações de tarefa que incluem o seguinte:

  • MaxFullLoadSubTasks— Use essa opção para indicar o número máximo de tabelas de origem a serem carregadas paralelamente. AWS DMS carrega cada tabela em sua tabela de destino correspondente do Kafka usando uma subtarefa dedicada. O padrão é 8; o valor máximo é 49.

  • ParallelLoadThreads— Use essa opção para especificar o número de segmentos AWS DMS usados para carregar cada tabela em sua tabela de destino do Kafka. O valor máximo para um destino do Apache Kafka é 32. Você pode solicitar o aumento desse limite máximo.

  • ParallelLoadBufferSize: utilize esta opção para especificar o número máximo de registros a serem armazenados no buffer utilizado pelos threads de carregamento paralelo utilizam para carregar dados no destino do Kafka. O valor padrão é 50. Valor máximo de 1.000. Use essa configuração com ParallelLoadThreads; ParallelLoadBufferSize é válido somente quando há mais de um thread.

  • ParallelLoadQueuesPerThread: utilize esta opção para especificar o número de filas que cada thread simultâneo acessa para extrair registros de dados das filas e gerar uma carga em lote para o destino. O padrão é um. O máximo é 512.

É possível melhorar o desempenho da captura de dados de alteração (CDC) para endpoints do Kafka ajustando as configurações da tarefa para threads paralelos e operações em massa. Para fazer isso, especifique o número de threads simultâneos, filas por thread e o número de registros a serem armazenados em um buffer usando as configurações da tarefa ParallelApply*. Por exemplo, suponha que você queira executar um carregamento de CDC e aplicar 128 threads em paralelo. Você também quer acessar 64 filas por thread, com 50 registros armazenados por buffer.

Para promover o desempenho do CDC, AWS DMS oferece suporte a estas configurações de tarefas:

  • ParallelApplyThreads— Especifica o número de threads simultâneos que são AWS DMS usados durante um carregamento do CDC para enviar registros de dados para um endpoint de destino do Kafka. O valor padrão é zero (0) e o valor máximo é 32.

  • ParallelApplyBufferSize: especifica o número máximo de registros a serem armazenados em cada fila de buffer para threads simultâneos enviarem para um endpoint de destino do Kafka durante uma carga de CDC. O valor padrão é 100 e o valor máximo é 1.000. Use essa opção quando ParallelApplyThreads especificar mais de um thread.

  • ParallelApplyQueuesPerThread: especifica o número de filas que cada thread acessa para extrair registros de dados das filas e gerar uma carga em lote para um endpoint do Kafka durante a CDC. O padrão é um. O máximo é 512.

Ao usar configurações da tarefa ParallelApply*, o partition-key-type padrão é a primary-key da tabela, não o schema-name.table-name.

Conectar-se ao Kafka utilizando Transport Layer Security (TLS)

O cluster do Kafka aceita conexões seguras utilizando Transport Layer Security (TLS). Com o DMS, é possível utilizar qualquer uma das três opções de protocolo de segurança a seguir para proteger uma conexão de endpoint do Kafka.

Criptografia SSL (server-encryption)

Os clientes validam a identidade do servidor por meio do certificado do servidor. Uma conexão criptografada é feita entre o servidor e o cliente.

Autenticação SSL (mutual-authentication)

O servidor e o cliente validam a identidade entre si por meio de seus próprios certificados. Uma conexão criptografada é feita entre o servidor e o cliente.

SASL-SSL (mutual-authentication)

O método Simple Authentication and Security Layer (SASL) substitui o certificado do cliente por um nome de usuário e senha para validar a identidade do cliente. Especificamente, forneça um nome de usuário e uma senha que o servidor registrou para que o servidor possa validar a identidade de um cliente. Uma conexão criptografada é feita entre o servidor e o cliente.

Importante

O Apache Kafka e o Amazon MSK aceitam certificados resolvidos. Essa é uma limitação conhecida do Kafka e do Amazon MSK a ser resolvida. Para obter mais informações, consulte Problemas do Apache Kafka, KAFKA-3700.

Se estiver utilizando o Amazon MSK, considere utilizar listas de controle de acesso (ACLs) como uma solução alternativa para essa limitação conhecida. Para obter mais informações sobre como utilizar ACLs, consulte a seção ACLs do Apache Kafka do Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Se estiver utilizando um cluster do Kafka autogerenciado, consulte Comentário datado de 18/out/21 para obter informações sobre como configurar o cluster.

Utilizar criptografia SSL com o Amazon MSK ou um cluster do Kafka autogerenciado

É possível utilizar a criptografia SSL para proteger uma conexão de endpoint ao Amazon MSK ou um cluster do Kafka autogerenciado. Ao utilizar o método de autenticação de criptografia SSL, os clientes validam a identidade de um servidor por meio do certificado do servidor. Uma conexão criptografada é feita entre o servidor e o cliente.

Como utilizar a criptografia SSL para conectar-se ao Amazon MSK
  • Defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção ssl-encryption ao criar o endpoint do Kafka de destino.

    O exemplo de JSON a seguir define o protocolo de segurança como criptografia SSL.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Para utilizar a criptografia SSL para um cluster do Kafka autogerenciado
  1. Se estiver utilizando uma Autoridade de Certificação (CA) privada no cluster do Kafka on-premises, faça upload do certificado da CA privada e obtenha um nome do recurso da Amazon (ARN).

  2. Defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção ssl-encryption ao criar o endpoint do Kafka de destino. O exemplo de JSON a seguir define o protocolo de segurança como ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Se estiver utilizando uma CA privada, defina SslCaCertificateArn no ARN obtido na primeira etapa acima.

Utilizar autenticação SSL

É possível utilizar a autenticação SSL para proteger uma conexão de endpoint ao Amazon MSK ou um cluster do Kafka autogerenciado.

Para ativar a autenticação e a criptografia do cliente utilizando a autenticação SSL para conectar-se ao Amazon MSK, faça o seguinte:

  • Prepare uma chave privada e um certificado público para o Kafka.

  • Faça upload dos certificados no gerenciador de certificados do DMS.

  • Crie um endpoint de destino do Kafka com os ARNs dos certificados correspondentes especificados nas configurações do endpoint do Kafka.

Como preparar uma chave privada e um certificado público para o Amazon MSK.
  1. Crie uma instância do EC2 e configure um cliente para utilizar a autenticação conforme descrito nas etapas de 1 a 9 na seção Autenticação de cliente do Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

    Depois de concluir essas etapas, você tem o ARN de um certificado (o ARN do certificado público salvo no ACM) e uma chave privada contida em um arquivo kafka.client.keystore.jks.

  2. Obtenha o certificado público e copie o certificado no arquivo signed-certificate-from-acm.pem, utilizando o comando a seguir:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    O comando retorna informações semelhantes às do exemplo a seguir.

    {"Certificate": "123", "CertificateChain": "456"}

    Copie o equivalente de "123" no arquivo signed-certificate-from-acm.pem.

  3. Obtenha a chave privada importando a chave msk-rsa de kafka.client.keystore.jks to keystore.p12, conforme mostrado no exemplo a seguir.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilize o comando a seguir para exportar keystore.p12 para o formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    A mensagem Inserir frase secreta do PEM é exibida e identifica a chave aplicada para criptografar o certificado.

  5. Remova os atributos bag e os atributos-chave do arquivo .pem para garantir que a primeira linha comece com a sequência de caracteres a seguir.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Como fazer upload de um certificado público e de uma chave privada no gerenciador de certificados do DMS e testar a conexão ao Amazon MSK
  1. Faça upload do gerenciador de certificados do DMS utilizando o comando a seguir.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Crie um endpoint de destino do Amazon MSK e teste a conexão para garantir que a autenticação TLS funcione.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Importante

É possível utilizar a autenticação SSL para proteger uma conexão a um cluster do Kafka autogerenciado. Em alguns casos, é possível utilizar uma Autoridade de Certificação (CA) privada no cluster do Kafka on-premises. Nesse caso, faça upload da cadeia de CAs, do certificado público e da chave privada para o gerenciador de certificados do DMS. Utilize o nome do recurso da Amazon (ARN) correspondente nas configurações do endpoint ao criar o endpoint de destino do Kafka on-premises.

Como preparar uma chave privada e um certificado assinado para um cluster do Kafka autogerenciado
  1. Gere um par de chaves como mostrado no exemplo a seguir.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Gere uma solicitação de assinatura de certificado (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Utilize a CA no truststore do cluster para assinar a CSR. Se não tiver uma CA, você poderá criar sua própria CA privada.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importe ca-cert para o truststore e o keystore do servidor. Se você não tiver um truststore, utilize o seguinte comando para criar o truststore e importar ca-cert nele.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Assine o certificado.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importe o certificado assinado para o keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilize o comando a seguir para importar a chave on-premise-rsa de kafka.server.keystore.jks para keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilize o comando a seguir para exportar keystore.p12 para o formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Faça upload de encrypted-private-server-key.pem, signed-certificate.pem e ca-cert para o gerenciador de certificados do DMS.

  10. Crie um endpoint utilizando os ARNs retornados.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Utilizar a autenticação SASL-SSL para se conectar ao Amazon MSK

O método Simple Authentication and Security Layer (SASL) utiliza um nome de usuário e uma senha para validar a identidade de um cliente e faz uma conexão criptografada entre o servidor e o cliente.

Para utilizar o SASL, primeiro crie um nome de usuário e uma senha seguros ao configurar o cluster do Amazon MSK. Para obter uma descrição de como configurar um nome de usuário e senha seguros para um cluster do Amazon MSK, consulte Configurar a autenticação SASL/SCRAM para um cluster do Amazon MSK no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Crie o endpoint de destino do Kafka, defina a configuração do endpoint do protocolo de segurança (SecurityProtocol) utilizando a opção sasl-ssl. Defina também as opções SaslUsername e SaslPassword. Verifique se essas opções são consistentes com o nome de usuário e senha seguros criados ao configurar o cluster do Amazon MSK pela primeira vez, conforme mostrado no exemplo de JSON a seguir.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
nota
  • Atualmente, AWS DMS oferece suporte somente a SASL-SSL público apoiado pela CA. O DMS não é compatível com SASL-SSL para utilização com o Kafka autogerenciado compatível com a CA privada.

  • Para autenticação SASL-SSL, AWS DMS suporta o mecanismo SCRAM-SHA-512 por padrão. AWS DMS as versões 3.5.0 e superiores também suportam o mecanismo Plain. Para ser compatível com o mecanismo Plain, defina o parâmetro SaslMechanism do tipo de dados da API do KafkaSettings como PLAIN.

Utilizar uma imagem anterior para visualizar os valores originais de linhas da CDC para o Apache Kafka como destino

Ao gravar as atualizações de CDC em um destino de streaming de dados, como o Kafka, é possível visualizar os valores originais de linhas do banco de dados de origem antes da alteração por uma atualização. Para tornar isso possível, AWS DMS preenche uma imagem anterior dos eventos de atualização com base nos dados fornecidos pelo mecanismo do banco de dados de origem.

Diferentes mecanismos de banco de dados de origem fornecem diferentes quantidades de informações para uma imagem anterior:

  • O Oracle fornece atualizações para colunas somente se elas forem alteradas.

  • O PostgreSQL fornece somente dados para colunas que fazem parte da chave primária (alterada ou não). Se a replicação lógica estiver em uso e a REPLICA IDENTITY FULL estiver definida para a tabela de origem, será possível obter informações completas de antes e depois na linha gravada nos WALs e disponíveis aqui.

  • O MySQL geralmente fornece dados para todas as colunas (alteradas ou não).

Para habilitar a criação de imagem anterior para adicionar valores originais do banco de dados de origem à saída do AWS DMS , use a configuração de tarefa BeforeImageSettings ou o parâmetro add-before-image-columns. Esse parâmetro aplica uma regra de transformação de coluna.

BeforeImageSettings adiciona um novo atributo JSON a cada operação de atualização com valores coletados do sistema de banco de dados de origem, conforme mostrado a seguir.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
nota

Aplique BeforeImageSettings às tarefas de carga máxima e de CDC (que migram dados existentes e replicam alterações contínuas) ou às tarefas de somente CDC (que replicam somente as alterações de dados). Não aplique BeforeImageSettings a tarefas que são somente de carga total.

Para opções BeforeImageSettings, aplica-se o seguinte:

  • Defina a opção EnableBeforeImage como true para habilitar a criação de imagem anterior. O padrão é false.

  • Use a opção FieldName para atribuir um nome ao novo atributo JSON. Quando EnableBeforeImage for true, FieldName será necessário e não poderá estar vazio.

  • A opção ColumnFilter especifica uma coluna a ser adicionada usando imagem anterior. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, use o valor padrão, pk-only. Para adicionar somente colunas que não são do tipo LOB, utilize non-lob. Para adicionar qualquer coluna que tenha um valor de imagem anterior, use all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Usar uma regra de transformação de imagem anterior

Como alternativa às configurações de tarefa, é possível usar o parâmetro add-before-image-columns, que aplica uma regra de transformação de coluna. Com esse parâmetro, é possível habilitar a criação de imagem anterior durante a CDC em destinos de streaming de dados, como o Kafka.

Usando add-before-image-columns em uma regra de transformação, é possível aplicar um controle mais refinado dos resultados da imagem anterior. As regras de transformação permitem que você use um localizador de objetos que oferece controle sobre as tabelas selecionadas para a regra. Além disso, é possível encadear regras de transformação, o que permite que regras diferentes sejam aplicadas a tabelas diferentes. Depois, você poderá manipular as colunas produzidas usando outras regras.

nota

Não use o parâmetro add-before-image-columns junto com a configuração da tarefa BeforeImageSettings na mesma tarefa. Em vez disso, use o parâmetro ou a configuração, mas não ambos, para uma única tarefa.

Um tipo de regra transformation com o parâmetro add-before-image-columns de uma coluna deve fornecer uma seção before-image-def. Por exemplo:

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

O valor de column-prefix precede um nome de coluna e o valor padrão de column-prefix é BI_. O valor de column-suffix é anexado ao nome da coluna e o padrão é vazio. Não defina column-prefix e column-suffix como strings vazias.

Escolha um valor para column-filter. Para adicionar somente colunas que fazem parte das chaves primárias da tabela, escolha pk-only. Escolha non-lob para adicionar somente colunas que não sejam do tipo LOB. Ou escolha all para adicionar qualquer coluna que tenha um valor de imagem anterior.

Exemplo de uma regra de transformação de imagem anterior

A regra de transformação no exemplo a seguir adiciona uma nova coluna chamada BI_emp_no no destino. Portanto, uma instrução como UPDATE employees SET emp_no = 3 WHERE emp_no = 1; preenche o campo BI_emp_no com 1. Ao gravar atualizações da CDC em destinos do Amazon S3, a coluna BI_emp_no possibilita identificar qual linha original foi atualizada.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Para obter informações sobre como usar a ação da regra add-before-image-columns, consulte Regras de transformação e ações.

Limitações ao usar o Apache Kafka como alvo para AWS Database Migration Service

Aplicam-se as seguintes limitações ao utilizar o Apache Kafka como destino:

  • AWS DMS Os endpoints de destino do Kafka não oferecem suporte ao controle de acesso do IAM para o Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • O modo Full LOB não é compatível.

  • Especifique um arquivo de configuração do Kafka para seu cluster com propriedades que permitem AWS DMS criar novos tópicos automaticamente. Inclua a configuração, auto.create.topics.enable = true. Se estiver utilizando o Amazon MSK, será possível especificar a configuração padrão ao criar o cluster do Kafka e alterar a configuração auto.create.topics.enable para true. Para obter mais informações sobre as configurações padrão, consulte A configuração padrão do Amazon MSK no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka. Se você precisar modificar um cluster existente do Kafka criado usando o Amazon MSK, execute o AWS CLI comando aws kafka create-configuration para atualizar sua configuração do Kafka, como no exemplo a seguir:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Aqui, //~/kafka_configuration é o arquivo configuração criado com as configurações de propriedades necessárias.

    Se você estiver usando sua própria instância do Kafka instalada no Amazon EC2, modifique a configuração do cluster Kafka com auto.create.topics.enable = true a configuração para AWS DMS permitir a criação automática de novos tópicos, usando as opções fornecidas com sua instância.

  • AWS DMS publica cada atualização em um único registro no banco de dados de origem como um registro de dados (mensagem) em um determinado tópico do Kafka, independentemente das transações.

  • AWS DMS suporta as duas formas a seguir para chaves de partição:

    • SchemaName.TableName: uma combinação de esquema e nome da tabela.

    • ${AttributeName}: o valor de um dos campos no JSON ou a chave primária da tabela no banco de dados de origem.

  • O BatchApply não é compatível com um endpoint do Kafka. A utilização da aplicação em lote (por exemplo, a configuração da tarefa de metadados de destino BatchApplyEnabled) para um destino do Kafka pode resultar em perda de dados.

  • AWS DMS não suporta a migração de valores do tipo de BigInt dados com mais de 16 dígitos. Para contornar essa limitação, você pode usar a regra de transformação a seguir para converter a coluna BigInt em uma string. Para obter mais informações sobre regras transformação, consulte Regras de transformação e ações.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilizar o mapeamento de objetos para migrar dados para um tópico do Kafka

AWS DMS usa regras de mapeamento de tabelas para mapear dados da fonte para o tópico de destino do Kafka. Para mapear dados para um tópico de destino, utilize um tipo de regra de mapeamento de tabelas chamado mapeamento de objetos. Utilize o mapeamento de objetos para definir como os registros de dados na origem são mapeados para os registros de dados publicados em um tópico do Kafka.

Os tópicos do Kafka não têm uma estrutura predefinida além de uma chave de partição.

nota

Não é necessário utilizar o mapeamento de objetos. É possível utilizar o mapeamento de tabela normal para várias transformações. No entanto, o tipo de chave de partição seguirá estes comportamentos padrão:

  • A chave primária é utilizada como uma chave de partição para a carga máxima.

  • Se nenhuma configuração da tarefa de aplicação paralela for utilizada, schema.table será utilizada como uma chave de partição para a CDC.

  • Se as configurações de tarefas de aplicação paralela forem utilizadas, a chave primária será utilizada como uma chave de partição para a CDC.

Para criar uma regra de mapeamento de objetos, especifique rule-type como object-mapping. Essa regra especifica o tipo de mapeamento de objeto que você deseja usar.

A estrutura da regra é a seguinte:

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS atualmente suporta map-record-to-record e map-record-to-document como os únicos valores válidos para o rule-action parâmetro. Essas configurações afetam valores que não são excluídos como parte da lista de atributos exclude-columns. Os map-record-to-document valores map-record-to-record e especificam como AWS DMS manipula esses registros por padrão. Esses valores não afetam os mapeamentos de atributos de forma alguma.

Utilize o map-record-to-record ao migrar de um banco de dados relacional para um tópico do Kafka. Esse tipo de regra utiliza o valor taskResourceId.schemaName.tableName encontrado no banco de dados relacional como a chave de partição no tópico do Kafka e cria um atributo para cada coluna no banco de dados de origem.

Ao utilizar map-record-to-record, observe o seguinte:

  • Essa configuração afeta somente as colunas excluídas pela lista exclude-columns.

  • Para cada coluna desse tipo, AWS DMS cria um atributo correspondente no tópico de destino.

  • AWS DMS cria esse atributo correspondente independentemente de a coluna de origem ser usada em um mapeamento de atributos.

Uma maneira de compreender o map-record-to-record é vê-lo em ação. Para este exemplo, suponha que você está começando com uma linha de tabela do banco de dados relacional com a seguinte estrutura de dados:

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

29/02/1988

Para migrar essas informações de um esquema chamado Test para um tópico do Kafka, crie regras para mapear os dados para o tópico de destino. A regra a seguir ilustra o mapeamento.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Com um tópico do Kafka e uma chave de partição determinados (neste caso, taskResourceId.schemaName.tableName), o seguinte ilustra o formato do registro resultante utilizando os nossos exemplos de dados no tópico de destino do Kafka:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Reestruturação de dados com mapeamento de atributo

É possível reestruturar os dados ao migrá-los para um tópico do Kafka utilizando um mapa de atributos. Por exemplo, você pode combinar vários campos na origem em um único campo no destino. O mapa de atributo a seguir ilustra como reestruturar os dados.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Para definir um valor constante para partition-key, especifique um valor de partition-key. Por exemplo, é possível fazer isso para forçar o armazenamento de todos os dados em uma única partição. O mapeamento a seguir ilustra esse método.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
nota

O valor do partition-key para um registro de controle para uma tabela específica é TaskId.SchemaName.TableName. O valor do partition-key para um registro de controle específico para uma tarefa é o TaskId daquele registro. A especificação de um valor do partition-key no mapeamento do objeto não tem impacto sobre o partition-key no caso dos registros de controle.

Replicação de multitópico utilizando o mapeamento de objetos

Por padrão, AWS DMS as tarefas migram todos os dados de origem para um dos tópicos do Kafka a seguir:

  • Conforme especificado no campo Tópico do endpoint de AWS DMS destino.

  • Conforme especificado por kafka-default-topic, se o campo Tópico do endpoint de destino não estiver preenchido e a configuração auto.create.topics.enable do Kafka estiver definida como true.

Com as versões 3.4.6 e posteriores do AWS DMS mecanismo, você pode usar o kafka-target-topic atributo para mapear cada tabela de origem migrada para um tópico separado. Por exemplo, as regras de mapeamento de objetos a seguir migram as tabelas de origem Customer e Address para os tópicos customer_topic e address_topic do Kafka, respectivamente. Ao mesmo tempo, AWS DMS migra todas as outras tabelas de origem, incluindo a Bills tabela no Test esquema, para o tópico especificado no endpoint de destino.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Ao utilizar a replicação de multitópico do Kafka, é possível agrupar e migrar tabelas de origem para tópicos separados do Kafka utilizando uma única tarefa de replicação.

Formato de mensagem do Apache Kafka

A saída JSON é simplesmente uma lista de pares chave/valor.

RecordType

O tipo de registro pode ser dados ou controle. Os registros de dados representam as linhas reais na origem. Os registros de controle são relacionados a importantes eventos no stream, como a reinicialização de uma tarefa, por exemplo.

Operation

Para registros de dados, a operação pode ser load, insert, update ou delete.

Para registros de controle, a operação pode ser create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column ou column-type-change.

SchemaName

O esquema de origem para o registro. Esse campo pode estar vazio para um registro de controle.

TableName

A tabela de origem para um registro. Esse campo pode estar vazio para um registro de controle.

Timestamp

A marca de data e hora de quando a mensagem do JSON foi criada. O campo é formatado com o formato ISO 8601.

O exemplo de mensagem JSON a seguir ilustra uma mensagem de tipo de dados com todos os metadados adicionais.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

O exemplo de mensagem JSON a seguir ilustra uma mensagem de tipo de controle.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }