本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Apache Kafka 作为目标 AWS Database Migration Service
您可以使用将数据迁移 AWS DMS 到 Apache Kafka 集群。Apache Kafka 是一个分布式流式处理平台。您可以使用 Apache Kafka 实时提取和处理流数据。
AWS 还提供适用于 Apache Kafka 的亚马逊托管流媒体(亚马逊 MSK)作为目标。 AWS DMS Amazon MSK 是一个完全托管的 Apache Kafka 流式处理服务,可简化 Apache Kafka 实例的实施和管理。它适用于开源 Apache Kafka 版本,你可以像任何 Apache Kafka 实例一样作为 AWS DMS 目标访问亚马逊 MSK 实例。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK 是什么?。
Kafka 集群将记录流存储在称为主题的类别中,这些类别被划分为多个分区。分区是主题中唯一标识的数据记录(消息)序列。分区可分布在集群中的多个代理之间,以支持并行处理某个主题的记录。有关 Apache Kafka 中主题和分区及其分布的更多信息,请参阅主题和日志
您的 Kafka 集群可以是 Amazon MSK 实例、在 Amazon EC2 实例上运行的集群或本地集群。Amazon MSK 实例或 Amazon EC2 实例上的集群可以位于同一 VPC 中,也可以位于不同 VPC 中。如果您的集群位于本地,可以为复制实例使用自己的本地名称服务器以解析集群的主机名。有关在复制实例上设置名称服务器的信息,请参阅 使用您自己的本地名称服务器。有关设置网络的更多信息,请参阅为复制实例设置网络。
使用 Amazon MSK 集群时,请确保其安全组允许从您的复制实例进行访问。有关更改 Amazon MSK 集群的安全组的信息,请参阅更改 Amazon MSK 集群的安全组。
AWS Database Migration Service 使用 JSON 将记录发布到 Kafka 主题。转换期间, AWS DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对。
要将数据从支持的任何数据源迁移到目标 Kafka 集群,您应使用对象映射。使用对象映射,可确定如何在目标主题中建立数据记录结构。您还可以为每个表定义分区键,Apache Kafka 使用该键将数据分组为分区。
目前, AWS DMS 支持每个任务使用一个主题。对于包含多个表的单个任务,所有消息都将转到同一个主题。每条消息都包含一个标识目标架构和表的元数据部分。 AWS DMS 3.4.6 及更高版本支持使用对象映射进行多主题复制。有关更多信息,请参阅使用对象映射进行多主题复制。
Apache Kafka 端点设置
您可以通过 AWS DMS 控制台中的端点设置或 CLI 中的--kafka-settings
选项来指定连接详细信息。每个设置的要求如下:
-
Broker
– 以各个
的逗号分隔列表的形式指定 Kafka 集群中一个或多个代理的位置。例如,broker-hostname
:port
"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"
。此设置可以指定集群中任何或所有代理的位置。集群代理全部通过通信来处理迁移到主题的数据记录的分区。 -
Topic
–(可选)指定主题名称,最大长度为 255 个字母和符号。您可以使用句点 (.)、下划线 (_) 和减号 (-)。带句点 (.) 或下划线 (_) 的主题名称可能会在内部数据结构中发生冲突。在主题名称中使用其中一个符号,但不要同时使用这两个符号。如果您未指定主题名称,则 AWS DMS 使用"kafka-default-topic"
作为迁移主题。注意
要 AWS DMS 创建您指定的迁移主题或默认主题,请
auto.create.topics.enable = true
将其设置为 Kafka 集群配置的一部分。有关更多信息,请参阅使用 Apache Kafka 作为目标时的限制 AWS Database Migration Service。 MessageFormat
– 在端点上创建的记录的输出格式。消息格式为JSON
(默认值)或JSON_UNFORMATTED
(单行,无制表符)。-
MessageMaxBytes
– 端点上创建的记录的最大大小(以字节为单位)。默认值是 1000000。注意
您只能使用 AWS CLI/SDK 更改
MessageMaxBytes
为非默认值。例如,要修改现有的 Kafka 端点并更改MessageMaxBytes
,请使用以下命令。aws dms modify-endpoint --endpoint-arn
your-endpoint
--kafka-settings Broker="broker1-server
:broker1-port
,broker2-server
:broker2-port
,...", Topic=topic-name
,MessageMaxBytes=integer-of-max-message-size-in-bytes
IncludeTransactionDetails
– 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及transaction_id
、previous_transaction_id
和transaction_record_id
(事务内的记录偏移)的值。默认值为false
。IncludePartitionValue
– 显示 Kafka 消息输出中的分区值,除非分区类型为schema-table-type
。默认值为false
。PartitionIncludeSchemaTable
– 当分区类型为primary-key-type
时,将架构和表名称作为分区值的前缀。这样做会提高数据在 Kafka 分区之间的分布广度。例如,假设SysBench
架构具有数千个表,并且每个表的主键只有有限的范围。在这种情况下,同一主键将从数千个表发送到同一个分区,这会导致限制。默认值为false
。IncludeTableAlterOperations
– 包括更改控制数据中表的任何数据定义语言(DDL)操作,例如rename-table
、drop-table
、add-column
、drop-column
和rename-column
。默认值为false
。IncludeControlDetails
– 显示 Kafka 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认值为false
。-
IncludeNullAndEmpty
– 在目标中包括空列。默认值为false
。 -
SecurityProtocol
– 使用传输层安全性协议(TLS)设置到 Kafka 目标端点的安全连接。选项包括ssl-authentication
、ssl-encryption
和sasl-ssl
。使用sasl-ssl
需要SaslUsername
和SaslPassword
。 -
SslEndpointIdentificationAlgorithm
— 为证书设置主机名验证。3.5.1 及更高 AWS DMS 版本支持此设置。这些选项包含以下内容:NONE
:在客户端连接中禁用代理的主机名验证。HTTPS
:在客户端连接中启用代理的主机名验证。
您可以使用设置来帮助提高传输速度。为此, AWS DMS 支持多线程完全加载到 Apache Kafka 目标集群。 AWS DMS 通过包含以下选项的任务设置支持此多线程:
-
MaxFullLoadSubTasks
— 使用此选项表示要并行加载的最大源表数。 AWS DMS 使用专用子任务将每个表加载到其相应的 Kafka 目标表中。默认值为 8;最大值为 49。 -
ParallelLoadThreads
— 使用此选项指定用于将每个表加载到其 Kafka 目标表中的线程数。 AWS DMS Apache Kafka 目标的最大值为 32。您可以请求提高此最大值限制。 -
ParallelLoadBufferSize
– 使用此选项指定在缓冲区(并行加载线程将数据加载到 Kafka 目标时使用)中存储的最大记录数。默认值是 50。最大值为 1000。将此设置与ParallelLoadThreads
一起使用;仅在有多个线程时ParallelLoadBufferSize
才有效。 -
ParallelLoadQueuesPerThread
– 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值是 1。最大值为 512。
您可以通过调整并行线程和批量操作的任务设置来提高 Kafka 端点的更改数据捕获(CDC)性能。为此,您可以使用 ParallelApply*
任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。
为了提高 CDC 性能, AWS DMS 支持以下任务设置:
-
ParallelApplyThreads
— 指定在 CDC 加载期间 AWS DMS 用于将数据记录推送到 Kafka 目标端点的并发线程数。默认值为零(0),最大值为 32。 -
ParallelApplyBufferSize
– 指定在 CDC 加载期间要在每个缓冲区队列中存储的最大记录数,以便将并发线程推送到 Kafka 目标端点。默认值是 100,最大值是 1,000。当ParallelApplyThreads
指定多个线程时,请使用此选项。 -
ParallelApplyQueuesPerThread
– 指定每个线程访问的队列数,以便从队列中取出数据记录并在 CDC 期间为 Kafka 端点生成批处理负载。默认值是 1。最大值为 512。
使用 ParallelApply*
任务设置时,partition-key-type
默认值是表的 primary-key
,而不是 schema-name.table-name
。
使用传输层安全性协议(TLS)连接到 Kafka
Kafka 集群仅接受使用传输层安全性协议(TLS)的安全连接。通过 DMS,您可以使用以下三个安全协议选项中的任何一个来保护 Kafka 端点连接。
- SSL 加密(
server-encryption
) -
客户端通过服务器的证书验证服务器身份。然后在服务器和客户端之间建立加密连接。
- SSL 身份验证(
mutual-authentication
) -
服务器和客户端通过自己的证书相互验证身份。然后在服务器和客户端之间建立加密连接。
- SASL-SSL(
mutual-authentication
) -
简单身份验证和安全层(SASL)方法将客户端的证书替换为用户名和密码以验证客户端身份。具体而言,您需要提供服务器已注册的用户名和密码,以便服务器可以验证客户端的身份。然后在服务器和客户端之间建立加密连接。
重要
Apache Kafka 和 Amazon MSK 接受已解析的证书。这是 Kafka 和 Amazon MSK 有待解决的已知限制。有关更多信息,请参阅 Apache Kafka 问题,KAFKA-3700
如果您使用的是 Amazon MSK,请考虑使用访问控制列表(ACL)来临时解决此已知限制。有关使用 ACL 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Apache Kafka ACL 部分。
如果您使用的是自管理 Kafka 集群,请参阅 2018 年 10 月 21 日的评论
将 SSL 加密与 Amazon MSK 或自管理 Kafka 集群结合使用
您可以使用 SSL 加密来保护与 Amazon MSK 或自管理 Kafka 集群的端点连接。使用 SSL 加密身份验证方法时,客户端会通过服务器的证书验证服务器的身份。然后在服务器和客户端之间建立加密连接。
使用 SSL 加密连接到 Amazon MSK
-
创建目标 Kafka 端点时,使用
ssl-encryption
选项设置安全协议端点设置(SecurityProtocol
)。以下 JSON 示例将安全协议设置为 SSL 加密。
"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
对自管理 Kafka 集群使用 SSL 加密
-
如果您在本地 Kafka 集群中使用私有证书颁发机构(CA),请上传您的私有 CA 证书并获取 Amazon 资源名称(ARN)。
-
创建目标 Kafka 端点时,使用
ssl-encryption
选项设置安全协议端点设置(SecurityProtocol
)。以下 JSON 示例将安全协议设置为ssl-encryption
。"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
-
如果您使用的是私有 CA,请在上面第一步中获得的 ARN 中设置
SslCaCertificateArn
。
使用 SSL 身份验证
您可以使用 SSL 身份验证来保护与 Amazon MSK 或自管理 Kafka 集群的端点连接。
要使用 SSL 身份验证启用客户端身份验证和加密以连接到 Amazon MSK,请执行以下操作:
-
为 Kafka 准备私钥和公有证书。
-
将证书上传到 DMS 证书管理器。
-
使用 Kafka 端点设置中指定的相应证书 ARN 创建一个 Kafka 目标端点。
为 Amazon MSK 准备私钥和公有证书
-
创建 EC2 实例并设置客户端以使用身份验证,如《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的客户端身份验证部分的步骤 1 到 9 中所述。
完成这些步骤后,您将获得证书 ARN(保存在 ACM 中的公有证书 ARN)和一个包含在
kafka.client.keystore.jks
文件中的私钥。 -
使用以下命令获取公有证书并将证书复制到
signed-certificate-from-acm.pem
文件中:aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN
该命令会返回类似以下示例中显示的信息。
{"Certificate": "123", "CertificateChain": "456"}
然后,将
"123"
的等效内容复制到signed-certificate-from-acm.pem
文件中。 -
通过导入
msk-rsa
密钥从kafka.client.keystore.jks to keystore.p12
获取私钥,如以下示例所示。keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
-
使用以下命令将
keystore.p12
导出为.pem
格式。Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts
出现输入 PEM 密码短语消息,其中标识出用于加密证书的密钥。
-
从
.pem
文件中删除包属性和密钥属性,确保第一行以如下字符串开头。---BEGIN ENCRYPTED PRIVATE KEY---
将公有证书和私钥上传到 DMS 证书管理器并测试与 Amazon MSK 的连接
-
使用以下命令上传到 DMS 证书管理器。
aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://
path to signed cert
aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
-
创建 Amazon MSK 目标端点并测试连接,以确保 TLS 身份验证有效。
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
重要
您可以使用 SSL 身份验证来保护与自管理 Kafka 集群的连接。在某些情况下,您可能会在本地 Kafka 集群中使用私有证书颁发机构(CA)。如果是,请将您的 CA 链、公有证书和私钥上传到 DMS 证书管理器。然后,在创建本地 Kafka 目标端点时,在端点设置中使用相应的 Amazon 资源名称(ARN)。
为自管理 Kafka 集群准备私钥和签名的证书
-
生成以下示例所示的密钥对。
keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass
your-keystore-password
-keypassyour-key-passphrase
-dname "CN=your-cn-name
" -aliasalias-of-key-pair
-storetype pkcs12 -keyalg RSA -
生成证书签名请求(CSR)。
keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass
your-key-store-password
-keypassyour-key-password
-
使用集群信任库中的 CA 签署 CSR。如果您没有 CA,则可以创建自己的私有 CA。
openssl req -new -x509 -keyout ca-key -out ca-cert -days
validate-days
-
将
ca-cert
导入服务器信任库和密钥库。如果您没有信任库,请使用以下命令创建信任库,并将ca-cert
导入其中。keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
-
在证书上签名。
openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days
validate-days
-CAcreateserial -passin pass:ca-password
-
将签名的证书导入密钥库。
keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass
your-keystore-password
-keypassyour-key-password
-
使用以下命令将
on-premise-rsa
密钥从kafka.server.keystore.jks
导入keystore.p12
。keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass
your-truststore-password
\ -destkeypassyour-key-password
-
使用以下命令将
keystore.p12
导出为.pem
格式。Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
-
将
encrypted-private-server-key.pem
、signed-certificate.pem
和ca-cert
上传到 DMS 证书管理器。 -
使用返回的 ARN 创建端点。
aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "
your-client-cert-arn
","SslClientKeyArn": "your-client-key-arn
","SslClientKeyPassword":"your-client-key-password
", "SslCaCertificateArn": "your-ca-certificate-arn
"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
使用 SASL-SSL 身份验证连接到 Amazon MSK
简单身份验证和安全层(SASL)方法使用用户名和密码来验证客户端身份,并在服务器和客户端之间建立加密连接。
要使用 SASL,您需要在设置 Amazon MSK 集群时先创建一个安全的用户名和密码。有关如何为 Amazon MSK 集群设置安全用户名和密码的说明,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的为 Amazon MSK 集群设置 SASL/SCRAM 身份验证。
然后,在创建 Kafka 目标端点时,使用 sasl-ssl
选项设置安全协议端点设置(SecurityProtocol
)。您还可以设置 SaslUsername
和 SaslPassword
选项。确保这些设置与首次设置 Amazon MSK 集群时创建的安全用户名和密码一致,如以下 JSON 示例所示。
"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"
Amazon MSK cluster secure user name
", "SaslPassword":"Amazon MSK cluster secure password
" }
注意
-
当前,仅 AWS DMS 支持公共 CA 支持的 SASL-SSL。DMS 不支持 SASL-SSL 用于配置了私有 CA 的自管理 Kafka。
-
对于 SASL-SSL 身份验证,默认情况下 AWS DMS 支持 SCRAM-SHA-512 机制。 AWS DMS 3.5.0 及更高版本还支持 Plain 机制。要支持 Plain 机制,请将
KafkaSettings
API 数据类型的SaslMechanism
参数设置为PLAIN
。
使用之前映像查看 Apache Kafka(作为目标)的 CDC 行的原始值
在将 CDC 更新写入数据流目标(如 Kafka)时,可以在更新进行更改之前查看源数据库行的原始值。为此,请根据源数据库引擎提供的数据 AWS DMS 填充更新事件之前的图像。
不同的源数据库引擎为之前映像提供不同的信息量:
-
仅当列发生更改时,Oracle 才会对列提供更新。
-
PostgreSQL 仅为作为主键一部分的列(已更改或未更改)提供数据。如果正在使用逻辑复制,并且为源表设置了 REPLICA IDENTITY FULL,则可以在此处获取有关写入 WAL 的行的完整前后信息。
-
MySQL 通常为所有列(已更改或未更改)提供数据。
要启用之前映像以便将源数据库中的原始值添加到 AWS DMS 输出,请使用 BeforeImageSettings
任务设置或 add-before-image-columns
参数。此参数应用列转换规则。
BeforeImageSettings
使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意
将 BeforeImageSettings
应用于完全加载任务及 CDC 任务(这会迁移现有数据并复制正在进行的更改)或仅应用于 CDC 任务(这仅复制数据更改)。不将 BeforeImageSettings
应用于仅完全加载的任务。
对于 BeforeImageSettings
选项,以下条件适用:
-
将
EnableBeforeImage
选项设置为true
以启用之前映像。默认值为false
。 -
使用
FieldName
选项为新 JSON 属性指定名称。当EnableBeforeImage
为true
时,FieldName
是必填项且不能为空。 -
ColumnFilter
选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值pk-only
。要仅添加非 LOB 类型的列,请使用non-lob
。要添加具有之前映像值的任何列,请使用all
。"BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
使用之前映像转换规则
作为任务设置的替代方法,您可以使用 add-before-image-columns
参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 Kafka)启用之前映像。
通过在转换规则中使用 add-before-image-columns
,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。
注意
不要在同一任务中将 add-before-image-columns
参数与 BeforeImageSettings
任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。
包含列的 add-before-image-columns
参数的 transformation
规则类型必须提供一个 before-image-def
部分。下面是一个示例。
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
column-prefix
的值附加到列名称前面,column-prefix
的默认值为 BI_
。column-suffix
的值将附加到列名称之后,默认值为空。不要同时将 column-prefix
和 column-suffix
设置为空字符串。
为 column-filter
选择一个值。要仅添加属于表主键一部分的列,请选择 pk-only
。选择 non-lob
以仅添加不属于 LOB 类型的列。或者,选择 all
以添加任何具有之前映像值的列。
之前映像转换规则的示例
以下示例中的转换规则在目标中添加一个名为 BI_emp_no
的新列。因此,像 UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
这样的语句用 1 填充 BI_emp_no
字段。当您将 CDC 更新写入 Amazon S3 目标时,通过 BI_emp_no
列能够判断哪个原始行已更新。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
有关使用 add-before-image-columns
规则操作的信息,请参阅 转换规则和操作。
使用 Apache Kafka 作为目标时的限制 AWS Database Migration Service
将 Apache Kafka 作为目标时存在以下限制:
-
AWS DMS Kafka 目标终端节点不支持适用于 Apache Kafka 的亚马逊托管流媒体(亚马逊 MSK)的 IAM 访问控制。
-
不支持完整 LOB 模式。
-
为您的集群指定 Kafka 配置文件,其属性 AWS DMS 允许自动创建新主题。包括设置
auto.create.topics.enable = true
。如果您使用的是 Amazon MSK,可以在创建 Kafka 集群时指定默认配置,然后将auto.create.topics.enable
设置更改为true
。有关默认配置设置的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的默认 Amazon MSK 配置。如果您需要修改使用 Amazon MSK 创建的现有 Kafka 集群,请运行 AWS CLI 命令aws kafka create-configuration
更新您的 Kafka 配置,如以下示例所示:14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }
此处,
//~/kafka_configuration
是您使用所需的属性设置创建的配置文件。如果您使用自己安装在 Amazon EC2 上的 Kafka 实例,请使用您的实例提供的选项
auto.create.topics.enable = true
将 Kafka 集群配置修改为允许 AWS DMS 自动创建新主题。 -
AWS DMS 无论事务如何,都会将源数据库中单个记录的每次更新作为给定 Kafka 主题中的一条数据记录(消息)发布。
-
AWS DMS 支持以下两种形式的分区键:
-
SchemaName.TableName
:架构和表名称的组合。 -
${AttributeName}
:JSON 中其中一个字段的值,或源数据库中表的主键。
-
-
对于 Kafka 端点,不支持
BatchApply
。为 KafkaBatchApplyEnabled
目标使用“批量应用”(例如,目标元数据任务设置)可能会导致数据丢失。 -
AWS DMS 不支持迁移超过 16 位数
BigInt
的数据类型的值。要解决此限制问题,您可以使用以下转换规则将BigInt
列转换为字符串。有关转换规则的更多信息,请参阅 转换规则和操作。{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
使用对象映射将数据迁移到 Kafka 主题
AWS DMS 使用表映射规则将数据从源映射到目标 Kafka 主题。要将数据映射到目标主题,您必须使用称为对象映射的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kafka 主题的数据记录。
除了具有分区键以外,Kafka 主题没有预设结构。
注意
您不一定要使用对象映射。可以使用常规表映射进行各种转换。但是,分区键类型将遵循以下默认行为:
-
主键用作完全加载时的分区键。
-
如果未使用并行应用任务设置,
schema.table
将用作 CDC 的分区键。 -
如果未使用并行应用任务设置,主键则用作 CDC 的分区键。
要创建对象映射规则,请将 rule-type
指定为 object-mapping
。此规则指定您要使用的对象映射的类型。
规则的结构如下所示。
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS 目前支持将map-record-to-record
和map-record-to-document
作为该rule-action
参数的唯一有效值。这些设置会影响未作为 exclude-columns
属性列表一部分排除的值。map-record-to-record
和map-record-to-document
值指定默认情况下如何 AWS DMS 处理这些记录。这些值不会以任何方式影响属性映射。
从关系数据库迁移到 Kafka 主题时使用 map-record-to-record
。此规则类型使用关系数据库中的 taskResourceId.schemaName.tableName
值作为 Kafka 主题中的分区键,并为源数据库中的每个列创建一个属性。
使用 map-record-to-record
时请注意:
此设置仅影响
exclude-columns
列表排除的列。对于每个这样的列,在目标主题中 AWS DMS 创建一个相应的属性。
AWS DMS 无论源列是否用于属性映射,都会创建相应的属性。
了解 map-record-to-record
的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
02/29/1988 |
要将此信息从名为 Test
的架构迁移到 Kafka 主题,请创建规则来将数据映射到目标主题。以下规则对此映射进行了说明。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
给定 Kafka 主题和分区键(在本例中为 taskResourceId.schemaName.tableName
),下面说明了使用 Kafka 目标主题中的示例数据生成的记录格式:
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
使用属性映射调整数据结构
在使用属性映射将数据迁移到 Kafka 主题时,您可以调整数据的结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
要为 partition-key
设置常量值,请指定 partition-key
值。例如,您可以执行此操作来强制将所有数据存储在一个分区中。以下映射说明了此方法。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意
表示特定表的控制记录的 partition-key
值为 TaskId.SchemaName.TableName
。表示特定任务的控制记录的 partition-key
值为该记录的 TaskId
。在对象映射中指定 partition-key
值不会影响控制记录的 partition-key
。
使用对象映射进行多主题复制
默认情况下, AWS DMS 任务会将所有源数据迁移到以下 Kafka 主题之一:
如 AWS DMS 目标终端节点的 “主题” 字段所指定。
如果目标端点的主题字段未填,且 Kafka
auto.create.topics.enable
设置设为true
,则由kafka-default-topic
指定。
在 3.4.6 及更高版本的 AWS DMS 引擎中,您可以使用kafka-target-topic
属性将每个迁移的源表映射到单独的主题。例如,下面的对象映射规则分别将源表 Customer
和 Address
迁移到 Kafka 主题 customer_topic
和 address_topic
。同时,将所有其他源表(包括Test
架构中的Bills
表) AWS DMS 迁移到目标终端节点中指定的主题。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }
通过使用 Kafka 多主题复制,您可以使用单个复制任务将源表分组并迁移到单独的 Kafka 主题。
Apache Kafka 的消息格式
JSON 输出只是键值对的列表。
- RecordType
-
记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。
- 操作
-
对于数据记录,操作可以是
load
、insert
、update
或delete
。对于控制记录,操作可以是
create-table
、rename-table
、drop-table
、change-columns
、add-column
、drop-column
、rename-column
或column-type-change
。 - SchemaName
-
记录的源架构。此字段对于控制记录可能是空的。
- TableName
-
记录的源表。此字段对于控制记录可能是空的。
- Timestamp
-
JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。
以下 JSON 消息示例说明了包含所有其他元数据的数据类型消息。
{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }
以下 JSON 消息示例说明了控件类型消息。
{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }