Verwenden von Apache Kafka als Ziel für AWS Database Migration Service - AWS Database Migration Service

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden von Apache Kafka als Ziel für AWS Database Migration Service

Sie können verwenden AWS DMS , um Daten zu einem Apache-Kafka-Cluster zu migrieren. Apache Kafka ist eine verteilte Streaming-Plattform. Mit Apache Kafka können Sie Streaming-Daten in Echtzeit erfassen und verarbeiten.

AWS bietet auch Amazon Managed Streaming für Apache Kafka (Amazon MSK) zur Verwendung als AWS DMS Ziel. Amazon MSK ist ein vollständig verwalteter Apache-Kafka-Streaming-Service, der die Implementierung und Verwaltung von Apache-Kafka-Instances vereinfacht. Es funktioniert mit Open-Source-Apache-Kafka-Versionen und Sie greifen genau wie jede Apache-Kafka-Instance als AWS DMS Ziele auf Amazon-MSK-Instances zu. Weitere Informationen finden Sie unter Was ist Amazon MSK? im Entwicklerhandbuch für Amazon Managed Streaming für Apache Kafka.

Ein Kafka-Cluster speichert Streams von Datensätzen in Kategorien, die als Themen bezeichnet und in Partitionen unterteilt werden. Partitionen sind eindeutig identifizierte Sequenzen von Datensätzen (Nachrichten) in einem Thema. Partitionen können über mehrere Broker in einem Cluster verteilt werden, um die parallele Verarbeitung der Datensätze eines Themas zu ermöglichen. Weitere Informationen zu Themen und Partitionen und ihrer Verteilung in Apache Kafka finden Sie unter Themen und Protokolle und Verteilung.

Bei Ihrem Kafka-Cluster kann es sich entweder um eine Amazon-MSK-Instance, einen Cluster, der auf einer Amazon-EC2-Instance ausgeführt wird, oder einen On-Premises-Cluster handeln. Eine Amazon-MSK-Instance oder ein Cluster auf einer Amazon-EC2-Instance kann sich in derselben oder einer anderen VPC befinden. Wenn es sich um einen On-Premises-Cluster handelt, können Sie Ihren eigenen On-Premises-Namensserver für Ihre Replikations-Instance verwenden, um den Host-Namen des Clusters aufzulösen. Informationen zum Einrichten eines Namensservers für Ihre Replikations-Instance finden Sie unter Verwenden Ihres eigenen Vor-Ort-Nameservers. Weitere Informationen zum Einrichten eines Netzwerks finden Sie unter Einrichten eines Netzwerks für eine Replikations-Instance.

Wenn Sie einen Amazon-MSK-Cluster verwenden, stellen Sie sicher, dass dessen Sicherheitsgruppe den Zugriff von Ihrer Replikations-Instance aus ermöglicht. Informationen zum Ändern der Sicherheitsgruppe für einen Amazon-MSK-Cluster finden Sie unter Ändern der Sicherheitsgruppe eines Amazon-MSK-Clusters.

AWS Database Migration Service veröffentlicht Datensätze in einem Kafka-Thema mithilfe von JSON. Während der Konvertierung serialisiert AWS DMS jeden Datensatz aus der Quelldatenbank in ein Attribut/Wert-Paar im JSON-Format.

Zum Migrieren Ihrer Daten von einer unterstützten Datenquelle zu einem Kafka-Ziel-Cluster verwenden Sie die Objektzuweisung. Mit der Objektzuweisung bestimmen Sie, wie die Datensätze im Zielthema strukturiert werden sollen. Außerdem definieren Sie einen Partitionsschlüssel für jede Tabelle, die Apache Kafka zum Gruppieren der Daten in seine Partitionen verwendet.

Derzeit AWS DMS unterstützt ein einzelnes Thema pro Aufgabe. Bei einer einzelnen Aufgabe mit mehreren Tabellen beziehen sich alle Nachrichten auf ein einzelnes Thema. Jede Nachricht enthält einen Metadatenabschnitt, der das Zielschema und die AWS DMS Tabellenversionen 3.4.6 und höher identifiziert. unterstützt die Replikation mehrerer Themen mithilfe der Objektzuordnung. Weitere Informationen finden Sie unter Replikation für mehrere Themen mithilfe der Objektzuweisung.

Apache Kafka-Endpunkteinstellungen

Sie können Verbindungsdetails über Endpunkteinstellungen in der AWS DMS Konsole oder die --kafka-settings Option in der CLI angeben. Es folgen die Anforderungen für jede Einstellung:

  • Broker – Geben Sie die Speicherorte eines oder mehrerer Broker in Ihrem Kafka-Cluster jeweils in Form einer durch Kommata getrennten Liste mit broker-hostname:port an. Ein Beispiel ist "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Mit dieser Einstellung können Sie die Speicherorte einiger oder aller Broker im Cluster angeben. Die Cluster-Broker kommunizieren miteinander, um die Partitionierung von Datensätzen zu verarbeiten, die zu dem Thema migriert wurden.

  • Topic – (Optional) Geben Sie den Themennamen mit einer maximalen Länge von 255 Buchstaben und Symbolen an. Sie können Punkt (.), Unterstrich (_) und Minuszeichen (-) verwenden. Themennamen mit einem Punkt (.) oder einem Unterstrich (_) können in internen Datenstrukturen kollidieren. Verwenden Sie entweder eines, aber nicht beide dieser Symbole im Namen des Themas. Wenn Sie keinen Themennamen angeben, AWS DMS verwendet "kafka-default-topic" als Migrationsthema.

    Anmerkung

    Um entweder ein von Ihnen angegebenes Migrationsthema oder das Standardthema AWS DMS erstellen zu lassen, legen Sie auto.create.topics.enable = true als Teil Ihrer Kafka-Cluster-Konfiguration fest. Weitere Informationen finden Sie unter Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service.

  • MessageFormat – Das Ausgabeformat für die Datensätze, die auf dem Endpunkt erstellt wurden. Das Nachrichtenformat ist JSON (Standard) oder JSON_UNFORMATTED (eine einzelne Zeile ohne Tabulator).

  • MessageMaxBytes – Die maximale Größe in Byte für Datensätze, die auf dem Endpunkt erstellt wurden. Der Standardwert ist 1.000.000.

    Anmerkung

    Sie können die AWS CLI/das SDK nur verwenden, um zu einem nicht standardmäßigen Wert MessageMaxBytes zu wechseln. Verwenden Sie beispielsweise den folgenden Befehl, um Ihren vorhandenen Kafka-Endpunkt und MessageMaxBytes zu ändern.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – Stellt detaillierte Transaktionsinformationen aus der Quelldatenbank bereit. Diese Informationen beinhalten einen Durchführungszeitstempel, eine Protokollposition sowie Werte für transaction_id, previous_transaction_id und transaction_record_id (den Datensatzoffset innerhalb einer Transaktion). Der Standardwert ist false.

  • IncludePartitionValue – Zeigt den Partitionswert innerhalb der Kafka-Nachrichtenausgabe an, es sei denn, der Partitionstyp ist schema-table-type. Der Standardwert ist false.

  • PartitionIncludeSchemaTable – Fügt Schema- und Tabellennamen zu Partitionswerten als Präfix hinzu, wenn der Partitionstyp primary-key-type ist. Dadurch wird die Datenverteilung zwischen Kafka-Partitionen erhöht. Angenommen, ein SysBench-Schema hat Tausende von Tabellen und jede davon hat nur einen begrenzten Bereich für einen Primärschlüssel. In diesem Fall wird derselbe Primärschlüssel von Tausenden von Tabellen an dieselbe Partition gesendet, was zu einer Drosselung führt. Der Standardwert ist false.

  • IncludeTableAlterOperations – Enthält alle DDL-Operationen (DDL = Data Definition Language), die die Tabelle in den Steuerungsdaten ändern, wie etwa rename-table, drop-table, add-column, drop-column und rename-column. Der Standardwert ist false.

  • IncludeControlDetails – Zeigt detaillierte Steuerungsinformationen für Tabellendefinition, Spaltendefinition und Tabellen- und Spaltenänderungen in der Kafka-Nachrichtenausgabe an. Der Standardwert ist false.

  • IncludeNullAndEmpty – Schließt NULL-Spalten und leere Spalten in das Ziel ein. Der Standardwert ist false.

  • SecurityProtocol – Richtet eine sichere Verbindung mit einem Kafka-Zielendpunkt mit Transport Layer Security (TLS) ein. Optionen: ssl-authentication, ssl-encryption und sasl-ssl. Für die Verwendung von sasl-ssl sind SaslUsername und SaslPassword erforderlich.

  • SslEndpointIdentificationAlgorithm – Legt die Hostnamenüberprüfung für das Zertifikat fest. Diese Einstellung wird in AWS DMS Version 3.5.1 und höher unterstützt. Es gibt die folgenden Optionen:

    • NONE: Deaktivieren Sie die Hostnamenverifizierung des Brokers in der Clientverbindung.

    • HTTPS: Aktivieren Sie die Hostnamenüberprüfung des Brokers in der Client-Verbindung.

Sie können Einstellungen verwenden, um die Übertragungsgeschwindigkeit zu erhöhen. Dazu unterstützt AWS DMS Multi-Thread-Volllastvorgänge in einen Ziel-Cluster in Apache Kafka. AWS DMS unterstützt Multi-Threading u. a. mithilfe der folgenden Aufgabeneinstellungen:

  • MaxFullLoadSubTasks – Verwenden Sie diese Option, um die maximale Anzahl von Quelltabellen anzugeben, die parallel geladen werden sollen. AWS DMS lädt jede Tabelle mithilfe einer dedizierten Unteraufgabe in die entsprechende Kafka-Zieltabelle. Der Standardwert beträgt 8; der Maximalwert beträgt 49.

  • ParallelLoadThreads – Verwenden Sie diese Option, um die Anzahl der Threads anzugeben, die AWS DMS verwendet, um jede Tabelle in ihre Kafka-Zieltabelle zu laden. Der maximale Wert für ein Apache Kafka-Ziel ist 32. Sie können eine Erhöhung dieses Höchstwerts anfordern.

  • ParallelLoadBufferSize – Verwenden Sie diese Option, um die maximale Anzahl der Datensätze anzugeben, die in dem Puffer gespeichert werden sollen, den die parallelen Lade-Threads zum Laden von Daten in das Kafka-Ziel verwenden. Der Standardwert lautet 50. Die maximale Wert ist 1.000. Verwenden Sie diese Einstellung mit ParallelLoadThreads; ParallelLoadBufferSize ist nur gültig, wenn es mehr als einen Thread gibt.

  • ParallelLoadQueuesPerThread – Verwenden Sie diese Option, um die Anzahl der Warteschlangen anzugeben, auf die jeder gleichzeitige Thread zugreift, um Datensätze aus Warteschlangen zu entfernen und eine Stapellast für das Ziel zu generieren. Der Standardwert ist 1. Der Höchstwert ist 512.

Sie können die Leistung der Erfassung von Datenänderungen (Change Data Capture, CDC) für Kafka-Endpunkte verbessern, indem Sie die Aufgabeneinstellungen für parallele Threads und Massenoperationen optimieren. Dazu können Sie die Anzahl der gleichzeitigen Threads, der Warteschlangen pro Thread und die Anzahl der Datensätze angeben, die in einem Puffer unter Verwendung von ParallelApply*-Aufgabeneinstellungen gespeichert werden sollen. Beispiel: Sie möchten eine CDC-Last durchführen und 128 Threads parallel anwenden. Außerdem möchten Sie auf 64 Warteschlangen pro Thread zugreifen, wobei 50 Datensätze pro Puffer gespeichert sind.

Um die CDC-Leistung hochzustufen, AWS DMS unterstützt die folgenden Aufgabeneinstellungen:

  • ParallelApplyThreads – Gibt die Anzahl der gleichzeitigen Threads an, die während eines CDC-Ladevorgangs AWS DMS verwendet, um Datensätze an einen Kafka-Zielendpunkt zu übertragen. Der Standardwert ist Null (0) und der maximale Wert ist 32.

  • ParallelApplyBufferSize – Gibt die maximale Anzahl von Datensätzen an, die in jeder Pufferwarteschlange für gleichzeitige Threads gespeichert werden sollen, um sie während einer CDC-Last an einen Kafka-Zielendpunkt zu übertragen. Der Standardwert ist 100 und der Höchstwert 1 000. Verwenden Sie diese Option, wenn ParallelApplyThreads mehrere Threads angibt.

  • ParallelApplyQueuesPerThread – Gibt die Anzahl der Warteschlangen an, auf die jeder Thread zugreift, um Datensätze aus Warteschlangen zu entfernen und während CDC eine Stapellast für einen Kafka-Endpunkt zu generieren. Der Standardwert ist 1. Der Höchstwert ist 512.

Wenn Sie ParallelApply*-Aufgabeneinstellungen verwenden, ist der primary-key der Tabelle der partition-key-type-Standardwert, nicht schema-name.table-name.

Herstellen einer Verbindung zu Kafka mit Transport Layer Security (TLS)

Ein Kafka-Cluster akzeptiert sichere Verbindungen mit Transport Layer Security (TLS). Mit DMS können Sie eine der folgenden drei Sicherheitsprotokoll-Optionen verwenden, um eine Verbindung mit einem Kafka-Endpunkt zu sichern.

SSL-Verschlüsselung (server-encryption)

Clients validieren die Serveridentität anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

SSL-Authentifizierung (mutual-authentication)

Server und Client validieren die Identität untereinander durch ihre eigenen Zertifikate. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

SASL-SSL (mutual-authentication)

Bei der Methode Simple Authentication and Security Layer (SASL) wird das Zertifikat des Clients durch einen Benutzernamen und ein Passwort ersetzt, um die Identität eines Clients zu überprüfen. Sie geben einen Benutzernamen und ein Passwort an, die der Server registriert hat, damit der Server die Identität eines Clients überprüfen kann. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

Wichtig

Apache Kafka und Amazon MSK akzeptieren aufgelöste Zertifikate. Dies ist eine bekannte Einschränkung von Kafka und Amazon MSK, die behoben werden muss. Weitere Informationen finden Sie unter Apache Kafka issues, KAFKA-3700.

Wenn Sie Amazon MSK verwenden, sollten Sie die Verwendung von Zugriffssteuerungslisten (Access Control Lists, ACLs) als Problemumgehung für diese bekannte Einschränkung in Betracht ziehen. Weitere Informationen zur Verwendung von ACLs finden Sie im Abschnitt Apache Kafka ACLs im Entwicklerhandbuch für Amazon Managed Streaming für Apache Kafka.

Wenn Sie einen selbstverwalteten Kafka-Cluster verwenden, finden Sie Informationen zur Konfiguration des Clusters im Kommentar vom 21. Oktober 18.

Verwenden von SSL-Verschlüsselung mit Amazon MSK oder einem selbstverwalteten Kafka-Cluster

Sie können SSL-Verschlüsselung verwenden, um eine Endpunktverbindung mit Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern. Wenn Sie die SSL-Verschlüsselung als Authentifizierungsmethode verwenden, validieren Clients die Identität eines Servers anhand des Serverzertifikats. Anschließend wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

So verwenden Sie SSL-Verschlüsselung für die Verbindung mit Amazon MSK
  • Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option ssl-encryption fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen.

    Im folgenden JSON-Beispiel wird das Sicherheitsprotokoll als SSL-Verschlüsselung festgelegt.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
So verwenden Sie SSL-Verschlüsselung für einen selbstverwalteten Kafka-Cluster
  1. Wenn Sie eine private Zertifizierungsstelle (Certification Authority, CA) in Ihrem On-Premises-Kafka-Cluster verwenden, laden Sie Ihr privates CA-Zertifikat hoch, um einen Amazon-Ressourcennamen (ARN) abzurufen.

  2. Legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option ssl-encryption fest, wenn Sie Ihren Kafka-Zielendpunkt erstellen. Im folgenden JSON-Beispiel wird das Sicherheitsprotokoll als ssl-encryption festgelegt.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Wenn Sie eine private CA verwenden, legen Sie SslCaCertificateArn im ARN fest, den Sie im ersten Schritt abgerufen haben.

Verwenden von SSL-Authentifizierung

Sie können SSL-Authentifizierung verwenden, um eine Endpunktverbindung mit Amazon MSK oder einem selbstverwalteten Kafka-Cluster zu sichern.

Gehen Sie wie folgt vor, um die Client-Authentifizierung und -Verschlüsselung mithilfe von SSL-Authentifizierung für die Verbindung mit Amazon MSK zu aktivieren:

  • Bereiten Sie einen privaten Schlüssel und ein öffentliches Zertifikat für Kafka vor.

  • Laden Sie die Zertifikate in den DMS-Zertifikatmanager hoch.

  • Erstellen Sie einen Kafka-Zielendpunkt mit den entsprechenden in den Kafka-Endpunkteinstellungen angegebenen Zertifikat-ARNs.

So bereiten Sie einen privaten Schlüssel und ein öffentliches Zertifikat für Amazon MSK vor
  1. Erstellen Sie eine EC2-Instance und richten Sie einen Client zur Verwendung der Authentifizierung ein, wie in den Schritten 1 bis 9 im Abschnitt zur Client-Authentifizierung im Entwicklerhandbuch für Amazon Managed Streaming für Apache Kafka beschrieben.

    Nachdem Sie diese Schritte abgeschlossen haben, verfügen Sie über einen Zertifikat-ARN (den in ACM gespeicherten öffentlichen Zertifikat-ARN) und einen privaten Schlüssel, der in einer Datei mit dem Namen kafka.client.keystore.jks enthalten ist.

  2. Rufen Sie das öffentliche Zertifikat ab und kopieren Sie das Zertifikat mit dem folgenden Befehl in die Datei signed-certificate-from-acm.pem:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Dieser Befehl gibt ähnliche Informationen wie im folgenden Beispiel zurück:

    {"Certificate": "123", "CertificateChain": "456"}

    Anschließend kopieren Sie Ihr Äquivalent von "123" in die Datei signed-certificate-from-acm.pem.

  3. Rufen Sie den privaten Schlüssel ab, indem Sie den Schlüssel msk-rsa aus kafka.client.keystore.jks to keystore.p12 importieren, wie im folgenden Beispiel gezeigt.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Verwenden Sie den folgenden Befehl, um keystore.p12 in das Format .pem zu exportieren.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Die Meldung PEM-Passphrase eingeben wird angezeigt und gibt den Schlüssel an, der zur Verschlüsselung des Zertifikats verwendet wird.

  5. Entfernen Sie Taschen- und Schlüsselattribute aus der .pem-Datei, um sicherzustellen, dass die erste Zeile mit der folgenden Zeichenfolge beginnt.

    ---BEGIN ENCRYPTED PRIVATE KEY---
So laden Sie ein öffentliches Zertifikat und einen privaten Schlüssel in den DMS-Zertifikatmanager hoch und testen die Verbindung mit Amazon MSK
  1. Führen Sie den folgenden Befehl aus, um den Upload in den DMS-Zertifikatmanager auszuführen.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Erstellen Sie einen Amazon-MSK-Zielendpunkt und testen Sie die Verbindung, um sicherzustellen, dass die TLS-Authentifizierung funktioniert.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Wichtig

Sie können SSL-Authentifizierung verwenden, um eine Verbindung mit einem selbstverwalteten Kafka-Cluster zu sichern. In Einzelfällen können Sie eine private Zertifizierungsstelle (CA) in Ihrem On-Premises-Kafka-Cluster verwenden. Laden Sie in diesem Fall Ihre CA-Kette, das öffentliche Zertifikat und den privaten Schlüssel in den DMS-Zertifikatmanager hoch. Verwenden Sie dann den entsprechenden Amazon-Ressourcennamen (ARN) in Ihren Endpunkteinstellungen, wenn Sie Ihren On-Premises-Kafka-Zielendpunkt erstellen.

So bereiten Sie einen privaten Schlüssel und ein signiertes Zertifikat für einen selbstverwalteten Kafka-Cluster vor
  1. Erzeugen Sie ein Schlüsselpaar wie im folgenden Beispiel dargestellt.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Generieren Sie eine Anfrage zum Signieren des Zertifikats (Certificate Sign Request, CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Verwenden Sie die CA in Ihrem Cluster-Truststore zum Signieren der CSR. Wenn Sie keine CA haben, können Sie Ihre eigene private CA erstellen.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importieren Sie ca-cert in den Truststore und Keystore des Servers. Wenn Sie keinen Truststore haben, führen Sie folgenden Befehl aus, um den Truststore zu erstellen und ca-cert in diesen zu importieren.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Signieren Sie das Zertifikat.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importieren Sie das signierte Zertifikat in den Keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Verwenden Sie den folgenden Befehl, um den Schlüssel on-premise-rsa von kafka.server.keystore.jks in keystore.p12 zu importieren.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Verwenden Sie den folgenden Befehl, um keystore.p12 in das Format .pem zu exportieren.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Laden Sie encrypted-private-server-key.pem, signed-certificate.pem und ca-cert in den DMS-Zertifikatmanager hoch.

  10. Erstellen Sie einen Endpunkt mithilfe der zurückgegebenen ARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Verwenden von SASL-SSL-Authentifizierung für die Verbindung mit Amazon MSK

Bei der Methode Simple Authentication and Security Layer (SASL) werden ein Benutzername und ein Passwort verwendet, um die Identität eines Clients zu überprüfen, und es wird eine verschlüsselte Verbindung zwischen Server und Client hergestellt.

Um SASL zu verwenden, erstellen Sie zunächst einen sicheren Benutzernamen und ein sicheres Passwort, wenn Sie Ihren Amazon-MSK-Cluster einrichten. Eine Beschreibung, wie sich ein sicherer Benutzername und ein sicheres Passwort für einen Amazon-MSK-Cluster einrichten lassen, finden Sie unter Einrichtung der SASL/SCRAM-Authentifizierung für einen Amazon-MSK-Cluster im Entwicklerhandbuch für Amazon Managed Streaming für Apache Kafka.

Wenn Sie dann Ihren Kafka-Zielendpunkt erstellen, legen Sie die Sicherheitsprotokoll-Endpunkteinstellung (SecurityProtocol) mithilfe der Option sasl-ssl fest. Legen Sie auch die Optionen SaslUsername und SaslPassword fest. Stellen Sie sicher, dass diese mit dem sicheren Benutzernamen und Passwort übereinstimmen, die Sie bei der erstmaligen Einrichtung Ihres Amazon-MSK-Clusters erstellt haben, wie im folgenden JSON-Beispiel gezeigt.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Anmerkung
  • Derzeit AWS DMS unterstützt nur öffentliche CA-gestützte SASL-SSL. DMS unterstützt SASL-SSL nicht für die Verwendung mit selbstverwaltetem Kafka, das von einer privaten CA unterstützt wird.

  • Für die SASL-SSL-Authentifizierung AWS DMS unterstützt standardmäßig den SCRAM-SHA-512-Mechanismus. - AWS DMS Versionen 3.5.0 und höher unterstützen auch den Plain-Mechanismus. Setzen Sie den Parameter SaslMechanism des API-Datentyps KafkaSettings auf PLAIN, um den Plain-Mechanismus zu unterstützen.

Verwenden eines Vorher-Abbilds zum Anzeigen von Originalwerten von CDC-Zeilen für Apache Kafka als Ziel

Beim Schreiben von CDC-Aktualisierungen auf ein Data-Streaming-Ziel wie Kafka können Sie die ursprünglichen Werte einer Quelldatenbankzeile anzeigen, bevor sie durch eine Aktualisierung geändert werden. Damit dies möglich ist, AWS DMS füllt ein Vorher-Image von Aktualisierungsereignissen basierend auf Daten aus, die von der Quelldatenbank-Engine bereitgestellt werden.

Verschiedene Quelldatenbank-Engines liefern unterschiedliche Mengen an Informationen für ein Vorher-Abbild:

  • Oracle stellt für Spalten nur dann Aktualisierungen bereit, wenn sie sich ändern.

  • PostgreSQL stellt nur Daten für Spalten bereit, die Teil des Primärschlüssels sind (geändert oder nicht). Wenn die logische Replikation verwendet wird und REPLICA IDENTITY FULL für die Quelltabelle festgelegt ist, können Sie vollständige Vorher-Nachher-Informationen zu der Zeile abrufen, die in die WALs geschrieben wurde und hier verfügbar ist.

  • MySQL stellt generell Daten für alle Spalten (geändert oder nicht) bereit.

Verwenden Sie entweder die BeforeImageSettings-Aufgabeneinstellung oder den add-before-image-columns-Parameter, um die Erstellung von Vorher-Abbildern zum Hinzufügen von Originalwerten aus der Quelldatenbank zur AWS DMS -Ausgabe zu aktivieren. Dieser Parameter wendet eine Spalten-Transformationsregel an.

BeforeImageSettings fügt jeder Aktualisierungsoperation ein neues JSON-Attribut mit Werten hinzu, die aus dem Quelldatenbanksystem erfasst werden, wie nachfolgend gezeigt.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Anmerkung

Wenden Sie BeforeImageSettings auf Volllast plus CDC-Aufgaben (die vorhandene Daten migrieren und laufende Änderungen replizieren) oder nur auf CDC-Aufgaben (die nur Datenänderungen replizieren) an. Wenden Sie BeforeImageSettings nicht auf Nur-Volllast-Aufgaben an.

Für BeforeImageSettings-Optionen gilt Folgendes:

  • Legen Sie die EnableBeforeImage-Option vor dem Imaging auf true fest. Der Standardwert ist false.

  • Verwenden Sie die FieldName-Option, um dem neuen JSON-Attribut einen Namen zuzuweisen. Wann EnableBeforeImage true ist, ist FieldName erforderlich und darf nicht leer sein.

  • Die ColumnFilter-Option gibt eine Spalte an, die vor dem Imaging hinzugefügt werden soll. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, verwenden Sie den Standardwert pk-only. Wenn Sie nur Spalten hinzufügen möchten, die nicht vom LOB-Typ sind, verwenden Sie non-lob. Wenn Sie eine Spalte hinzufügen möchten, die einen Vorher-Abbild-Wert hat, verwenden Sie all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Verwenden einer Vorher-Abbild-Transformationsregel

Alternativ zu den Aufgabeneinstellungen können Sie den add-before-image-columns-Parameter verwenden, der eine Spalten-Transformationsregel anwendet. Mit diesem Parameter können Sie das Vorher-Imaging während des CDC-Vorgangs auf Data-Streaming-Zielen wie Kafka aktivieren.

Wenn Sie add-before-image-columns in einer Transformationsregel verwenden, können Sie eine feinere Steuerung der Ergebnisse für das Vorher-Abbild anwenden. Mit Transformationsregeln können Sie einen Objekt-Locator verwenden, der Ihnen die Kontrolle über die für die Regel ausgewählten Tabellen gibt. Außerdem können Sie Transformationsregeln miteinander verketten, wodurch verschiedene Regeln auf verschiedene Tabellen angewendet werden können. Anschließend können Sie die erzeugten Spalten mithilfe anderer Regeln bearbeiten.

Anmerkung

Verwenden Sie den add-before-image-columns-Parameter nicht zusammen mit der BeforeImageSettings-Aufgabeneinstellung innerhalb derselben Aufgabe. Verwenden Sie stattdessen entweder den Parameter oder die Einstellung, aber nicht beide, für eine einzelne Aufgabe.

Ein transformation-Regeltyp mit dem add-before-image-columns-Parameter für eine Spalte muss einen before-image-def-Abschnitt bereitstellen. Es folgt ein Beispiel.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

Der Wert von column-prefix wird einem Spaltennamen vorangestellt, und der Standardwert von column-prefix ist BI_. Der Wert von column-suffix wird an den Spaltennamen angehängt, und der Standardwert ist leer. Setzen Sie nicht column-prefix und column-suffix auf leere Zeichenfolgen.

Wählen Sie einen Wert für column-filter. Wenn Sie nur Spalten hinzufügen möchten, die Teil der Primärschlüssel der Tabelle sind, wählen Sie pk-only . Wählen Sie non-lob, um nur Spalten hinzuzufügen, die nicht vom LOB-Typ sind. Oder lassenall Sie eine Spalte hinzufügen, die einen Vorher-Abbild-Wert hat.

Beispiel für eine Vorher-Abbild-Transformationsregel

Die Transformationsregel im folgenden Beispiel fügt eine neue Spalte mit dem Namen BI_emp_no auf dem Ziel hinzu. Eine Anweisung wie UPDATE employees SET emp_no = 3 WHERE emp_no = 1; füllt daher das BI_emp_no Feld mit 1. Wenn Sie CDC-Aktualisierungen für Amazon-S3-Ziele schreiben, ermöglicht die BI_emp_no-Spalte, zu erkennen, welche ursprüngliche Zeile aktualisiert wurde.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Weitere Informationen zur Verwendung der add-before-image-columns-Regelaktion finden Sie unter Transformationsregeln und Aktionen.

Einschränkungen bei der Verwendung von Apache Kafka als Ziel für AWS Database Migration Service

Bei der Verwendung von Apache Kafka als Ziel gelten die folgenden Einschränkungen:

  • AWS DMS Kafka-Zielendpunkte unterstützen keine IAM-Zugriffskontrolle für Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • Der vollständige LOB-Modus wird nicht unterstützt.

  • Geben Sie eine Kafka-Konfigurationsdatei für Ihren Cluster mit Eigenschaften an, die es ermöglichen, automatisch neue Themen AWS DMS zu erstellen. Schließen Sie die Einstellung auto.create.topics.enable = true ein. Wenn Sie Amazon MSK verwenden, können Sie beim Erstellen Ihres Kafka-Clusters die Standardkonfiguration angeben und dann die Einstellung auto.create.topics.enable in true ändern. Weitere Informationen zu den Standardkonfigurationseinstellungen finden Sie unter Die Standardkonfiguration von Amazon MSK im Entwicklerhandbuch für Amazon Managed Streaming für Apache Kafka. Wenn Sie einen vorhandenen Kafka-Cluster ändern müssen, der mit Amazon MSK erstellt wurde, führen Sie den AWS CLI Befehl aus, aws kafka create-configuration um Ihre Kafka-Konfiguration zu aktualisieren, wie im folgenden Beispiel:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Hier ist //~/kafka_configuration die Konfigurationsdatei, die Sie mit den erforderlichen Eigenschaftseinstellungen erstellt haben.

    Wenn Sie Ihre eigene Kafka-Instance verwenden, die auf Amazon EC2 installiert ist, ändern Sie die Konfiguration des Kafka-Clusters mit der -auto.create.topics.enable = trueEinstellung, damit AWS DMS automatisch neue Themen erstellen kann, indem Sie die Optionen verwenden, die mit Ihrer Instance bereitgestellt werden.

  • AWS DMS veröffentlicht jede Aktualisierung für einen einzelnen Datensatz in der Quelldatenbank als einen Datensatz (Nachricht) in einem bestimmten Kafka-Thema, unabhängig von Transaktionen.

  • AWS DMS unterstützt die folgenden zwei Formen für Partitionsschlüssel:

    • SchemaName.TableName: Eine Kombination des Schema- und Tabellennamens.

    • ${AttributeName}: Der Wert in einem der Felder in der JSON-Datei oder der Primärschlüssel der Tabelle in der Quelldatenbank.

  • BatchApply wird für einen Kafka-Endpunkt nicht unterstützt. Bei Verwendung von Batch Apply (z. B. der Zielmetadaten-Aufgabeneinstellung BatchApplyEnabled) für ein Kafka-Ziel kann es zu einem Datenverlust kommen.

  • AWS DMS unterstützt nicht die Migration von Werten des BigInt Datentyps mit mehr als 16 Ziffern. Um diese Einschränkung zu umgehen, können Sie die folgende Transformationsregel verwenden, um die BigInt-Spalte in eine Zeichenfolge zu konvertieren. Informationen zu Transformationsregeln finden Sie unter Transformationsregeln und Aktionen.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Verwenden der Objektzuweisung zum Migrieren von Daten zu einem Kafka-Thema

AWS DMS verwendet Tabellenzuordnungsregeln, um Daten aus der Quelle dem Ziel-Kafka-Thema zuzuordnen. Um Daten einem Zielthema zuzuweisen, verwenden Sie eine Art von Tabellenzuweisungsregel, die als Objektzuweisung bezeichnet wird. Mit der Objektzuweisung legen Sie fest, wie Datensätze in der Quelle den in einem Kafka-Thema veröffentlichten Datensätzen zugewiesen werden.

Kafka-Themen verfügen bis auf einen Partitionsschlüssel über keine voreingestellte Struktur.

Anmerkung

Sie müssen die Objektzuweisung nicht verwenden. Sie können die reguläre Tabellenzuweisung für verschiedene Transformationen verwenden. Der Partitionsschlüsseltyp folgt jedoch diesen Standardverhaltensweisen:

  • Der Primärschlüssel wird als Partitionsschlüssel für Volllast verwendet.

  • Wenn keine Aufgabeneinstellungen für parallele Anwendung verwendet werden, wird schema.table als Partitionsschlüssel für CDC verwendet.

  • Wenn Aufgabeneinstellungen für parallele Anwendung verwendet werden, wird der Primärschlüssel als Partitionsschlüssel für CDC verwendet.

Um eine Objektzuweisungsregel zu erstellen, legen Sie rule-type als object-mapping fest. Diese Regel gibt an, welchen Objektzuweisungstyp Sie verwenden möchten.

Die Struktur für die Regel lautet wie folgt.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS unterstützt derzeit map-record-to-record und map-record-to-document als einzige gültige Werte für den -rule-actionParameter. Diese Einstellungen wirken sich auf Werte aus, die nicht in der exclude-columns-Attributliste ausgeschlossen sind. Die map-record-to-document Werte map-record-to-record und geben an, wie diese Datensätze standardmäßig AWS DMS verarbeitet. Diese Werte wirken sich in keiner Weise auf die Attributzuweisungen aus.

Verwenden Sie map-record-to-record beim Migrieren aus einer relationalen Datenbank zu einem Kafka-Thema. Dieser Regeltyp verwendet den taskResourceId.schemaName.tableName-Wert aus der relationalen Datenbank als Partitionsschlüssel in dem Kafka-Thema und erstellt ein Attribut für jede Spalte in der Quelldatenbank.

Beachten Sie bei Verwendung von map-record-to-record Folgendes:

  • Diese Einstellung wirkt sich nur auf Spalten aus, die durch die exclude-columns-Liste ausgeschlossen wurden.

  • Für jede solche Spalte AWS DMS erstellt ein entsprechendes Attribut im Zielthema.

  • AWS DMS erstellt dieses entsprechende Attribut unabhängig davon, ob die Quellspalte in einer Attributzuordnung verwendet wird.

Eine Möglichkeit, map-record-to-record zu verstehen, besteht darin, sich die praktische Anwendung zu veranschaulichen. In diesem Beispiel wird davon ausgegangen, dass Sie mit einer Tabellenzeile einer relationalen Datenbank beginnen, die die folgende Struktur aufweist und die folgenden Daten enthält.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Um diese Informationen von einem Schema mit dem Namen Test zu einem Kafka-Thema zu migrieren, erstellen Sie Regeln, um die Daten dem Zielthema zuzuweisen. Die folgende Regel veranschaulicht die Zuweisung.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Nachfolgend wird das resultierende Datensatzformat bei Verwendung unserer Beispieldaten in dem Kafka-Zielthema für ein bestimmtes Kafka-Thema und einen bestimmten Partitionsschlüssel (in diesem Fall taskResourceId.schemaName.tableName) illustriert.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Umstrukturieren von Daten mit Attributzuweisung

Sie können die Daten umstrukturieren, während Sie sie mittels einer Attributzuweisung zu einem Kafka-Thema migrieren. So möchten Sie zum Beispiel vielleicht mehrere Felder in der Quelle in einem einzigen Feld im Ziel vereinen. Die folgenden Attributzuordnung veranschaulicht, wie die Daten umstrukturiert werden.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Um einen konstanten Wert für partition-key festzulegen, geben Sie einen partition-key-Wert an. So könnten Sie auf diese Weise beispielsweise erzwingen, dass alle Daten in einer einzigen Partition gespeichert werden. Die folgende Darstellung veranschaulicht dieses Konzept.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Anmerkung

Der partition-key-Wert für einen Steuerungsdatensatz, der für eine spezifische Tabelle bestimmt ist, lautet TaskId.SchemaName.TableName. Der partition-key-Wert für einen Steuerungsdatensatz, der für eine spezifische Aufgabe bestimmt ist, ist die TaskId des betreffenden Datensatzes. Wenn Sie einen partition-key-Wert in der Objektzuweisung angeben, hat dies keine Auswirkungen auf den partition-key für einen Steuerungsdatensatz.

Replikation für mehrere Themen mithilfe der Objektzuweisung

Standardmäßig migrieren AWS DMS Aufgaben alle Quelldaten zu einem der folgenden Kafka-Themen:

  • Wie im Feld Thema des AWS DMS Zielendpunkts angegeben.

  • Wie von kafka-default-topic angegeben, wenn das Feld Thema des Zielendpunkts nicht gefüllt ist und die Kafka-Einstellung auto.create.topics.enable auf true gesetzt ist.

Mit den AWS DMS Engine-Versionen 3.4.6 und höher können Sie das -kafka-target-topicAttribut verwenden, um jede migrierte Quelltabelle einem separaten Thema zuzuordnen. Mit den folgenden Objektzuweisungsregeln werden beispielsweise die Quelltabellen Customer und Address zu den Kafka-Themen customer_topic bzw. address_topic migriert. Gleichzeitig AWS DMS migriert alle anderen Quelltabellen, einschließlich der Bills Tabelle im Test Schema, zu dem im Zielendpunkt angegebenen Thema.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Mithilfe der Kafka-Replikation für mehrere Themen können Sie Quelltabellen mit einer einzigen Replikationsaufgabe gruppieren und zu separaten Kafka-Themen migrieren.

Nachrichtenformat für Apache Kafka

Die JSON-Ausgabe ist einfach eine Liste von Schlüssel-Wert-Paaren.

RecordType

Der Datensatztyp kann entweder für Daten oder zur Steuerung bestimmt sein. Datensätze für Datenrepräsentieren die tatsächlichen Zeilen in der Quelle. Steuerungsdatensätze sind für wichtige Ereignisse im Stream bestimmt, z. B. einen Neustart der Aufgabe.

Operation

Mögliche Operationen für Datensätze sind load, insert, update oder delete.

Mögliche Operationen für Steuerungsdatensätze sind create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column oder column-type-change.

SchemaName

Das Quellschema für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.

TableName

Die Quelltabelle für den Datensatz. Dieses Feld kann für einen Steuerungsdatensatz leer sein.

Zeitstempel

Der Zeitstempel für den Zeitpunkt, an dem die JSON-Nachricht erstellt wurde. Das Feld ist mit dem ISO-8601-Format formatiert.

Das folgende Beispiel für eine JSON-Nachricht veranschaulicht eine Datentyp-Nachricht mit allen zusätzlichen Metadaten.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

Das folgende Beispiel für eine JSON-Nachricht veranschaulicht eine Steuerungstyp-Nachricht.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }