Apache Kafka を AWS Database Migration Service のターゲットとして使用する - AWS Database Migration Service

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Apache Kafka を AWS Database Migration Service のターゲットとして使用する

AWS DMS を使用して、Apache Kafka クラスターにデータを移行できます。Apache Kafka は分散ストリーミングプラットフォームです。Apache Kafka を使用して、ストリーミングデータをリアルタイムで取り込み、処理できます。

AWS は、AWS DMS ターゲットとして使用する Amazon Managed Streaming for Apache Kafka (Amazon MSK) も提供しています。Amazon MSK は、Apache Kafka インスタンスの実装と管理を簡素化する、フルマネージド型 Apache Kafka ストリーミングサービスです。オープンソースの Apache Kafka バージョンで動作し、Apache Kafka インスタンスとまったく同じように AWS DMS ターゲットとして Amazon MSK インスタンスにアクセスします。詳細については、 の「Amazon MSK とは」を参照してくださいAmazon Managed Streaming for Apache Kafka 開発者ガイド。

Kafka クラスターは、パーティションに分割されたトピックと呼ばれるカテゴリにレコードのストリームを保存します。パーティションは、トピック内のデータレコード (メッセージ) の一意に識別されたシーケンスです。パーティションは、トピックのレコードの並列処理を可能にするために、クラスター内の複数のブローカーに分散できます。トピックとパーティション、および Apache Kafka での分散の詳細については、「トピックとログ」と「分散」を参照してください。

は、JSON を使用して Kafka トピックにレコードAWS Database Migration Serviceを発行します。変換時、AWS DMS はソースデータベースからの各レコードを JSON フォーマットの属性と値のペアにシリアル化します。

サポートされている任意のデータソースから、ターゲット Kafka クラスターにデータを移行するには、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ターゲットトピックのデータレコードを構築する方法を決定します。データをそのパーティションにグループ化するために Apache Kafka で使用する、各テーブルのパーティションキーも定義します。

AWS DMS が Apache Kafka ターゲットエンドポイントでテーブルを作成するときに、ソースデータベースのエンドポイントと同じ数のテーブルを作成します。また、AWS DMS はいくつかの Apache Kafka パラメータ値も設定します。テーブル作成のコストは、データの量および移行するテーブルの数によって異なります。

Apache Kafka のエンドポイント設定

接続の詳細を指定するには、AWS DMSコンソールのエンドポイント設定または CLI --kafka-settingsのオプションを使用します。各設定の要件は次のとおりです。

  • の形式でブローカーの場所Broker–を指定しますbroker-hostname:port。 たとえば、 "ec2-12-345-678-901.compute-1.amazonaws.com:2345" 。 これは、クラスター内の任意のブローカーの場所にすることができます。クラスターブローカーはすべて、トピックに移行されたデータレコードのパーティション化を処理するために通信します。

  • Topic – (オプション) 最大 255 文字および記号のトピック名を指定します。ピリオド (.)、アンダースコア (_)、マイナス (-) を使用できます。ピリオド (.) またはアンダースコア (_) があるトピック名は、内部データ構造内で衝突する可能性があります。トピック名には、どちらか一方を使用し、両方とも使用することは避けてください。トピック名を指定しない場合、 AWS DMSは移行トピック"kafka-default-topic"として使用します。

    注記

    指定した移行トピックまたはデフォルトのトピックのいずれかを AWS DMS で作成するには、auto.create.topics.enable = true を Kafka クラスター設定の一部として設定します。詳細については、「AWS Database Migration Service のターゲットとして Apache Kafka を使用する場合の制限」を参照してください

  • MessageFormat – エンドポイントで作成されたレコードの出力形式。メッセージ形式は JSON (デフォルト) または JSON_UNFORMATTED (タブなし 1 行) です。

  • MessageMaxBytes–エンドポイントで作成されたレコードの最大サイズ (バイト単位)。デフォルトは 1,000,000 です。

  • IncludeTransactionDetails – ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、transaction_idprevious_transaction_id、および transaction_record_id (トランザクション内のレコードオフセット) の値が含まれます。デフォルト: false

  • パーティションタイプが でない限り、Kafka メッセージ出力内のパーティション値IncludePartitionValue–を表示しますschema-table-type。 デフォルトは ですfalse

  • パーティションタイプが の場合、スキーマとテーブル名をパーティション値にPartitionIncludeSchemaTable–プレフィックスprimary-key-typeします。 これにより、Kafka パーティション間のデータ分散が増加します。たとえば、SysBench スキーマに数千のテーブルがあり、各テーブルのプライマリキーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じパーティションに送信され、スロットリングが発生します。デフォルト: false

  • IncludeTableAlterOperations–、 、 rename-table 、 、 など、制御データのテーブルを変更するデータ定義言語 (DDL) drop-table オペレーションが含まれますadd-columndrop-columnrename-column デフォルトは ですfalse

  • IncludeControlDetails – Kafka メッセージ出力に、テーブル定義、列定義、テーブルおよび列の変更の詳細な制御情報を表示します。デフォルト: false

  • ターゲットに NULL 列と空の列IncludeNullAndEmpty–を含めます。デフォルト: false

マルチスレッド全ロードタスク設定

設定を使用すると、転送速度を上げることができます。これを行うために、 AWS DMSは Apache Kafka ターゲットクラスターへのマルチスレッドフルロードをサポートしています。 AWS DMSは、このマルチスレッドを以下のようなタスク設定でサポートします。

  • MaxFullLoadSubTasks – 並行してロードするソーステーブルの最大数を指定するには、このオプションを使用します。AWS DMS は専用サブタスクを使用して、各テーブルを対応する Kafka ターゲットテーブルにロードします。デフォルトは 8、最大値は 49 です。

  • ParallelLoadThreads – AWS DMS が各テーブルを Kafka ターゲットテーブルにロードするために使用するスレッドの数を指定するには、このオプションを使用します。Apache Kafka ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。

  • ParallelLoadBufferSize – Kafka ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定は ParallelLoadThreads で使用します。ParallelLoadBufferSize は、複数のスレッドがある場合にのみ有効です。

  • ParallelLoadQueuesPerThread – このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは1です。ただし、さまざまなペイロードサイズの Kafka ターゲットでは、有効な範囲はスレッドあたり 5~512 キューです。

マルチスレッド CDC ロードタスクの設定

タスク設定を使用して PutRecords API コールの動作を変更するなど、Kafka などのリアルタイムデータストリーミングターゲットエンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上させることができます。これを行うには、ParallelApply* タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。たとえば、CDC ロードを実行し、128 個のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。

CDC のパフォーマンスを向上させるため、AWS DMS では次のタスク設定をサポートしています。

  • ParallelApplyThreads – データレコードを Kafka ターゲットエンドポイントにプッシュするために CDC ロード中に AWS DMS が使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。

  • ParallelApplyBufferSize – CDC ロード中に同時スレッドが Kafka ターゲットエンドポイントにプッシュする場合に、各バッファキューに格納するレコードの最大数を指定します。デフォルト値は 50 で、最大値は 1,000 です。このオプションは、ParallelApplyThreads が複数のスレッドを指定する場合に使用します。

  • ParallelApplyQueuesPerThread – 各スレッドがキューからデータレコードを取り出し、CDC 中に Kafka エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。

ParallelApply* タスク設定を使用する場合、partition-key-type のデフォルトは schema-name.table-name ではなくテーブルの primary-key です。

ターゲットとして Apache Kafka の CDC 行の元の値を表示するために前イメージを使用

Kafka のようなデータストリーミングターゲットに CDC 更新を書き込むときは、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを実現するために、AWS DMS は、ソースデータベースエンジンから提供されたデータに基づいて更新イベントの前イメージを設定します。

ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。

  • Oracle では、列が変更された場合にのみ列の更新が提供されます。

  • はPostgreSQL、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。

  • MySQLは、通常、すべての列のデータ (変更されたかどうか) を提供します。

前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、BeforeImageSettings タスク設定または add-before-image-columns パラメータを使用します。このパラメータは、列変換ルールを適用します。

BeforeImageSettings は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注記

全ロード + CDCタスク (既存のデータを移行して進行中の変更をレプリケートする)、または CDC のみのタスク (データ変更のみをレプリケートする) に BeforeImageSettings を適用します。全ロードのタスクには BeforeImageSettings を適用しないでください。

BeforeImageSettings オプションには、次の項目が適用されます。

  • EnableBeforeImage オプションを true に設定して、前イメージを有効にします。デフォルト: false

  • FieldName オプションを使用して、新しい JSON 属性に名前を割り当てます。EnableBeforeImagetrue の場合、FieldName は必須であり、空にすることはできません。

  • ColumnFilter オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値 を使用しますpk-only。 LOB タイプではない列のみを追加するには、 non-lob を使用します。 前イメージ値を持つ列を追加するには、 all を使用します。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

前イメージ変換ルールの使用

タスク設定の代わりに、列変換ルールを適用する add-before-image-columns パラメータを使用できます。このパラメータを使用すると、Kafka のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。

変換ルールで add-before-image-columns を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。

注記

同じタスク内で、add-before-image-columns パラメータと同時に BeforeImageSettings タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。

列の add-before-image-columns パラメータを持つ transformation ルールタイプは、before-image-def セクションを提供する必要があります。次に の例を示します。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

の値は列名の前に付加され、 のデフォルト値column-prefixは ですcolumn-prefixBI_ の値は列名に追加され、デフォルトは空です。column-suffixcolumn-prefixcolumn-suffix の両方を空の文字列に設定しないでください。

の値を 1 つ選択しますcolumn-filter。 テーブルのプライマリキーの一部である列だけを追加するには、 を選択しますpk-only。LOB タイプではない列のみを追加するように non-lob を選択します。または、前イメージの値を持つ任意の列を追加するように all を選択します。

前イメージ変換前ルールの例

次の例の変換ルールは、ターゲットに BI_emp_no という新しい列を追加します。したがって、UPDATE employees SET emp_no = 3 WHERE emp_no = 1; のようなステートメントは、BI_emp_no フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は BI_emp_no 列からわかります。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns ルールアクションの使用方法については、「 変換ルールおよび変換アクション」を参照してください。

AWS Database Migration Service のターゲットとして Apache Kafka を使用する場合の制限

ターゲットとして Apache Kafka を使用する場合、以下の制限が適用されます。

  • AWS DMSは、Kafka ターゲットMiBに対して 1 の最大メッセージサイズをサポートしています。

  • AWS DMSレプリケーションインスタンスと Kafka クラスターの両方を、同じセキュリティグループに基づく同じ仮想プライベートクラウド (VPC) Amazon VPCおよび同じセキュリティグループに設定します。Kafka クラスターは、Amazon MSK インスタンスまたは Amazon EC2 で実行されている独自の Kafka インスタンスのどちらかになります。詳細については、を参照してください レプリケーションインスタンスのためのネットワークのセットアップ

    注記

    Amazon MSK のセキュリティグループを指定するには、[クラスターの作成] ページで [詳細設定] を選択して [Customize settings (設定をカスタマイズ)] を選択し、セキュリティグループを選択するか、レプリケーションインスタンスと同じ場合はデフォルトを受け入れます。

  • AWS DMS が新しいトピックを自動的に作成できるようにするプロパティを使用して、クラスターの Kafka 設定ファイルを指定します。設定 を含めますauto.create.topics.enable = true。 を使用している場合は、Kafka Amazon MSK クラスターの作成時にデフォルト設定を指定し、そのauto.create.topics.enable設定を true に変更できます。 デフォルトの構成設定の詳細については、のデフォルトの Amazon MSK 設定を参照してくださいhttps://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html。Amazon Managed Streaming for Apache Kafka 開発者ガイドAmazon MSK を使用して作成した既存の Kafka クラスターを変更する必要がある場合は、次の例のとおり、AWS CLI コマンド aws kafka create-configuration を実行して Kafka 設定を更新します。

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    ここでは、//~/kafka_configuration は必要なプロパティ設定を使用して作成した設定ファイルです。

    Amazon EC2 にインストールされている独自の Kafka インスタンスを使用している場合、インスタンスで適用されているオプションを使用して、auto.create.topics.enable = true を含め同様のプロパティ設定で Kafka クラスター設定を変更します。

  • は、特定の Kafka トピックの 1 つのデータレコード (メッセージ) として、トランザクションに関係なく、各更新をソースデータベースの 1 つのレコードにAWS DMS発行します。

  • AWS DMS では、パーティションキーの次の 2 つの形式がサポートされています。

    • SchemaName.TableName: スキーマとテーブル名の組み合わせ。

    • ${AttributeName}: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。

Kafka トピックにデータを移行するためのオブジェクトマッピングの使用

AWS DMSは、ソースからターゲット Kafka トピックにデータをマッピングするためのテーブルマッピングルールを使用します。ターゲットトピックにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kafka トピックに発行されたデータレコードにマッピングされるかを定義します。

Kafka トピックには、パーティションキー以外にプリセット構造はありません。

オブジェクトマッピングルールを作成するには、 rule-typeとして指定しますobject-mapping。 このルールは、使用するオブジェクトマッピングのタイプを指定します。

ルールの構造は次のとおりです。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS では現在、rule-action パラメータに対する有効な値として map-record-to-record および map-record-to-document のみがサポートされています。map-record-to-record および map-record-to-document の値は、exclude-columns 属性リストの一部として除外されてないものとして AWS DMS がデフォルトで記録するものを指定します。これらの値は、どのような方法でも属性マッピングに影響しません。

リレーショナルデータベースから Kafka トピックに移行する際に map-record-to-record を使用します。このルールタイプでは、Kafka トピックのパーティションキーとしてリレーショナルデータベースから taskResourceId.schemaName.tableName 値を使用し、ソースデータベース内の各列の属性を作成します。map-record-to-recordを使用する場合、exclude-columns属性リストに示されていないソーステーブル内のすべての列について、 AWS DMSはターゲットトピックに対応する属性を作成します。この対応する属性は、そのソース列が属性マッピングで使用されているかどうかにかかわらず作成されます。

map-record-to-record を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

この情報を Test という名前のスキーマから Kafka トピックに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Kafka トピックとパーティションキー (この場合は taskResourceId.schemaName.tableName ) を指定すると、以下は Kafka ターゲットトピックのサンプルデータを使用した結果のレコード形式を示します。

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

属性マッピングを使用したデータの再構築

属性マップを使用してデータを Kafka トピックに移行している間にデータを再構築できます。たとえば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key の定数値を設定するには、partition-key 値を指定します。たとえば、すべてのデータを 1 つのパーティションに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注記

特定のテーブル用のコントロールレコードpartition-keyの値は、 ですTaskId.SchemaName.TableName。 特定のタスク用のコントロールレコードpartition-keyの値は、そのレコードの TaskId です。 オブジェクトマッピングにpartition-key値を指定しても、コントロールレコードの には影響しません。partition-key

Apache Kafka のメッセージ形式

JSON 出力は、単にキーと値のペアのリストです。

RecordType

レコードタイプはデータまたはコントロールのいずれかです。データレコードは、ソースの実際の行を表します。コントロールレコードは、タスクの再起動など、ストリーム内の重要なイベント用です。

オペレーション

データレコードの場合、オペレーションは createreadupdate、または delete です。

コントロールレコードの場合、オペレーションは TruncateTable または DropTable です。

SchemaName

レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。

TableName

レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。

タイムスタンプ

JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。