Menggunakan Apache Kafka sebagai target AWS Database Migration Service - AWS Layanan Migrasi Database

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Apache Kafka sebagai target AWS Database Migration Service

Anda dapat menggunakan AWS DMS untuk memigrasikan data ke cluster Apache Kafka. Apache Kafka adalah platform streaming terdistribusi. Anda dapat menggunakan Apache Kafka untuk menelan dan memproses data streaming secara langsung.

AWS juga menawarkan Amazon Managed Streaming for Apache Kafka (Amazon MSK) untuk digunakan sebagai target. AWS DMS Amazon MSK adalah layanan streaming Apache Kafka yang terkelola penuh yang menyederhanakan implementasi dan pengelolaan instans Apache Kafka. Ini bekerja dengan versi Apache Kafka open-source, dan Anda mengakses instans MSK Amazon sebagai AWS DMS target persis seperti instans Apache Kafka lainnya. Untuk informasi lebih lanjut, lihat Apa itu Amazon MSK? dalam Panduan Developer: Amazon Managed Streaming for Apache Kafka.

Sebuah klaster Kafka menyimpan aliran catatan dalam kategori yang disebut topik yang dibagi menjadi partisi. Partisi adalah urutan catatan data (pesan) yang diidentifikasi secara unik secara unik dalam suatu topik. Partisi dapat didistribusikan ke beberapa broker dalam sebuah klaster untuk mengaktifkan pemrosesan paralel catatan topik. Untuk informasi lebih lanjut tentang topik dan partisi dan distribusi mereka di Apache Kafka, lihat Topik dan log dan Distribusi.

Kluster Kafka Anda dapat berupa instans MSK Amazon, klaster yang berjalan di instans Amazon EC2, atau kluster lokal. Instans MSK Amazon atau cluster pada instans Amazon EC2 dapat berada di VPC yang sama atau yang berbeda. Jika klaster Anda lokal, Anda dapat menggunakan server nama lokal Anda sendiri untuk instance replikasi untuk menyelesaikan nama host klaster. Untuk informasi tentang menyiapkan server nama untuk instance replikasi Anda, lihat Menggunakan server nama on-premise Anda sendiri. Untuk informasi selengkapnya tentang pengaturan jaringan, lihatMenyiapkan jaringan untuk instans replikasi.

Saat menggunakan kluster MSK Amazon, pastikan grup keamanannya mengizinkan akses dari instance replikasi Anda. Untuk informasi tentang mengubah grup keamanan untuk klaster MSK Amazon, lihat Mengubah grup keamanan klaster MSK Amazon.

AWS Database Migration Service menerbitkan catatan ke topik Kafka menggunakan JSON. Selama konversi, AWS DMS membuat serial setiap catatan dari basis data sumber ke dalam pasangan atribut-nilai dalam format JSON.

Untuk memigrasikan data Anda dari sumber data apa pun yang didukung ke klaster Kafka target, Anda menggunakan pemetaan objek. Dengan pemetaan objek, Anda menentukan bagaimana struktur catatan data dalam topik target. Anda juga menentukan kunci partisi untuk setiap tabel, yang digunakan Apache Kafka untuk mengelompokkan data ke dalam partisi.

Saat ini, AWS DMS mendukung satu topik per tugas. Untuk satu tugas dengan beberapa tabel, semua pesan masuk ke satu topik. Setiap pesan menyertakan bagian metadata yang mengidentifikasi skema dan tabel target. AWS DMS versi 3.4.6 dan yang lebih tinggi mendukung replikasi multitopik menggunakan pemetaan objek. Untuk informasi selengkapnya, lihat Replikasi multitopik menggunakan pemetaan objek.

Pengaturan titik akhir Apache Kafka

Anda dapat menentukan detail koneksi melalui pengaturan titik akhir di AWS DMS konsol, atau --kafka-settings opsi di CLI. Persyaratan untuk setiap pengaturan:

  • Broker - Tentukan lokasi dari satu atau lebih broker di klaster Kafka Anda dalam bentuk daftar yang dipisahkan koma untuk masing-masing broker-hostname:port. Contohnya "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Pengaturan ini dapat menentukan lokasi dari salah satu atau semua broker dalam klaster. Semua Klaster broker berkomunikasi untuk menangani partisi catatan data yang dimigrasikan ke topik.

  • Topic - (Opsional) Tentukan nama topik dengan panjang maksimum 255 huruf dan simbol. Anda dapat menggunakan titik (.), garis bawah (_), dan minus (-). Nama topik dengan titik (.) atau garis bawah (_) dapat bertabrakan dalam struktur data internal. Gunakan salah satu, tetapi jangan kedua simbol ini sekaligus dalam nama topik. Jika Anda tidak menentukan nama topik, AWS DMS gunakan "kafka-default-topic" sebagai topik migrasi.

    catatan

    Untuk AWS DMS membuat topik migrasi yang Anda tentukan atau topik default, tetapkan auto.create.topics.enable = true sebagai bagian dari konfigurasi cluster Kafka Anda. Untuk informasi lebih lanjut, lihat Keterbatasan saat menggunakan Apache Kafka sebagai target AWS Database Migration Service

  • MessageFormat - Format keluaran untuk catatan yang dibuat pada titik akhir. Format pesan adalah JSON (default) atau JSON_UNFORMATTED (satu baris tanpa tab).

  • MessageMaxBytes - Ukuran maksimum dalam byte untuk catatan yang dibuat pada titik akhir. Default adalah 1.000.000.

    catatan

    Anda hanya dapat menggunakan AWS CLI/SDK untuk mengubah MessageMaxBytes ke nilai non-default. Misalnya, untuk memodifikasi titik akhir Kafka yang sudah ada dan mengubah MessageMaxBytes, gunakan perintah berikut.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails - Menyediakan informasi transaksi detail dari basis data sumber. Informasi ini mencakup stempel waktu melakukan, posisi log, dan nilai-nilai untuk transaction_id, previous_transaction_id, dan transaction_record_id (catatan offset dalam transaksi). Default adalah false.

  • IncludePartitionValue - Menampilkan nilai partisi dalam pesan keluaran Kafka, kecuali jika tipe partisi adalah schema-table-type. Default adalah false.

  • PartitionIncludeSchemaTable - Skema prefiks dan nama tabel untuk nilai partisi, ketika tipe partisi primary-key-type. Melakukan hal ini meningkatkan distribusi data antara partisi Kafka. Sebagai contoh, anggaplah bahwa skema SysBench memiliki ribuan tabel dan setiap tabel hanya memiliki rentang terbatas untuk kunci primer. Dalam kasus ini, kunci primer yang sama dikirim dari ribuan tabel ke partisi yang sama, sehingga menyebabkan throttling. Default adalah false.

  • IncludeTableAlterOperations - Termasuk setiap operasi bahasa definisi data (DDL) yang mengubah tabel dalam data kontrol, seperti rename-table, drop-table, add-column, drop-column, dan rename-column. Default adalah false.

  • IncludeControlDetails - Menunjukkan informasi kontrol detail untuk definisi tabel, definisi kolom, dan perubahan tabel dan kolom pada keluaran pesan Kafka. Default adalah false.

  • IncludeNullAndEmpty - Sertakan NULL dan kolom kosong dalam target. Default adalah false.

  • SecurityProtocol - Menyetel koneksi yang aman ke titik akhir target Kafka menggunakan Keamanan Lapisan Pengangkutan (TLS). Pilihan termasuk ssl-authentication, ssl-encryption, dan sasl-ssl. Menggunakan sasl-ssl membutuhkan SaslUsername dan SaslPassword.

  • SslEndpointIdentificationAlgorithm— Menetapkan verifikasi nama host untuk sertifikat. Pengaturan ini didukung di AWS DMS versi 3.5.1 dan yang lebih baru. Opsi mencakup hal berikut:

    • NONE: Nonaktifkan verifikasi nama host broker dalam koneksi klien.

    • HTTPS: Aktifkan verifikasi nama host broker dalam koneksi klien.

Anda dapat menggunakan pengaturan untuk membantu meningkatkan kecepatan transfer Anda. Untuk melakukannya, AWS DMS mendukung beban penuh multithread klaster target Apache Kafka. AWS DMS mendukung multithreading dengan pengaturan tugas yang meliputi hal berikut ini:

  • MaxFullLoadSubTasks— Gunakan opsi ini untuk menunjukkan jumlah maksimum tabel sumber untuk dimuat secara paralel. AWS DMS memuat setiap tabel ke dalam tabel target Kafka yang sesuai menggunakan subtugas khusus. Default adalah 8; nilai maksimum adalah 49.

  • ParallelLoadThreads— Gunakan opsi ini untuk menentukan jumlah utas yang AWS DMS digunakan untuk memuat setiap tabel ke dalam tabel target Kafka. Nilai maksimum untuk target Apache Kafka adalah 32. Anda dapat meminta untuk meningkatkan batas maksimum ini.

  • ParallelLoadBufferSize - Gunakan pilihan ini untuk menentukan jumlah maksimum catatan untuk disimpan di buffer yang digunakan oleh thread beban paralel untuk memuat data ke target Kafka. Nilai default adalah 50. Nilai maksimumnya adalah 1.000. Gunakan pengaturan ini dengan ParallelLoadThreads. ParallelLoadBufferSize hanya berlaku bila ada lebih dari satu thread.

  • ParallelLoadQueuesPerThread - Gunakan pilihan ini untuk menentukan jumlah antrean yang diakses setiap thread secara bersamaan untuk mengambil catatan data dari antrean dan menghasilkan beban batch untuk target. Default adalah 1. Maksimum adalah 512.

Anda dapat meningkatkan kinerja pengambilan data perubahan (CDC) untuk titik akhir Kafka dengan menyetel pengaturan tugas untuk thread paralel dan operasi massal. Untuk melakukan ini, Anda dapat menentukan jumlah thread yang terjadi bersamaan, antrean per thread, dan jumlah catatan yang disimpan dalam buffer menggunakan pengaturan tugas ParallelApply*. Misalnya, Anda ingin melakukan beban CDC dan menerapkan 128 thread secara paralel. Anda juga ingin mengakses 64 antrean per thread, dengan 50 catatan disimpan per buffer.

Untuk mempromosikan kinerja CDC, AWS DMS mendukung pengaturan tugas ini:

  • ParallelApplyThreads— Menentukan jumlah thread bersamaan yang AWS DMS digunakan selama beban CDC untuk mendorong catatan data ke titik akhir target Kafka. Nilai default adalah nol (0) dan nilai maksimum adalah 32.

  • ParallelApplyBufferSize - Menentukan jumlah maksimum catatan yang disimpan di setiap antrean buffer untuk thread serentak untuk mendorong ke titik akhir target Kinesis selama beban CDC. Nilai default adalah 100 dan nilai maksimum adalah 1.000. Gunakan pilihan ini saat ParallelApplyThreads menentukan lebih dari satu thread.

  • ParallelApplyQueuesPerThread - Menentukan jumlah antrean yang diakses oleh setiap thread untuk mengambil catatan data dari antrean dan menghasilkan beban batch untuk titik akhir Kafka selama CDC. Default-nya adalah 1. Maksimum adalah 512.

Saat menggunakan pengaturan tugas ParallelApply*, default partition-key-type-nya adalah primary-key dari tabel, bukan schema-name.table-name.

Menghubungkan ke Kafka menggunakan Keamanan Lapisan Pengangkutan (TLS)

Sebuah klaster Kafka menerima koneksi aman menggunakan Keamanan Lapisan Pengangkutan (TLS). Dengan DMS, Anda dapat menggunakan salah satu dari tiga pilihan protokol keamanan berikut untuk mengamankan koneksi titik akhir Kafka.

Enkripsi SSL (server-encryption)

Klien memvalidasi identitas server melalui sertifikat server. Kemudian koneksi terenkripsi dibuat antara server dan klien.

Autentikasi SSL (mutual-authentication)

Server dan klien memvalidasi identitas dengan satu sama lain melalui sertifikat mereka sendiri. Kemudian koneksi terenkripsi dibuat antara server dan klien.

SASL-SSL (mutual-authentication)

Metode The Simple Authentication and Security Layer (SASL) menggantikan sertifikat klien dengan nama pengguna dan kata sandi untuk memvalidasi identitas klien. Secara spesifik, Anda memberikan nama pengguna dan kata sandi yang telah didaftarkan ke server sehingga server dapat memvalidasi identitas klien. Kemudian koneksi terenkripsi dibuat antara server dan klien.

penting

Apache Kafka dan Amazon MSK menerima sertifikat diselesaikan. Ini adalah keterbatasan yang sudah diketahui dari Kafka dan Amazon MSK yang harus ditangani. Untuk informasi lebih lanjut, lihat Masalah Apache Kafka, KAFKA-3700.

Jika Anda menggunakan Amazon MSK, pertimbangkan untuk menggunakan daftar kontrol akses (ACL) sebagai solusi untuk keterbatasan ini. Untuk informasi lebih lanjut tentang penggunaan ACL, lihat Apache Kafka ACL dalam Panduan Developer Amazon Managed Streaming for Apache Kafka.

Jika Anda menggunakan klaster Kafka yang dikelola sendiri, lihat Komentar tanggal 21/Oct/18 untuk informasi tentang mengonfigurasi klaster Anda.

Menggunakan enkripsi SSL dengan Amazon MSK atau klaster Kafka yang dikelola sendiri

Anda dapat menggunakan enkripsi SSL untuk mengamankan koneksi titik akhir ke Amazon MSK atau klaster Kafka yang dikelola sendiri. Saat Anda menggunakan metode autentikasi enkripsi SSL, klien memvalidasi identitas server melalui sertifikat server. Kemudian koneksi terenkripsi dibuat antara server dan klien.

Menggunakan enkripsi SSL untuk terhubung ke Amazon MSK
  • Tetapkan pengaturan titik akhir protokol keamanan (SecurityProtocol) menggunakan pilihan ssl-encryption ketika membuat titik akhir target Kafka Anda.

    Contoh JSON berikut menetapkan protokol keamanan sebagai enkripsi SSL.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Untuk menggunakan enkripsi SSL sebagai klaster Kafka yang dikelola sendiri
  1. Jika Anda menggunakan Otoritas Sertifikasi (CA) pribadi di klaster Kafka on-premise Anda, unggahlah sertifikat CA pribadi dan dapatkan Amazon Resource Name (ARN).

  2. Tetapkan pengaturan titik akhir protokol keamanan (SecurityProtocol) menggunakan pilihan ssl-encryption ketika membuat titik akhir target Kafka Anda. Contoh JSON berikut menetapkan protokol keamanan sebagai ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Jika Anda menggunakan CA pribadi, atur SslCaCertificateArn di ARN yang Anda dapatkan di langkah pertama di atas.

Menggunakan autentikasi SSL

Anda dapat menggunakan autentikasi SSL untuk mengamankan koneksi titik akhir ke Amazon MSK atau klaster Kafka yang dikelola sendiri.

Untuk mengaktifkan autentikasi klien dan enkripsi menggunakan otentikasi SSL untuk menyambung ke Amazon MSK, lakukan hal berikut:

  • Siapkan kunci privat dan sertifikat publik untuk Kafka.

  • Unggah sertifikat ke DMS Certificate Manager .

  • Buat target titik akhir Kafka dengan sertifikat ARN yang sesuai yang ditentukan dalam pengaturan titik akhir Kafka.

Untuk mempersiapkan kunci privat dan sertifikat publik untuk Amazon MSK
  1. Membuat instans EC2 dan atur klien untuk menggunakan autentikasi seperti yang dijelaskan dalam langkah 1 sampai 9 di Autentikasi klien Bagian dari Panduan Developer Amazon Managed Streaming for Apache Kafka.

    Setelah Anda menyelesaikan langkah-langkah tersebut, Anda memiliki sertifikat-ARN (sertifikat publik ARN disimpan di ACM), dan kunci privat yang terdapat dalam file kafka.client.keystore.jks.

  2. Dapatkan sertifikat publik dan salin sertifikat ke file signed-certificate-from-acm.pem, menggunakan perintah berikut:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Perintah itu mengembalikan informasi yang serupa dengan contoh berikut:

    {"Certificate": "123", "CertificateChain": "456"}

    Anda kemudian menyalin yang setara dengan "123" ke file signed-certificate-from-acm.pem.

  3. Dapatkan kunci privat dengan mengimpor kunci msk-rsa dari kafka.client.keystore.jks to keystore.p12, seperti yang ditunjukkan dalam contoh berikut.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Gunakan perintah berikut untuk mengekspor keystore.p12 ke format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Pesan Masukkan frase PEM pass muncul dan mengidentifikasi kunci yang diterapkan untuk mengenkripsi sertifikat.

  5. Hapus atribut tas dan atribut kunci dari file .pem untuk memastikan bahwa baris pertama dimulai dengan string berikut.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Untuk mengunggah sertifikat publik dan kunci privat ke DMS certificate manager dan menguji sambungan ke Amazon MSK
  1. Unggah ke DMS Certificate Manager menggunakan perintah berikut.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Buat titik akhit target Amazon MSK dan uji koneksi untuk memastikan bahwa autentikasi TLS berfungsi.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
penting

Anda dapat menggunakan autentikasi SSL untuk mengamankan koneksi ke klaster Kafka yang dikelola sendiri. Dalam beberapa kasus, Anda mungkin menggunakan Otoritas Sertifikasi (CA) pribadi di kluster Kafka on-premise Anda. Jika demikian, unggah rantai CA Anda, sertifikat publik, dan kunci privat ke DMS certificate manager. Kemudian, gunakan Amazon Resource Name (ARN) yang sesuai dalam pengaturan titik akhir Anda ketika Anda membuat titik akhir target Kafka on-premise.

Untuk mempersiapkan kunci privat dan sertifikat yang sudah ditandatangani untuk klaster Kafka yang dikelola sendiri
  1. Buat pasangan kunci seperti yang ditunjukkan pada contoh berikut.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Buat sertifikat tanda permintaan (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Gunakan CA di trusstore klaster Anda untuk menandatangani CSR. Jika Anda tidak memiliki CA, Anda dapat membuat CA pribadi Anda sendiri.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Impor ca-cert ke dalam server truststore dan keystore. Jika Anda tidak memiliki truststore, gunakan perintah berikut untuk membuat truststore dan impor ca-cert ke dalamnya.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Tanda tangani sertifikat.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Impor sertifikat yang sudah ditandatangani ke keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Gunakan perintah berikut untuk mengimpor on-premise-rsa kunci dari kafka.server.keystore.jks ke keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Gunakan perintah berikut untuk mengekspor keystore.p12 ke format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Unggah encrypted-private-server-key.pem, signed-certificate.pem, dan ca-cert untuk DMS certificate manager.

  10. Buat titik akhir dengan menggunakan ARN yang dikembalikan.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Menggunakan autentikasi SASL-SSL untuk terhubung ke Amazon MSK

Metode The Simple Authentication and Security Layer (SASL) menggunakan nama pengguna dan kata sandi untuk memvalidasi identitas klien, dan membuat koneksi terenkripsi antara server dan klien.

Untuk menggunakan SASL, pertama-tama buat nama pengguna dan kata sandi yang aman ketika mengatur Amazon MSK klaster Anda. Untuk deskripsi cara menyiapkan nama pengguna dan kata sandi yang aman untuk klaster Amazon MSK, lihat Menyiapkan autentikasi SASL/SCRAM untuk klaster Amazon MSK di Panduan Developer Amazon Managed Streaming for Apache Kafka.

Kemudian, ketika Anda membuat titik akhir target Kafka, setel pengaturan titik akhir protokol keamanan (SecurityProtocol) menggunakan pilihan sasl-ssl. Anda juga mengatur pilihan SaslUsername dan SaslPassword. Pastikan semua ini konsisten dengan nama pengguna dan kata sandi aman yang Anda buat ketika pertama kali menyiapkan klaster Amazon MSK Anda, seperti yang ditunjukkan dalam contoh JSON berikut.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
catatan
  • Saat ini, hanya AWS DMS mendukung SASL-SSL yang didukung CA publik. DMS tidak mendukung SASL-SSL untuk digunakan dengan Kafka yang dikelola sendiri yang didukung oleh CA pribadi.

  • Untuk otentikasi SASL-SSL, AWS DMS mendukung mekanisme SCRAM-SHA-512 secara default. AWS DMS versi 3.5.0 dan yang lebih tinggi juga mendukung mekanisme Plain. Untuk mendukung mekanisme Plain, atur SaslMechanism parameter tipe data KafkaSettings API kePLAIN.

Menggunakan citra sebelumnya untuk melihat nilai-nilai asli dari baris CDC untuk Apache Kafka sebagai target

Ketika menulis pembaruan CDC ke target data-streaming seperti Kafka Anda dapat melihat nilai asli baris basis data sumber sebelum diubah oleh pembaruan.. Untuk memungkinkan hal ini, AWS DMS mengisi gambar sebelum peristiwa pembaruan berdasarkan data yang disediakan oleh mesin database sumber.

Mesin basis data sumber yang berbeda memberikan jumlah informasi yang berbeda untuk citra sebelum:

  • Oracle menyediakan update untuk kolom hanya jika mereka berubah.

  • PostgreSQL menyediakan data hanya untuk kolom yang merupakan bagian dari kunci primer (berubah atau tidak). Jika replikasi logis sedang digunakan dan REPLICA IDENTITY FULL diatur untuk tabel sumber, Anda bisa mendapatkan seluruh informasi sebelum dan sesudah pada baris yang ditulis ke WALS dan tersedia di sini.

  • MySQL umumnya menyediakan data untuk semua kolom (berubah atau tidak).

Untuk mengaktifkan pencitraan sebelumnya dengan tujuan menambahkan nilai asli dari basis data sumber untuk keluaran AWS DMS , gunakan pengaturan tugas BeforeImageSettings atau parameter add-before-image-columns. Parameter ini menerapkan aturan transformasi kolom.

BeforeImageSettings menambahkan atribut JSON baru untuk setiap operasi pembaruan dengan nilai yang dikumpulkan dari sistem basis data sumber, seperti yang ditunjukkan berikut.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
catatan

Terapkan BeforeImageSettings pada beban penuh ditambah tugas CDC (yang memigrasikan data yang ada dan mereplikasi perubahan yang sedang berlangsung), atau ke tugas CDC saja (yang hanya mereplikasi perubahan data). Jangan terapkan BeforeImageSettings pada tugas-tugas yang beban penuh saja.

Untuk pilihan BeforeImageSettings, hal berikut berlaku:

  • Atur pilihan EnableBeforeImage ke true untuk mengaktifkan pencitraan sebelum. Default adalah false.

  • Gunakan pilihan FieldName untuk memberikan nama ke atribut JSON baru. Ketika EnableBeforeImage adalah true, FieldName diperlukan dan tidak dapat kosong.

  • Pilihan ColumnFilter menentukan kolom untuk menambahkan dengan menggunakan pencitraan sebelumnya. Untuk menambahkan kolom yang hanya merupakan bagian dari kunci primer tabel saja, gunakan nilai default, pk-only. Untuk menambahkan hanya kolom yang bukan tipe LOB, gunakan non-lob. Untuk menambahkan kolom apa pun yang memiliki nilai citra sebelum, gunakan all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Menggunakan aturan transformasi citra sebelumnya

Sebagai alternatif untuk pengaturan tugas, Anda dapat menggunakan parameter add-before-image-columns, yang menerapkan aturan transformasi kolom. Dengan parameter ini, Anda dapat mengaktifkan pencitraan sebelumnya selama CDC, pada target data streaming seperti Kinesis.

Dengan menggunakan add-before-image-columns dalam aturan transformasi, Anda dapat menerapkan kontrol yang lebih halus dari hasil citra sebelumnya. Aturan transformasi memungkinkan Anda untuk menggunakan pencari objek yang memberikan Anda kontrol atas tabel yang dipilih untuk aturan tersebut. Dan juga, Anda dapat merangkai aturan transformasi, yang memungkinkan aturan yang berbeda untuk diterapkan ke tabel yang berbeda. Anda kemudian dapat memanipulasi kolom yang dihasilkan dengan menggunakan aturan lain.

catatan

Jangan gunakan parameter add-before-image-columns bersamaan dengan pengaturan tugas BeforeImageSettings dalam tugas yang sama. Sebaliknya, gunakan salah satu saja, parameter atau pengaturan, tetapi tidak keduanya, untuk satu tugas.

Sebuah tipe peraturan transformation dengan parameter add-before-image-columns untuk kolom harus menyediakan before-image-def bagian. Berikut ini adalah sebuah contoh.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

Nilai dari column-prefix ditambahkan sebelum nama kolom, dan nilai default column-prefix adalah BI_. Nilai dari column-suffix ditambahkan sebelum nama kolom, dan default kosong. Jangan atur column-prefix dan column-suffix ke string kosong.

Pilih satu nilai untuk column-filter. Untuk menambahkan kolom yang merupakan bagian dari kunci primer tabel saja, pilih pk-only. Untuk menambahkan kolom yang bukan tipe LOB saja, gunakan non-lob. Atau pilih all untuk menambahkan kolom yang memiliki nilai citra-sebelum.

Contoh untuk aturan transformasi citra sebelumnya

Aturan transformasi dalam contoh berikut menambahkan kolom baru yang disebut BI_emp_no pada target. Jadi pernyataan seperti UPDATE employees SET emp_no = 3 WHERE emp_no = 1; mengisi BI_emp_no field dengan 1. Ketika Anda menulis pembaruan CDC untuk target Amazon S3, kolom BI_emp_no memungkinkan untuk memberitahu baris asli yang diperbarui.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Untuk informasi tentang penggunaan peraturan tindakan add-before-image-columns, lihat Aturan dan tindakan transformasi.

Keterbatasan saat menggunakan Apache Kafka sebagai target AWS Database Migration Service

Batasan berikut berlaku saat menggunakan Apache Kafka sebagai target:

  • AWS DMS Titik akhir target Kafka tidak mendukung kontrol akses IAM untuk Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • Mode LOB penuh tidak didukung.

  • Tentukan file konfigurasi Kafka untuk klaster Anda dengan properti yang memungkinkan AWS DMS untuk membuat topik baru secara otomatis. Sertakan pengaturan, auto.create.topics.enable = true. Jika menggunakan Amazon MSK, Anda dapat menentukan konfigurasi default ketika membuat klaster Kafka Anda, kemudian mengubah pengaturan auto.create.topics.enable ke true. Untuk informasi lebih lanjut tentang pengaturan konfigurasi default, lihat Konfigurasi Amazon MSK default di Panduan Developer Amazon Managed Streaming for Apache Kafka. Jika Anda perlu memodifikasi cluster Kafka yang ada yang dibuat menggunakan Amazon MSK, jalankan AWS CLI perintah aws kafka create-configuration untuk memperbarui konfigurasi Kafka Anda, seperti pada contoh berikut:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Di sini, //~/kafka_configuration adalah file konfigurasi yang telah Anda buat dengan pengaturan properti yang diperlukan.

    Jika Anda menggunakan instans Kafka Anda sendiri yang diinstal di Amazon EC2, ubah konfigurasi cluster Kafka dengan auto.create.topics.enable = true pengaturan untuk AWS DMS memungkinkan Anda membuat topik baru secara otomatis, menggunakan opsi yang disediakan bersama instans Anda.

  • AWS DMS menerbitkan setiap pembaruan ke satu catatan dalam database sumber sebagai satu catatan data (pesan) dalam topik Kafka tertentu terlepas dari transaksi.

  • AWS DMS mendukung dua bentuk berikut untuk kunci partisi:

    • SchemaName.TableName: Kombinasi skema dan nama tabel.

    • ${AttributeName}: Nilai salah satu field di JSON, atau kunci primer dari tabel dalam basis data sumber.

  • BatchApply Tidak didukung untuk titik akhir Kafka. Menggunakan penerapan Batch (misalnya, pengaturan tugas metadata target BatchApplyEnabled) untuk target Kafka dapat mengakibatkan hilangnya data.

  • AWS DMS tidak mendukung migrasi nilai tipe BigInt data dengan lebih dari 16 digit. Untuk mengatasi batasan ini, Anda dapat menggunakan aturan transformasi berikut untuk mengonversi BigInt kolom menjadi string. Untuk informasi selengkapnya tentang aturan transformasi, lihat Aturan dan tindakan transformasi.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Menggunakan pemetaan objek untuk bermigrasi data ke topik Kafka

AWS DMS menggunakan aturan pemetaan tabel untuk memetakan data dari sumber ke topik Kafka target. Untuk memetakan data ke topik target, Anda menggunakan jenis aturan pemetaan tabel yang disebut pemetaan objek. Anda menggunakan pemetaan objek untuk menentukan cara pencatatan data di peta sumber ke catatan data yang dipublikasikan ke topik Kafka.

topik Kafka tidak memiliki struktur preset selain memiliki kunci partisi.

catatan

Anda tidak perlu menggunakan pemetaan objek. Anda dapat menggunakan pemetaan tabel biasa untuk berbagai transformasi. Namun, jenis kunci partisi akan mengikuti perilaku default ini:

  • Primary Key digunakan sebagai kunci partisi untuk Full Load.

  • Jika tidak ada pengaturan tugas paralle-apply yang digunakan, schema.table digunakan sebagai kunci partisi untuk CDC.

  • Jika pengaturan tugas penerapan paralel digunakan, kunci utama digunakan sebagai kunci partisi untuk CDC.

Untuk membuat aturan pemetaan objek, tetapkan rule-type sebagai object-mapping. Aturan ini menentukan jenis pemetaan objek yang ingin Anda gunakan.

Struktur untuk aturan tersebut adalah sebagai berikut.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS saat ini mendukung map-record-to-record dan map-record-to-document sebagai satu-satunya nilai yang valid untuk rule-action parameter. Pengaturan ini memengaruhi nilai yang tidak dikecualikan sebagai bagian dari daftar exclude-columns atribut. map-record-to-documentNilai map-record-to-record dan menentukan bagaimana AWS DMS menangani catatan ini secara default. Nilai-nilai ini tidak mempengaruhi pemetaan atribut dengan cara apapun.

Gunakan map-record-to-record ketika bermigrasi dari basis data relasional ke sebuah topik Kafka. Jenis aturan ini menggunakan nilai taskResourceId.schemaName.tableName dari basis data relasional sebagai kunci partisi dalam topik Kafka dan membuat atribut untuk masing-masing kolom dalam basis data sumber.

Saat menggunakanmap-record-to-record, perhatikan hal berikut:

  • Pengaturan ini hanya memengaruhi kolom yang dikecualikan oleh exclude-columns daftar.

  • Untuk setiap kolom tersebut, AWS DMS buat atribut yang sesuai dalam topik target.

  • AWS DMS menciptakan atribut yang sesuai ini terlepas dari apakah kolom sumber digunakan dalam pemetaan atribut.

Salah satu cara untuk memahami map-record-to-record adalah melihatnya beraksi. Untuk contoh ini, anggaplah bahwa Anda memulai dengan baris tabel basis data relasional dengan struktur dan data berikut.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Untuk memigrasi informasi ini dari skema bernama Test ke topik Kafka, buatlah aturan untuk memetakan data ke topik target. Aturan berikut menggambarkan pemetaan.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Mengingat topik Kafka dan kunci partisi (dalam hal ini, taskResourceId.schemaName.tableName), berikut ini menggambarkan format catatan yang dihasilkan menggunakan data sampel kami di topik target Kafka:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Restrukturisasi data dengan pemetaan atribut

Anda dapat merestrukturisasi data saat migrasi ke topik kafka berlangsung menggunakan peta atribut. Misalnya, Anda mungkin ingin menggabungkan beberapa field di sumber menjadi field tunggal dalam target. Peta atribut berikut menggambarkan bagaimana merestrukturisasi data.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Untuk menetapkan nilai konstan untuk partition-key, tentukan nilai partition-key. Sebagai contoh, Anda mungkin melakukan ini untuk memaksa semua data untuk disimpan dalam partisi tunggal. Pemetaan berikut menggambarkan pendekatan ini.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
catatan

Nilai partition-key untuk catatan kontrol yang ditujukan khusus untuk tabel tertentu adalah TaskId.SchemaName.TableName. Nilai partition-key untuk catatan kontrol yang ditujukan untuk tugas tertentu adalah TaskId dari catatan tersebut. Menentukan nilai partition-key dalam pemetaan objek tidak berdampak pada partition-key untuk catatan kontrol.

Replikasi multitopik menggunakan pemetaan objek

Secara default, AWS DMS tugas memigrasikan semua data sumber ke salah satu topik Kafka berikut:

  • Seperti yang ditentukan dalam bidang Topik dari titik akhir AWS DMS target.

  • Seperti yang ditentukan oleh kafka-default-topic jika bidang Topik dari titik akhir target tidak diisi dan auto.create.topics.enable pengaturan Kafka diatur ke. true

Dengan versi AWS DMS engine 3.4.6 dan yang lebih tinggi, Anda dapat menggunakan kafka-target-topic atribut untuk memetakan setiap tabel sumber yang dimigrasi ke topik terpisah. Misalnya, aturan pemetaan objek berikut memigrasikan tabel sumber Customer dan Address ke topik Kafka customer_topic danaddress_topic, masing-masing. Pada saat yang sama, AWS DMS memigrasikan semua tabel sumber lainnya, termasuk Bills tabel dalam Test skema, ke topik yang ditentukan di titik akhir target.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Dengan menggunakan replikasi multitopik Kafka, Anda dapat mengelompokkan dan memigrasikan tabel sumber untuk memisahkan topik Kafka menggunakan satu tugas replikasi.

Format pesan untuk Apache Kafka

Keluaran JSON hanya berupa daftar pasangan nilai-kunci.

RecordType

Jenis catatan dapat berupa data atau kontrol. Catatan data mewakili baris yang sebenarnya dalam sumber. Kontrol catatan adalah untuk peristiwa-peristiwa penting di aliran, misalnya restart tugas.

Operasi

Untuk catatan data, operasinya dapat load, insert, update, atau delete.

Untuk catatan kontrol, operasi dapat berupacreate-table,rename-table,drop-table,change-columns,add-column,drop-column,rename-column, ataucolumn-type-change.

SchemaName

Skema sumber untuk catatan. Field ini bisa kosong untuk catatan kontrol.

TableName

Tabel sumber untuk catatan. Field ini bisa kosong untuk catatan kontrol.

Stempel waktu

Stempel waktu untuk saat pesan JSON dibuat. Field ini diformat dengan format ISO 8601.

Contoh pesan JSON berikut menggambarkan pesan tipe data dengan semua metadata tambahan.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

Contoh pesan JSON berikut menggambarkan pesan tipe kontrol.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }