将 Amazon Kinesis Data Streams 作为 AWS Database Migration Service 的目标 - AWS Database Migration Service

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Amazon Kinesis Data Streams 作为 AWS Database Migration Service 的目标

您可以使用 AWS DMS 将数据迁移到 Amazon Kinesis 数据流。Amazon Kinesis 数据流是 Amazon Kinesis Data Streams 服务的一部分。可以使用 Kinesis 数据流实时收集和处理大型数据记录流。

Kinesis 数据流由分片组成。分片是流中数据记录的唯一标识序列。有关 Amazon Kinesis Data Streams 中的分片的更多信息,请参阅 中的分片。Amazon Kinesis Data Streams 开发人员指南

AWS Database Migration Service 使用 JSON 将记录发布到 Kinesis 数据流。在转换期间,AWS DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对或 JSON_UNFORMATTED 消息格式。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。它允许 Amazon Kinesis Data Firehose 将 Kinesis 数据传递到 Amazon S3 目标,然后使用各种查询引擎(包括 Amazon Athena)进行查询。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义分区键,Kinesis Data Streams 用它来将数据分组为分片。

当 AWS DMS 在 Kinesis Data Streams 目标终端节点上创建表时,它创建与源数据库终端节点相同数量的表。AWS DMS 还会设置几个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

注意

DMS 控制台或 API 上的 SSL 模式选项不适用于某些数据流式处理和 NoSQL 服务,例如 Kinesis 和 DynamoDB。 它们在默认情况下是安全的,因此 DMS 显示 SSL 模式设置为无 (SSL Mode=None)。您无需为终端节点提供任何额外配置即可使用 SSL。例如,将 Kinesis 作为目标终端节点时,默认情况下它是安全的。对 Kinesis 的所有 API 调用都使用 SSL,因此无需在 DMS 终端节点中使用其他 SSL 选项。您可以使用 HTTPS 协议安全地放置数据并通过 SSL 终端节点检索数据,默认情况下,DMS 在连接到 Kinesis 数据流时会使用该协议。

Kinesis Data Streams 终端节点设置

在使用 Kinesis Data Streams 目标终端节点时,可以使用 KinesisSettings API 中的 AWS DMS 选项获取事务和控制详细信息。在 CLI 中,使用 --kinesis-settings 选项的请求参数,如下所示:

注意

版本 3.4.1 及更高版本中支持 IncludeNullAndEmpty 终端节点设置。AWS DMS但 Kinesis Data Streams 版本 3.3.1 及更高版本中支持 AWS DMS 目标的其他以下终端节点设置。

  • MessageFormat – 在终端节点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • IncludeControlDetails – 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认值为 false

  • IncludeNullAndEmpty – 在目标中包含 NULL 和空列。默认值为 false

  • IncludePartitionValue – 显示 Kinesis 消息输出中的分区值,除非分区类型为 schema-table-type。 默认值为 false

  • IncludeTableAlterOperations – 包含更改控制数据中表的任何数据定义语言 (DDL) 操作,例如 rename-tabledrop-tableadd-columndrop-columnrename-column。 默认值为 false

  • IncludeTransactionDetails – 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id (事务内的记录偏移)的值。默认值为 false

  • PartitionIncludeSchemaTable – 当分区类型为 primary-key-type 时,将架构和表名作为分区值的前缀。 这样做会增加 Kinesis 分片之间的数据分布。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在此况下,同一主键将从数千个表发送到同一个分片,这会导致限制。默认值为 false

以下示例显示了与使用 AWS CLI 发出的示例 kinesis-settings 命令结合使用的 create-endpoint 选项。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true

多线程完全加载任务设置

为了帮助提高速度传输,AWS DMS 支持多线程完全加载到 Kinesis Data Streams 目标实例。对于包含下列内容的任务设置,DMS 支持此多线程处理:

  • MaxFullLoadSubTasks – 使用此选项指示要并行加载的表的最大数目。DMS 使用专用的子任务将各个表加载到其对应的 Kinesis 目标表。默认值为 8;最大值为 49。

  • ParallelLoadThreads – 使用此选项指定 AWS DMS 将各个表加载到其 Kinesis 目标表时使用的线程数。Kinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize – 使用此选项指定在缓冲区(并行加载线程将 Kinesis 数据加载到目标时使用)中存储的最大记录数。默认值是 50。最大值为 1,000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread – 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值为 1。但是,对于各种负载大小的 Kinesis 目标,有效范围为每个线程 5–512 个队列。

多线程 CDC 加载任务设置

您可以提高实时数据流目标终端节点的更改数据捕获 (CDC) 的性能,例如,Kinesis 使用任务设置来修改 PutRecords API 调用的行为。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

注意

ParallelApply* 版本 3.3.1 及更高版本中,支持在 CDC 到 Kinesis Data Streams 目标终端节点期间使用 AWS DMS 任务设置。

要提高 CDC 性能,请 AWS DMS 支持以下任务设置:

  • ParallelApplyThreads – 指定 AWS DMS 在 CDC 加载期间用于将数据记录推送到 Kinesis 目标终端节点的并发线程数。默认值为零 (0),最大值为 32。

  • ParallelApplyBufferSize – 指定在 CDC 加载过程中,要在每个缓冲区队列中存储的供并发线程推送到目标 Kinesis 终端节点的最大记录数。默认值为 50,最大值为 1,000。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread – 指定每个线程访问以将数据记录从队列中取出并在 CDC 期间为 Kinesis 终端节点生成批处理负载的队列数。

使用 ParallelApply* 任务设置时,partition-key-type 默认值为表的 primary-key,而不是 schema-name.table-name

使用之前映像查看 Kinesis 数据流(作为目标)的 CDC 行的原始值

在将 CDC 更新写入数据流目标(如 Kinesis)时,可以在更新进行更改之前查看源数据库行的原始值。为做到这一点,AWS DMS 根据源数据库引擎提供的数据填充更新事件的之前映像

注意

在 Amazon Kinesis Data Streams 版本 3.3.1 及更高版本中,支持在 CDC到 AWS DMS 目标终端节点期间使用之前映像任务设置。

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • PostgreSQL 仅为属于主键一部分的列(已更改或未更改)提供数据。

  • MySQL 通常为所有列(已更改或未更改)提供数据。

要启用之前映像以便将源数据库中的原始值添加到 AWS DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings 使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings 应用于完全加载任务及 CDC 任务(这会迁移现有数据并复制正在进行的更改)或仅应用于 CDC 任务(这仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认值为 false

  • 使用 FieldName 选项为新 JSON 属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。 要仅添加非 LOB 类型的列,请使用 non-lob。 要添加具有之前映像值的任何列,请使用 all

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注意

Amazon S3 目标不支持 BeforeImageSettings。 对于 S3 目标,在 CDC 期间仅使用 add-before-image-columns 转换规则执行之前映像。

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 )启用之前映像。Kinesis.

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 transformation 参数的 add-before-image-columns 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

的值附加到列名称前面,column-prefix 的默认值为 column-prefixBI_ 的值将附加到列名称之后,默认值为空。column-suffix不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。 要仅添加属于表主键一部分的列,请选择 pk-only。选择 non-lob 以仅添加不属于 LOB 类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您将 CDC 更新写入 Amazon S3 目标时,通过 BI_emp_no 列能够判断哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作.

将 Kinesis 数据流作为 AWS Database Migration Service 目标的先决条件

在将 Kinesis 数据流设置为 AWS DMS 的目标之前,请确保您创建了 IAM 角色。此角色必须允许 AWS DMS 代入并授予对要迁移到的 Kinesis 数据流的访问权限。以下 IAM 策略中显示了所需的最小访问权限集合。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您在迁移到 Kinesis 数据流时使用的角色必须具有以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

将 Kinesis Data Streams 作为 AWS Database Migration Service 目标的限制

将 Kinesis Data Streams 作为目标时存在以下限制:

  • AWS DMS 支持 MiB 目标的最大消息大小为 1 Kinesis Data Streams。

  • AWS DMS 将每个更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的单条记录,而不考虑事务。但是,您可以使用 KinesisSettings API 的相关参数包含每条数据记录的事务详细信息。

  • Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅 https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html 中的处理重复记录Amazon Kinesis Data Streams 开发人员指南。

  • AWS DMS 支持以下两种分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:JSON 中其中一个字段的值,或源数据库中表的主键。

  • 有关在 Kinesis Data Streams 中加密静态数据的信息,请参阅 开发人员指南Kinesis Data Streams 中的 中的数据保护。AWS Key Management Service

使用对象映射将数据迁移到 Kinesis 数据流

AWS DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

Kinesis除了具有分区键以外, 数据流没有预设结构。在对象映射规则中,数据记录的 partition-key-type 的可能值为 schema-tabletransaction-idprimary-keyconstantattribute-name

要创建对象映射规则,请将 rule-type 指定为 object-mapping。 此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS 目前支持 map-record-to-recordmap-record-to-document 作为 rule-action 参数的唯一有效值。map-record-to-recordmap-record-to-document 值指定 AWS DMS 默认情况下对未作为 exclude-columns 属性列表的一部分排除的记录执行的操作。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 map-record-to-record 数据流时使用 Kinesis 此规则类型使用关系数据库的 taskResourceId.schemaName.tableName 值作为 Kinesis 数据流中的分区键,并为源数据库中的每个列创建一个属性。在使用 map-record-to-record 时,对于源表中未在 exclude-columns 属性列表中列出的任何列,AWS DMS 将在目标流中创建对应的属性。不论是否在属性映射中使用源列,都会创建对应的属性。

使用 map-record-to-document 将源列放入相应目标流的单个扁平文档中,并使用属性名称“_doc”。AWS DMS 将数据放入源上名为“_doc”的单个扁平映射中。此放置应用于源表中的未在 exclude-columns 属性列表中列出的任何列。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Test 的架构迁移到 Kinesis 数据流,您将创建规则来将数据映射到目标流。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

以下内容说明 Kinesis 数据流中生成的记录格式。

  • StreamName:XXX

  • PartitionKey:Test.Customers //schmaName.tableName

  • 数据://The following JSON message

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

但是,假设您使用相同的规则,但将 rule-action 参数更改为 map-record-to-document 并排除某些列。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

在这种情况下,exclude-columns 参数中未列出的列 FirstNameLastNameStoreIdDateOfBirth 将映射到 _doc。 以下内容说明生成的记录格式。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kinesis 数据流时,您可以调整数据结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

用于特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。 用于特定任务的控制记录的 partition-key 值为该记录的 TaskId。 在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

的消息格式Kinesis Data Streams

JSON 输出只是键值对的列表。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。

注意

在 AWS DMS 版本 3.3.1 及更高版本中,支持 JSON_UNFORMATTED Kinesis 消息格式。

AWS DMS 提供了以下预留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

运算

对于数据记录,操作可以是 createreadupdatedelete

对于控制记录,操作可以是 TruncateTableDropTable.

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

时间戳

JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。