AWS Database Migration Service のターゲットとしての Amazon Kinesis Data Streams の使用 - AWS Database Migration Service

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

AWS Database Migration Service のターゲットとしての Amazon Kinesis Data Streams の使用

データを Amazon Kinesis データストリームに移行するために、AWS DMS を使用できます。Amazon Kinesis データストリームは Amazon Kinesis Data Streams サービスの一部です。データレコードの大量のストリームをリアルタイムで収集し、処理するには、Kinesis データストリームを使用します。

Kinesis データストリームはシャードで構成されています。シャードは、ストリーム内のデータレコードの一意に識別されたシーケンスです。のシャードの詳細については、Amazon Kinesis Data Streamsの「シャード」を参照してくださいAmazon Kinesis Data Streams 開発者ガイド。

AWS Database Migration Service は、JSON を使用してレコードを Kinesis データストリームに発行します。変換時に、AWS DMS はソースデータベースから各レコードを JSON 形式または JSON_UNFORMATTED メッセージ形式の属性と値のペアにシリアル化します。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。これにより、Amazon Kinesis Data Firehose は Amazon S3 の宛先に Kinesis データを配信し、Amazon Athena を含むさまざまなクエリエンジンを使用してクエリを実行できます。

注記

JSON_UNFORMATTED Kinesis メッセージ形式のサポートは、AWS DMS バージョン 3.3.1 以降で使用できます。

サポートされている任意のデータソースから、ターゲットストリームにデータを移行するために、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ストリームにデータレコードを構築する方法を決定します。データをそのシャードにグループ化するために Kinesis Data Streams で使用する、各テーブルのパーティションキーも定義します。

AWS DMS が Kinesis Data Streams ターゲットエンドポイントでテーブルを作成するときに、ソースデータベースのエンドポイントと同じ数のテーブルを作成します。また、AWS DMS はいくつかの Kinesis Data Streams パラメータ値も設定します。テーブル作成のコストは、データの量および移行するテーブルの数によって異なります。

Kinesis Data Streams エンドポイント設定

Kinesis Data Streams ターゲットエンドポイントを使用すると、AWS DMS API の KinesisSettings オプションを使用してトランザクションとコントロールの詳細を取得できます。CLI で、次の --kinesis-settings オプションのリクエストパラメータを使用します。

注記

IncludeNullAndEmptyエンドポイント設定のサポートはバージョン 3.4.1 以降で使用できます。AWS DMSただし、Kinesis Data Streamsターゲットの次の他のエンドポイント設定のサポートは、AWS DMSバージョン 3.3.1 以降で利用できます。

  • IncludeControlDetails – Kinesis メッセージ出力に、テーブル定義、列定義、テーブルおよび列の変更の詳細な制御情報を表示します。デフォルト: false

  • ターゲットに NULL 列と空の列IncludeNullAndEmpty–を含めます。デフォルト: false

  • パーティションタイプが でない限り、IncludePartitionValueメッセージ出力内のパーティション値–Kinesisを表示しますschema-table-type。 デフォルトは ですfalse

  • IncludeTableAlterOperations–、 、 rename-table 、 、 など、制御データのテーブルを変更するデータ定義言語 (DDL) drop-table オペレーションが含まれますadd-columndrop-columnrename-column デフォルトは ですfalse

  • IncludeTransactionDetails – ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、transaction_idprevious_transaction_id、および transaction_record_id (トランザクション内のレコードオフセット) の値が含まれます。デフォルト: false

  • パーティションタイプが の場合、スキーマとテーブル名をパーティション値にPartitionIncludeSchemaTable–プレフィックスprimary-key-typeします。 これにより、Kinesis シャード間のデータ分散が増加します。たとえば、SysBench スキーマに数千のテーブルがあり、各テーブルのプライマリキーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じシャードに送信され、スロットリングが発生します。デフォルト: false

次の例は、AWS CLI を使用して発行されたkinesis-settingsコマンド例で使用中のcreate-endpointオプションを示しています。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true

マルチスレッド全ロードタスク設定

転送の速度を高めるために、AWS DMS は Kinesis Data Streams ターゲットインスタンスへのマルチスレッドフルロードがサポートされています。DMS では、このマルチスレッドでのタスク設定を、以下でサポートします。

  • MaxFullLoadSubTasks – 並行してロードするソーステーブルの最大数を指定するには、このオプションを使用します。DMS は、専用のサブタスクを使用して、対応する Kinesis ターゲットテーブルに各テーブルをロードします。デフォルトは 8、最大値は 49 です。

  • ParallelLoadThreads – AWS DMS が各テーブルを Kinesis ターゲットテーブルにロードするために使用するスレッドの数を指定するには、このオプションを使用します。Kinesis Data Streams ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。

  • ParallelLoadBufferSize – Kinesis ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定は ParallelLoadThreads で使用します。ParallelLoadBufferSize は、複数のスレッドがある場合にのみ有効です。

  • ParallelLoadQueuesPerThread – このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは1です。ただし、さまざまなペイロードサイズの Kinesis ターゲットでは、有効な範囲はスレッドあたり 5~512 キューです。

マルチスレッド CDC ロードタスクの設定

タスク設定を使用して PutRecords API コールの動作を変更するなど、Kinesis などのリアルタイムデータストリーミングターゲットエンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上させることができます。これを行うには、ParallelApply* タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。たとえば、CDC ロードを実行し、128 個のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。

注記

AWS DMS バージョン 3.3.1 以降では、CDC 中に Kinesis Data Streams ターゲットエンドポイントに対して、ParallelApply* タスク設定の使用がサポートされています。

CDC のパフォーマンスを向上させるため、AWS DMS では次のタスク設定をサポートしています。

  • ParallelApplyThreads – データレコードを Kinesis ターゲットエンドポイントにプッシュするために CDC ロード中に AWS DMS が使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。

  • ParallelApplyBufferSize – CDC ロード中に同時スレッドが Kinesis ターゲットエンドポイントにプッシュする場合に、各バッファキューに格納するレコードの最大数を指定します。デフォルト値は 50 で、最大値は 1,000 です。このオプションは、ParallelApplyThreads が複数のスレッドを指定する場合に使用します。

  • ParallelApplyQueuesPerThread – 各スレッドがキューからデータレコードを取り出し、CDC 中に Kinesis エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。

ParallelApply* タスク設定を使用する場合、partition-key-type のデフォルトは schema-name.table-name ではなくテーブルの primary-key です。

前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示

Kinesis のようなデータストリーミングターゲットに CDC 更新を書き込む場合、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを実現するために、AWS DMS は、ソースデータベースエンジンから提供されたデータに基づいて更新イベントの前イメージを設定します。

注記

AWS DMS バージョン 3.3.1 以降では、CDC 中に Amazon Kinesis Data Streams ターゲットエンドポイントに対して前イメージタスク設定の使用がサポートされています。

ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。

  • Oracle では、列が変更された場合にのみ列の更新が提供されます。

  • はPostgreSQL、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。

  • MySQLは、通常、すべての列のデータ (変更されたかどうか) を提供します。

前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、BeforeImageSettings タスク設定または add-before-image-columns パラメータを使用します。このパラメータは、列変換ルールを適用します。

BeforeImageSettings は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注記

全ロード + CDCタスク (既存のデータを移行して進行中の変更をレプリケートする)、または CDC のみのタスク (データ変更のみをレプリケートする) に BeforeImageSettings を適用します。全ロードのタスクには BeforeImageSettings を適用しないでください。

BeforeImageSettings オプションには、次の項目が適用されます。

  • EnableBeforeImage オプションを true に設定して、前イメージを有効にします。デフォルト: false

  • FieldName オプションを使用して、新しい JSON 属性に名前を割り当てます。EnableBeforeImagetrue の場合、FieldName は必須であり、空にすることはできません。

  • ColumnFilter オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値 を使用しますpk-only。 LOB タイプではない列のみを追加するには、 non-lob を使用します。 前イメージ値を持つ列を追加するには、 all を使用します。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注記

Amazon S3ターゲットは をサポートしていませんBeforeImageSettings。 S3 ターゲットの場合、CDC 中に前イメージを実行するadd-before-image-columns変換ルールのみを使用します。

前イメージ変換ルールの使用

タスク設定の代わりに、列変換ルールを適用する add-before-image-columns パラメータを使用できます。このパラメータを使用すると、Kinesis のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。

変換ルールで add-before-image-columns を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。

注記

同じタスク内で、add-before-image-columns パラメータと同時に BeforeImageSettings タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。

列の add-before-image-columns パラメータを持つ transformation ルールタイプは、before-image-def セクションを提供する必要があります。次に の例を示します。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

の値は列名の前に付加され、 のデフォルト値column-prefixは ですcolumn-prefixBI_ の値は列名に追加され、デフォルトは空です。column-suffixcolumn-prefixcolumn-suffix の両方を空の文字列に設定しないでください。

の値を 1 つ選択しますcolumn-filter。 テーブルのプライマリキーの一部である列だけを追加するには、 を選択しますpk-only。LOB タイプではない列のみを追加するように non-lob を選択します。または、前イメージの値を持つ任意の列を追加するように all を選択します。

前イメージ変換前ルールの例

次の例の変換ルールは、ターゲットに BI_emp_no という新しい列を追加します。したがって、UPDATE employees SET emp_no = 3 WHERE emp_no = 1; のようなステートメントは、BI_emp_no フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は BI_emp_no 列からわかります。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns ルールアクションの使用方法については、「 変換ルールおよび変換アクション」を参照してください。

AWS Database Migration Service のターゲットとして Kinesis データストリームを使用する場合の前提条件

AWS DMS のターゲットとして Kinesis データストリームを設定する前に、IAM ロールを必ず作成してください。このロールにより、移行先の Kinesis データストリームへのアクセスを AWS DMS が引き受けて許可できるようになります。アクセス許可の最小設定は、次の IAM ポリシーで示します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Kinesis データストリームに移行する際に使用するロールには、次のアクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

AWS Database Migration Service のターゲットとして Kinesis Data Streams を使用する場合の制限

ターゲットとして Kinesis Data Streams を使用する場合、以下の制限が適用されます。

  • AWS DMSは、MiBターゲットKinesis Data Streamsに対して最大メッセージサイズ 1 をサポートしています。

  • AWS DMS は、特定の Kinesis データストリームの 1 つのデータレコードとして、トランザクションに関係なく、各更新をソースデータベースの 1 つのレコードに発行します。ただし、KinesisSettings API の関連パラメータを使用して、各データレコードのトランザクションの詳細を含めることができます。

  • Kinesis Data Streams は重複排除をサポートしていません。ストリームからデータを消費するアプリケーションは、重複したレコードを処理する必要があります。詳細については、次のガイドの「重複レコードの処理」を参照してくださいAmazon Kinesis Data Streams 開発者ガイド。

  • AWS DMS では、パーティションキーの次の 2 つの形式がサポートされています。

    • SchemaName.TableName: スキーマとテーブル名の組み合わせ。

    • ${AttributeName}: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。

  • Kinesis Data Streams内に保存されているデータの暗号化の詳細については、 Kinesis Data Streams 開発者ガイドの「 でのデータ保護」を参照してください。AWS Key Management Service

Kinesis データストリームにデータを移行するためのオブジェクトマッピングの使用

AWS DMS は、ソースからターゲット Kinesis データストリームにデータをマッピングするためのテーブルマッピングルールを使用します。ターゲットストリームにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kinesis データストリームに発行されたデータレコードにマッピングされるかを定義します。

Kinesis データストリームには、パーティションキー以外にプリセット構造はありません。オブジェクトマッピングルールでは、データレコードの partition-key-type に指定できる値は、schema-tabletransaction-idprimary-keyconstantattribute-name です。

オブジェクトマッピングルールを作成するには、 rule-typeとして指定しますobject-mapping。 このルールは、使用するオブジェクトマッピングのタイプを指定します。

ルールの構造は次のとおりです。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS では現在、rule-action パラメータに対する有効な値として map-record-to-record および map-record-to-document のみがサポートされています。map-record-to-record および map-record-to-document の値は、exclude-columns 属性リストの一部として除外されてないものとして AWS DMS がデフォルトで記録するものを指定します。これらの値は、どのような方法でも属性マッピングに影響しません。

リレーショナルデータベースから Kinesis データストリームに移行する際に map-record-to-record を使用します。このルールタイプでは、Kinesis データストリームのパーティションキーとしてリレーショナルデータベースから taskResourceId.schemaName.tableName 値を使用し、ソースデータベース内の各列の属性を作成します。map-record-to-record を使用する場合、exclude-columns 属性リストに示されていないソーステーブル内のすべての列について、AWS DMS はターゲットストリームに対応する属性を作成します。この対応する属性は、そのソース列が属性マッピングで使用されているかどうかにかかわらず作成されます。

"_doc" を使用して、適切なターゲットストリーム内の 1 つのフラットドキュメントにソース列を配置map-record-to-documentするために使用します。 AWS DMSは、データを "_doc" という名前のソース上の 1 つのフラットマップに配置します。この配置は、exclude-columns 属性リストに含まれていないソーステーブル内のすべての列に適用されます。

map-record-to-record を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

この情報を Test という名前のスキーマから Kinesis データストリームに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

次は、Kinesis データストリームの結果のレコード形式を示しています。

  • StreamName: XXX

  • PartitionKey: Test.Customers //schmaName.tableName

  • データ: //次の JSON メッセージ

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

ただし、同じルールを使用するが、rule-actionパラメータを特定の列に変更map-record-to-documentしたり特定の列を除外したりするとします。以下のルールはマッピングを示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

この場合、exclude-columnsパラメータ 、 FirstNameLastNameおよび にリストされていない列は、 StoreId にマッピングされますDateOfBirth_doc 次は、結果のレコード形式を示しています。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

属性マッピングを使用したデータの再構築

属性マップを使用してデータを Kinesis データストリームに移行している間にデータを再構築できます。たとえば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key の定数値を設定するには、partition-key 値を指定します。たとえば、すべてのデータを 1 つのシャードに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注記

特定のテーブル用のコントロールレコードpartition-keyの値は、 ですTaskId.SchemaName.TableName。 特定のタスク用のコントロールレコードpartition-keyの値は、そのレコードの TaskId です。 オブジェクトマッピングにpartition-key値を指定しても、コントロールレコードの には影響しません。partition-key

Kinesis Data Streams のメッセージ形式

JSON 出力は、単にキーと値のペアのリストです。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。

注記

JSON_UNFORMATTED Kinesis メッセージ形式のサポートは、AWS DMS バージョン 3.3.1 以降で使用できます。

AWS DMS には、Kinesis Data Streams からデータを簡単に利用するために、以下の予約されたフィールドがあります。

RecordType

レコードタイプはデータまたはコントロールのいずれかです。データレコードは、ソースの実際の行を表します。コントロールレコードは、タスクの再起動など、ストリーム内の重要なイベント用です。

オペレーション

データレコードの場合、オペレーションは createreadupdate、または delete です。

コントロールレコードの場合、オペレーションは TruncateTable または DropTable です。

SchemaName

レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。

TableName

レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。

タイムスタンプ

JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。