Uso de Apache Kafka como objetivo para AWS Database Migration Service - AWS Database Migration Service

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de Apache Kafka como objetivo para AWS Database Migration Service

Puede usarlo AWS DMS para migrar datos a un clúster de Apache Kafka. Apache Kafka es una plataforma de streaming distribuida. Puede utilizar Apache Kafka para ingerir y procesar datos de streaming en tiempo real.

AWS también ofrece Amazon Managed Streaming para que Apache Kafka (Amazon MSK) lo utilice como objetivo. AWS DMS Amazon MSK es un servicio de streaming de Apache Kafka completamente administrado que simplifica la implementación y gestión de instancias de Apache Kafka. Funciona con versiones de código abierto de Apache Kafka y puede acceder a las instancias de Amazon MSK como AWS DMS destinos exactamente igual que a cualquier instancia de Apache Kafka. Para obtener más información, consulte ¿Qué es Amazon MSK? en la Guía para desarrolladores de Amazon Managed Streaming para Apache Kafka..

Un clúster de Kafka almacena flujos de registros en categorías denominadas temas que se dividen en particiones. Las particiones son secuencias de registros de datos (mensajes) identificados de forma única en un tema. Las particiones se pueden distribuir entre varios agentes de un clúster para permitir el procesamiento paralelo de los registros de un tema. Para obtener más información sobre temas y particiones y su distribución en Apache Kafka, consulte Temas y registros y Distribución.

El clúster de Kafka puede ser una instancia de Amazon MSK, un clúster que se ejecute en una instancia de Amazon EC2 o un clúster en las instalaciones. Una instancia de Amazon MSK o un clúster en una instancia de Amazon EC2 se pueden encontrar en la misma VPC o en una diferente. Si el clúster está en las instalaciones, puede usar su propio servidor de nombres en las instalaciones para la instancia de replicación a fin de resolver el nombre de host del clúster. Para obtener información acerca de cómo configurar un servidor de nombres para la instancia de replicación, consulte Uso de su propio servidor de nombres en las instalaciones. Para obtener más información sobre cómo configurar una red, consulte Configuración de una red para una instancia de replicación.

Cuando utilice un clúster de Amazon MSK, asegúrese de que su grupo de seguridad permita el acceso desde la instancia de replicación. Para obtener información sobre cómo cambiar el grupo de seguridad de un clúster de Amazon MSK, consulte Cambio del grupo de seguridad de un clúster de Amazon MSK.

AWS Database Migration Service publica los registros de un tema de Kafka mediante JSON. Durante la conversión, AWS DMS serializa cada registro de la base de datos de origen en un par de atributo-valor en formato JSON.

Para migrar los datos desde cualquier origen de datos admitido a un clúster de Kafka de destino, se usa la asignación de objetos. Con la asignación de objetos, se determina cómo se estructuran los registros de datos en el tema de destino. También debe definir una clave de partición para cada tabla, que Apache Kafka utiliza para agrupar los datos en particiones.

Actualmente, AWS DMS admite un solo tema por tarea. En el caso de una sola tarea con varias tablas, todos los mensajes van a un solo tema. Cada mensaje incluye una sección de metadatos que identifica el esquema y la tabla de destino. AWS DMS las versiones 3.4.6 y posteriores admiten la replicación multitema mediante el mapeo de objetos. Para obtener más información, consulte Replicación multitemática mediante asignación de objetos.

Configuración de punto de enlace de Apache Kafka

Puede especificar los detalles de la conexión mediante la configuración del punto final en la AWS DMS consola o mediante la --kafka-settings opción en la CLI. A continuación se indican los requisitos para cada ajuste:

  • Broker: especifique las ubicaciones de uno o más agentes en el clúster de Kafka en forma de una lista separada por comas de cada broker-hostname:port. Un ejemplo es "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Esta configuración puede especificar las ubicaciones de algunos o todos los agentes del clúster. Todos los agentes de clúster se comunican para gestionar la partición de los registros de datos migrados al tema.

  • Topic: (opcional) especifique el nombre del tema con una longitud máxima de 255 letras y símbolos. Puede usar el punto (.), el guion bajo (_) y el signo menos (-). Los nombres de temas con un punto (.) o guion bajo (_) pueden colisionar en las estructuras de datos internas. Puede usar uno de estos símbolos, pero no ambos, en el nombre del tema. Si no especifica un nombre para el tema, "kafka-default-topic" lo AWS DMS utiliza como tema de migración.

    nota

    Para AWS DMS crear un tema de migración que especifique o el tema predeterminado, auto.create.topics.enable = true configúrelo como parte de la configuración del clúster de Kafka. Para obtener más información, consulte Limitaciones al usar Apache Kafka como destino para AWS Database Migration Service

  • MessageFormat: el formato del resultado de los registros creados en el punto de conexión. El formato del mensaje es JSON (predeterminado) o JSON_UNFORMATTED (una sola línea sin tabulación).

  • MessageMaxBytes: el tamaño máximo en bytes de los registros creados en el punto de conexión. El valor predeterminado es 1 000 000.

    nota

    Solo puede usar la AWS CLI/SDK para cambiar a un valor que no sea MessageMaxBytes el predeterminado. Por ejemplo, para modificar el punto de conexión de Kafka existente y cambiar MessageMaxBytes, utilice el siguiente comando.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails: proporciona información detallada sobre transacciones de la base de datos de origen. Esta información incluye una marca temporal de confirmación, una posición de registro y valores para transaction_id, previous_transaction_id y transaction_record_id (el desplazamiento del registro dentro de una transacción). El valor predeterminado es false.

  • IncludePartitionValue: muestra el valor de partición dentro de la salida del mensaje de Kafka, a menos que el tipo de partición sea schema-table-type. El valor predeterminado es false.

  • PartitionIncludeSchemaTable: agrega los nombres de los esquemas y de las tablas como prefijo a los valores de partición, cuando el tipo de partición es primary-key-type. Al hacerlo, aumenta la distribución de datos entre las particiones de Kafka. Por ejemplo, supongamos que un esquema SysBench tiene miles de tablas y cada tabla tiene un rango limitado para una clave principal. En este caso, la misma clave principal se envía desde miles de tablas a la misma partición, lo que provoca limitación. El valor predeterminado es false.

  • IncludeTableAlterOperations: incluye todas las operaciones de lenguaje de definición de datos (DDL) que cambien la tabla en los datos de control, como rename-table, drop-table, add-column, drop-column y rename-column. El valor predeterminado es false.

  • IncludeControlDetails: muestra información detallada de control para la definición de tablas, la definición de columnas y los cambios de tablas y columnas en la salida del mensaje de Kafka. El valor predeterminado es false.

  • IncludeNullAndEmpty: incluya columnas NULL y vacías en el objetivo. El valor predeterminado es false.

  • SecurityProtocol: establece una conexión segura a un punto de conexión de destino de Kafka utilizando la seguridad de la capa de transporte (TLS). Las opciones incluyen ssl-authentication, ssl-encryptiony sasl-ssl. El uso de sasl-ssl requiere SaslUsername y SaslPassword.

  • SslEndpointIdentificationAlgorithm— Establece la verificación del nombre de host para el certificado. Esta configuración se admite en la AWS DMS versión 3.5.1 y versiones posteriores. Estas son las opciones disponibles:

    • NONE: Deshabilite la verificación del nombre de host del corredor en la conexión del cliente.

    • HTTPS: Habilite la verificación del nombre de host del corredor en la conexión del cliente.

Puede usar la configuración para ayudar a aumentar la velocidad de la transferencia. Para ello, AWS DMS admite la carga completa con varios subprocesos en un clúster de destino de Apache Kafka. AWS DMS admite esta operación con varios subprocesos con las configuraciones de tareas que incluyen lo siguiente:

  • MaxFullLoadSubTasks— Utilice esta opción para indicar el número máximo de tablas de origen que se van a cargar en paralelo. AWS DMS carga cada tabla en su tabla de destino de Kafka correspondiente mediante una subtarea dedicada. El valor predeterminado es 8, el valor máximo es 49.

  • ParallelLoadThreads— Utilice esta opción para especificar el número de subprocesos que se AWS DMS utilizan para cargar cada tabla en su tabla de destino de Kafka. El valor máximo para un destino de Apache Kafka es 32. Puede pedir que se incremente este límite máximo.

  • ParallelLoadBufferSize: utilice esta opción para especificar el número máximo de registros para almacenar en el búfer que los subprocesos de carga en paralelo utilizan para cargar datos en el destino de Kafka. El valor predeterminado es 50. El valor máximo es 1000. Utilice este parámetro con ParallelLoadThreads. ParallelLoadBufferSize es válido solo cuando hay más de un subproceso.

  • ParallelLoadQueuesPerThread: utilice esta opción para especificar el número de colas que acceden a cada subproceso simultáneo para eliminar los registros de datos de las colas y generar una carga por lotes para el destino. El valor predeterminado de es 1. El máximo es 512.

Puede mejorar el rendimiento de la captura de datos de cambios (CDC) para los puntos de conexión de Kafka ajustando la configuración de las tareas para los subprocesos paralelos y las operaciones masivas. Para ello, puede especificar el número de subprocesos simultáneos, las colas por subproceso y el número de registros que se van a almacenar en un búfer mediante la configuración de tareas ParallelApply*. Suponga, por ejemplo, que desea realizar una carga de CDC y aplicar 128 subprocesos en paralelo. También desea acceder a 64 colas por subproceso, con 50 registros almacenados por búfer.

Para promover el desempeño de los CDC, AWS DMS admite las siguientes configuraciones de tareas:

  • ParallelApplyThreads— Especifica la cantidad de subprocesos simultáneos que se AWS DMS utilizan durante una carga de CDC para enviar los registros de datos a un punto final de Kafka. El valor predeterminado es cero (0) y el valor máximo es 32.

  • ParallelApplyBufferSize: especifica el número máximo de registros que se almacenan en cada cola del búfer para los subprocesos simultáneos que insertan datos en un punto de conexión de destino de Kafka durante una carga de CDC. El valor predeterminado es 100 y el máximo es 1000. Utilice esta opción cuando ParallelApplyThreads especifique más de un subproceso.

  • ParallelApplyQueuesPerThread: especifica el número de colas a las que accede cada subproceso para sacar registros de datos de las colas y generar una carga por lotes para un punto de conexión de Kafka durante el proceso de CDC. El valor predeterminado de es 1. El máximo es 512.

Cuando se utiliza la configuración de tareas ParallelApply*, el valor predeterminado de partition-key-type es el valor de primary-key de la tabla, no el valor de schema-name.table-name.

Conexión a Kafka mediante seguridad de la capa de transporte (TLS)

Un clúster de Kafka acepta conexiones seguras mediante seguridad de la capa de transporte (TLS). Con DMS, puede utilizar cualquiera de las siguientes tres opciones de protocolos de seguridad para proteger una conexión de punto de conexión de Kafka.

Cifrado SSL (server-encryption)

Los clientes validan la identidad del servidor mediante el certificado del servidor. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

Autenticación SSL (mutual-authentication)

El servidor y el cliente validan la identidad entre sí mediante sus propios certificados. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

SASL-SSL (mutual-authentication)

El método de capa de seguridad y autenticación simple (SASL) reemplaza el certificado del cliente por un nombre de usuario y una contraseña para validar la identidad del cliente. En concreto, debe proporcionar un nombre de usuario y una contraseña que el servidor haya registrado para que el servidor pueda validar la identidad de un cliente. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

importante

Apache Kafka y Amazon MSK aceptan certificados resueltos. Se trata de una limitación conocida que deben abordar Kafka y Amazon MSK. Para obtener más información, consulte Problemas de Apache Kafka, KAFKA-3700.

Si utiliza Amazon MSK, considere la posibilidad de utilizar listas de control de acceso (ACL) como solución alternativa a esta conocida limitación. Para obtener más información sobre el uso de ACL, consulte la sección ACL de Apache Kafka de la Guía para desarrolladores de Amazon Managed Streaming para Apache Kafka.

Si utiliza un clúster de Kafka autoadministrado, consulte Comentario del 21 de octubre de 2018 para obtener información sobre la configuración del clúster.

Uso del cifrado SSL con Amazon MSK o un clúster de Kafka autoadministrado

Puede utilizar el cifrado SSL para proteger una conexión de punto de conexión con Amazon MSK o con un clúster de Kafka autoadministrado. Cuando utiliza el método de autenticación de cifrado SSL, los clientes validan la identidad de un servidor mediante el certificado del servidor. A continuación, se establece una conexión cifrada entre el servidor y el cliente.

Uso del cifrado SSL para conectarse a Amazon MSK
  • Establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción ssl-encryption al crear el punto de conexión de Kafka de destino.

    En el siguiente ejemplo de JSON se establece el protocolo de seguridad como cifrado SSL.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Uso del cifrado SSL en un clúster de Kafka autoadministrado
  1. Si utiliza una entidad de certificación (CA) privada en el clúster de Kafka en las instalaciones, cargue el certificado de la entidad de certificación privado y obtenga un nombre de recurso de Amazon (ARN).

  2. Establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción ssl-encryption al crear el punto de conexión de Kafka de destino. En el siguiente ejemplo de JSON se establece el protocolo de seguridad como ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Si utiliza una entidad de certificación privada, establezca SslCaCertificateArn en el ARN que obtuvo en el primer paso anterior.

Uso de la autenticación SSL

Puede utilizar la autenticación SSL para proteger una conexión de punto de conexión con Amazon MSK o con un clúster de Kafka autoadministrado.

Para habilitar la autenticación y el cifrado del cliente mediante la autenticación SSL para conectarse a Amazon MSK, haga lo siguiente:

  • Prepare una clave privada y un certificado público para Kafka.

  • Cargue certificados en el mánager de certificados de DMS.

  • Cree un punto de conexión de destino de Kafka con los ARN de certificado correspondientes especificados en la configuración del punto de conexión de Kafka.

Preparación de una clave privada y un certificado público para Amazon MSK
  1. Cree una instancia EC2 y configure un cliente para que utilice la autenticación tal y como se describe en los pasos 1 a 9 de la sección Autenticación de clientes de la Guía para desarrolladores de Amazon Managed Streaming para Apache Kafka.

    Tras completar estos pasos, dispondrá de un ARN de certificado (el ARN del certificado público guardado en ACM) y de una clave privada en un archivo kafka.client.keystore.jks.

  2. Obtenga el certificado público y cópielo en el archivo signed-certificate-from-acm.pem mediante el siguiente comando:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    El comando devuelve información similar al siguiente ejemplo:

    {"Certificate": "123", "CertificateChain": "456"}

    A continuación, copie el equivalente de "123" al archivo signed-certificate-from-acm.pem.

  3. Obtenga la clave privada importando la clave msk-rsa desde kafka.client.keystore.jks to keystore.p12, como se muestra en el siguiente ejemplo.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilice el siguiente comando para exportar keystore.p12 a un formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Aparece el mensaje Ingrese la frase de contraseña PEM e identifica la clave que se aplica para cifrar el certificado.

  5. Elimine los atributos de bolsa y los atributos de clave del archivo .pem para asegurarse de que la primera línea comience con la siguiente cadena.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Carga de un certificado público y una clave privada en el mánager de certificados de DMS y prueba de la conexión a Amazon MSK
  1. Cargue en el mánager de certificados de DMS mediante el siguiente comando.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Cree un punto de conexión de destino de Amazon MSK y pruebe la conexión para asegurarse de que la autenticación TLS funciona.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
importante

Puede utilizar la autenticación SSL para proteger una conexión a un clúster de Kafka autoadministrado. En algunos casos, es posible que use una entidad de certificación (CA) privada en el clúster de Kafka en las instalaciones. Si es así, cargue la cadena de la entidad de certificación, el certificado público y la clave privada en el mánager de certificados de DMS. A continuación, utilice el nombre de recurso de Amazon (ARN) correspondiente en la configuración del punto de conexión cuando cree el punto de conexión de destino Kafka en las instalaciones.

Preparación de una clave privada y un certificado firmado para un clúster de Kafka autoadministrado
  1. Genere un par de claves como se muestra en el siguiente ejemplo.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Genere una solicitud de firma de certificado (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Utilice la entidad de certificación del almacén de confianza del clúster para firmar la CSR. Si no tiene una entidad de certificación, puede crear su propia entidad de certificación privada.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importe ca-cert en el almacén de confianza y al almacén de claves del servidor. Si no dispone de un almacén de confianza, utilice el siguiente comando para crear el almacén de confianza e importar ca-cert en él.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Firme el certificado.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importe el certificado firmado al almacén de claves.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilice el siguiente comando para importar la clave on-premise-rsa de kafka.server.keystore.jks a keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilice el siguiente comando para exportar keystore.p12 a un formato .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Cargue encrypted-private-server-key.pem, signed-certificate.pem y ca-cert en el mánager de certificados de DMS.

  10. Cree un punto de conexión mediante los ARN devueltos.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Uso de la autenticación SASL-SSL para conectarse a Amazon MSK

El método de capa de seguridad y autenticación simple (SASL) utiliza un nombre de usuario y una contraseña para validar la identidad del cliente y establece una conexión cifrada entre el servidor y el cliente.

Para usar SASL, primero debe crear un nombre de usuario y una contraseña seguros al configurar el clúster de Amazon MSK. Para obtener una descripción de cómo configurar un nombre de usuario y una contraseña seguros para un clúster de Amazon MSK, consulte Configuración de la autenticación SASL/SCRAM para un clúster de Amazon MSK en la Guía para desarrolladores de Amazon Managed Streaming para Apache Kafka.

A continuación, cuando cree el punto de conexión de destino de Kafka, establezca la configuración del punto de conexión del protocolo de seguridad (SecurityProtocol) mediante la opción sasl-ssl. Establezca también las opciones SaslUsername y SaslPassword. Asegúrese de que coincidan con el nombre de usuario y la contraseña seguros que creó cuando configuró por primera vez el clúster de Amazon MSK, como se muestra en el siguiente ejemplo de JSON.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
nota
  • Actualmente, solo AWS DMS admite el SASL-SSL público respaldado por una CA. El DMS no admite el SASL-SSL para su uso con Kafka autoadministrado y respaldado por una entidad de certificación privada.

  • Para la autenticación SASL-SSL, AWS DMS es compatible con el mecanismo SCRAM-SHA-512 de forma predeterminada. AWS DMS las versiones 3.5.0 y superiores también admiten el mecanismo Plain. Para admitir el mecanismo Plain, defina el parámetro SaslMechanism del tipo de datos de la API KafkaSettings en PLAIN.

Uso de una imagen anterior para consultar los valores originales de las filas de CDC para Apache Kafka como destino

Al escribir actualizaciones de CDC en un destino de transmisión de datos como Kafka, puede ver los valores originales de una fila de base de datos de origen antes de cambiar mediante una actualización. Para que esto sea posible, AWS DMS rellena una imagen anterior de los eventos de actualización en función de los datos proporcionados por el motor de base de datos de origen.

Los diferentes motores de base de datos de origen proporcionan diferentes cantidades de información para una imagen anterior:

  • Oracle proporciona actualizaciones a las columnas solo si cambian.

  • PostgreSQL proporciona datos solo para las columnas que forman parte de la clave principal (cambiadas o no). Si se utiliza la replicación lógica y se ha configurado REPLICA IDENTITY FULL para la tabla de origen, puede obtener toda la información del antes y el después de la fila escrita en los WAL y disponible aquí.

  • MySQL generalmente proporciona datos para todas las columnas (cambiadas o no).

Para habilitar imágenes anteriores para agregar valores originales de la base de datos de origen a la salida AWS DMS , utilice la configuración de tarea BeforeImageSettings o el parámetro add-before-image-columns. Este parámetro aplica una regla de transformación de columna.

BeforeImageSettings agrega un nuevo atributo JSON a cada operación de actualización con valores recopilados desde el sistema de base de datos de origen, como se muestra a continuación.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
nota

Aplicar BeforeImageSettings a tareas de carga completa más CDC (que migran datos existentes y replican cambios en curso) o a tareas de solo CDC (que replican cambios de datos solamente). No se aplica BeforeImageSettings a tareas que son solo de carga completa.

Para las opciones de BeforeImageSettings, se aplica lo siguiente:

  • Establezca la opción EnableBeforeImage para habilitar true antes de crear imágenes. El valor predeterminado es false.

  • Utilice la opción FieldName para asignar un nombre al nuevo atributo JSON. Cuando EnableBeforeImage es true, FieldName es necesario y no puede estar vacío.

  • La opción ColumnFilter especifica una columna para agregar mediante el uso de las imágenes anteriores. Para agregar solo columnas que forman parte de las claves principales de la tabla, utilice el valor predeterminado, pk-only. Para agregar solo columnas que no son del tipo LOB, utilice non-lob. Para agregar cualquier columna que tenga un valor de imagen anterior, utilice all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Uso de una regla de transformación de imagen anterior

Como alternativa a la configuración de tareas, puede utilizar el parámetro add-before-image-columns, que aplica una regla de transformación de columnas. Con este parámetro, puede habilitar las imágenes anteriores durante CDC en destinos de transmisión de datos como Kafka.

Al utilizar add-before-image-columns en una regla de transformación, puede aplicar un control más detallado de los resultados de la imagen anterior. Las reglas de transformación permiten utilizar un localizador de objetos que le da control sobre las tablas seleccionadas para la regla. Además, puede encadenar reglas de transformación, lo que permite aplicar diferentes reglas a diferentes tablas. A continuación, puede manipular las columnas producidas utilizando otras reglas.

nota

No utilice el parámetro add-before-image-columns junto con la configuración de tarea BeforeImageSettings dentro de la misma tarea. En su lugar, utilice el parámetro o la configuración, pero no ambos, para una sola tarea.

Un tipo de regla transformation con el parámetro add-before-image-columns de una columna debe proporcionar una sección before-image-def. A continuación se muestra un ejemplo.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

El valor de column-prefix se antepone a un nombre de columna y el valor predeterminado de column-prefix es BI_. El valor de column-suffix se añade al nombre de la columna y el valor predeterminado está vacío. No configure ambas cadenas column-prefix y column-suffix como cadenas vacías.

Elija un valor para column-filter. Para agregar solo columnas que forman parte de las claves principales de la tabla, elija pk-only. Elija non-lob para agregar solo columnas que no sean de tipo LOB. O elija all para agregar cualquier columna que tenga un valor de imagen anterior.

Ejemplo de una regla de transformación de imagen anterior

La regla de transformación del siguiente ejemplo agrega una nueva columna llamada BI_emp_no en el destino. Entonces, una instrucción como UPDATE employees SET emp_no = 3 WHERE emp_no = 1; rellena el campo BI_emp_no con 1. Cuando escribe actualizaciones de CDC en destinos de Amazon S3, la columna BI_emp_no permite indicar qué fila original se actualizó.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Para obtener información sobre el uso de la acción de regla add-before-image-columns, consulte Reglas y acciones de transformación.

Limitaciones al usar Apache Kafka como destino para AWS Database Migration Service

Al utilizar Apache Kafka como destino, se aplican las siguientes restricciones:

  • AWS DMS Los puntos de enlace de destino de Kafka no admiten el control de acceso de IAM para Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • No se admite el modo LOB completo.

  • Especifique un archivo de configuración de Kafka para su clúster con propiedades que permitan AWS DMS crear nuevos temas automáticamente. Incluya el valor auto.create.topics.enable = true. Si utiliza Amazon MSK, puede especificar la configuración predeterminada al crear el clúster de Kafka y, a continuación, cambiar la configuración de auto.create.topics.enable a true. Para obtener más información acerca de los valores de configuración predeterminados, consulte La configuración predeterminada de Amazon MSK en la Guía para desarrolladores de Amazon Managed Streaming para Apache Kafka. Si necesita modificar un clúster de Kafka existente creado con Amazon MSK, ejecute el AWS CLI comando aws kafka create-configuration para actualizar la configuración de Kafka, como en el siguiente ejemplo:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Aquí, //~/kafka_configuration es el archivo de configuración que ha creado con los valores de propiedades necesarios.

    Si utiliza su propia instancia de Kafka instalada en Amazon EC2, modifique la configuración del clúster de Kafka con auto.create.topics.enable = true la configuración AWS DMS que permita la creación automática de nuevos temas mediante las opciones que se proporcionan con la instancia.

  • AWS DMS publica cada actualización en un único registro de la base de datos de origen como un registro de datos (mensaje) de un tema de Kafka determinado, independientemente de las transacciones.

  • AWS DMS admite las dos formas siguientes de claves de partición:

    • SchemaName.TableName una combinación del nombre de esquema y de tabla.

    • ${AttributeName}: el valor de uno de los campos del archivo JSON o la clave principal de la tabla de la base de datos de origen.

  • BatchApply no es compatible con un punto de conexión de Kafka. Es posible que el uso de la aplicación por lotes (por ejemplo, la configuración de tareas de metadatos de destino BatchApplyEnabled) para un objetivo de Kafka provoque la pérdida de datos.

  • AWS DMS no admite la migración de valores de tipos de BigInt datos con más de 16 dígitos. Para evitar esta limitación, puede usar la siguiente regla de transformación para convertir la columna BigInt en una cadena. Para obtener más información sobre las reglas de transformación, consulte Reglas y acciones de transformación.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Uso de la asignación de objetos para migrar datos a un tema de Kafka

AWS DMS utiliza reglas de mapeo de tablas para mapear datos del tema de Kafka de origen al de destino. Para asignar datos a un tema de destino, se utiliza un tipo de regla de asignación de tablas denominado asignación de objetos. Puede utilizar el mapeo de objetos para definir cómo los registros de datos del origen se asignan a los registros de datos publicados en un tema de Kafka.

Los temas de Kafka no tienen una estructura predeterminada distinta de una clave de partición.

nota

No tiene que utilizar la asignación de objetos. Puede utilizar la asignación de tablas normal para varias transformaciones. Sin embargo, el tipo de clave de partición seguirá estos comportamientos predeterminados:

  • La clave principal se usa como clave de partición para la carga completa.

  • Si no se utiliza ninguna configuración de tareas de aplicación paralela, schema.table se utiliza como clave de partición para CDC.

  • Si se utiliza la configuración de tareas de aplicación paralela, la clave principal se utiliza como clave de partición para CDC.

Para crear una regla de mapeo de objetos, se especifica rule-type como object-mapping. Esta regla indica el tipo de mapeo de objetos que desea utilizar.

La estructura de la regla es la siguiente.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS actualmente admite map-record-to-record y es map-record-to-document el único valor válido para el parámetro. rule-action Esta configuración afecta a los valores que no están excluidos como parte de la lista de atributos exclude-columns. Los map-record-to-document valores map-record-to-record y especifican cómo se AWS DMS gestionan estos registros de forma predeterminada. Estos valores no afectan a los mapeos de atributos en modo alguno.

Utilice map-record-to-record al migrar desde una base de datos relacional a un tema de Kafka. Este tipo de regla utiliza el valor taskResourceId.schemaName.tableName de la base de datos relacional como la clave de partición en el tema de Kafka y crea un atributo para cada columna de la base de datos de origen.

Cuando utilice map-record-to-record, tenga en cuenta lo siguiente:

  • Esta configuración solo afecta a las columnas excluidas de la lista exclude-columns.

  • Para cada columna de este tipo, AWS DMS crea un atributo correspondiente en el tema de destino.

  • AWS DMS crea el atributo correspondiente independientemente de si la columna de origen se utiliza en una asignación de atributos.

Una forma de entender map-record-to-record es verlo en acción. En este ejemplo, imagine que empieza con una fila de una tabla de base de datos relacional con la estructura y los datos siguientes.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Para migrar esta información desde un esquema denominado Test a un tema de Kafka, cree reglas para mapear los datos al tema. La siguiente regla ilustra la operación de asignación.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Dados un tema de Kafka y una clave de partición (en este caso, taskResourceId.schemaName.tableName), a continuación se ilustra el formato de registro resultante al usar nuestros datos de ejemplo en el tema de destino de Kafka:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Reestructuración de datos con el mapeo de atributos

Puede reestructurar los datos mientras los migra a un tema de Kafka utilizando un mapa de atributos. Por ejemplo, es posible que desee combinar varios campos del origen en un único campo en el destino. El mapa de atributos siguiente ilustra cómo reestructurar los datos.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Para establecer un valor constante para partition-key, especifique un valor de partition-key. Tal vez desee hacer esto, por ejemplo, para obligar a que todos los datos se almacenen en una única partición. El siguiente mapeo ilustra este enfoque.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
nota

El valor partition-key de un registro de control para una tabla específica es TaskId.SchemaName.TableName. El valor partition-key de un registro de control para una tabla específica es el TaskId de ese registro. La especificación de un valor partition-key en el mapeo de objetos no tiene ningún efecto en el elemento partition-key de un registro de control.

Replicación multitemática mediante asignación de objetos

De forma predeterminada, AWS DMS las tareas migran todos los datos de origen a uno de los siguientes temas de Kafka:

  • Como se especifica en el campo Tema del punto final de AWS DMS destino.

  • Como especifica kafka-default-topic si el campo Tema del punto de conexión de destino no está rellenado y la configuración auto.create.topics.enable de Kafka está establecida en true.

Con las versiones 3.4.6 y posteriores AWS DMS del motor, puede usar el kafka-target-topic atributo para asignar cada tabla fuente migrada a un tema diferente. Por ejemplo, las siguientes reglas de asignación de objetos migran las tablas de origen Customer y Address a los temas de Kafka customer_topic y address_topic, respectivamente. Al mismo tiempo, AWS DMS migra todas las demás tablas de origen, incluida la Bills tabla del Test esquema, al tema especificado en el punto final de destino.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Al utilizar la replicación multitema de Kafka, puede agrupar y migrar las tablas de origen para separar los temas de Kafka mediante una única tarea de replicación.

Formato de mensajes para Apache Kafka

La salida JSON es simplemente una lista de pares de clave-valor.

RecordType

El tipo de registro puede ser de datos o de control. Los registros de datos representan las filas reales en el origen. Los registros de control son para eventos importantes de la secuencia como, por ejemplo, el reinicio de una tarea.

Operación

Para los registros de datos, la operación puede ser load, insert, update o delete.

Para los registros de control, la operación puede ser create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column o column-type-change.

SchemaName

El esquema de origen del registro. Este campo puede estar vacío para un registro de control.

TableName

La tabla de origen del registro. Este campo puede estar vacío para un registro de control.

Timestamp

La marca temporal que indica cuándo se creó el mensaje JSON. El campo está formateado con el formato ISO 8601.

El siguiente ejemplo de mensaje JSON ilustra un mensaje de tipo de datos con todos los metadatos adicionales.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

El siguiente ejemplo de mensaje JSON ilustra un mensaje de tipo control.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }