

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# MSK Connect を理解する
<a name="msk-connect"></a>

MSK Connect は Amazon MSK の機能であり、デベロッパーが Apache Kafka クラスターとの間でデータを簡単にストリーミングできるようにします。MSK Connect は、Apache Kafka クラスターをデータベース、検索インデックス、ファイルシステムなどの外部システムに接続するためのオープンソースフレームワークである Kafka Connect 2.7.1 または、3.7.x,を使用します。MSK Connect を使用すると、Kafka Connect 用に構築されたフルマネージドコネクタをデプロイして、Amazon S3 や Amazon OpenSearch Service などの一般的なデータストアにデータを移動したりデータストアからデータをプルしたりできます。データベースから Apache Kafka クラスターに変更ログをストリーミングするために Debezium などのサードパーティによって開発されたコネクタをデプロイするか、コードを変更せずに既存のコネクタをデプロイできます。コネクタは、ロードの変化に合わせてオートスケーリングされ、使用したリソースに対してのみ料金が発生します。

ソースコネクタを使用して、外部システムからトピックにデータをインポートします。シンクコネクタを使用すると、トピックから外部システムにデータをエクスポートできます。

MSK Connect は、MSK クラスターであろうと独立してホストされている Apache Kafka クラスターであろうと、Amazon VPC に接続できる Apache Kafka クラスターのコネクタをサポートします。

MSK Connect は、コネクタのヘルスと配信状態を継続的に監視し、基盤となるハードウェアにパッチを適用して管理し、スループットの変化に合わせてコネクタをオートスケーリングします。

MSK Connect の使用を開始するには、「[MSK Connect の使用開始](msk-connect-getting-started.md)」を参照してください。

MSK Connect で作成できる AWS リソースについては、「」、[カスタムプラグインの作成](msk-connect-plugins.md)「」、および[コネクタを理解する](msk-connect-connectors.md)「」を参照してください[MSK Connect ワーカーを理解する](msk-connect-workers.md)。

MSK Connect API の詳細については、「[Amazon MSK Connect API リファレンス](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html)」を参照してください。

## Amazon MSK Connect を使用する利点
<a name="msk-connect-benefits"></a>

Apache Kafka は、リアルタイムデータストリームの取り込みと処理に最も広く採用されているオープンソースストリーミングプラットフォームの 1 つです。Apache Kafka を使用すると、データ生成アプリケーションとデータ消費アプリケーションを分離して個別にスケーリングできます。

Kafka Connect は、Apache Kafka を使用してストリーミングアプリケーションを構築および実行する上で重要なコンポーネントです。Kafka Connect は、Kafka と外部システム間でデータを移動する標準化された方法を提供します。Kafka Connect はスケーラビリティが高く、大量のデータを処理できます。Kafka Connect は、Kafka トピックと外部システム間でデータを移動するコネクタを設定、デプロイ、およびモニタリングするための強力な API オペレーションとツールのセットを提供します。これらのツールを使用して、ストリーミングアプリケーションの特定のニーズを満たすように Kafka Connect の機能をカスタマイズおよび拡張できます。

Apache Kafka Connect クラスターを単独で運用している場合や、オープンソースの Apache Kafka Connect アプリケーションを AWSに移行しようとしている場合に、課題に遭遇する可能性があります。これらの課題には、インフラストラクチャのセットアップとアプリケーションのデプロイに必要な時間、セルフマネージド型の Apache Kafka Connect クラスターを設定する際のエンジニアリング上の障害、管理運用上のオーバーヘッドが含まれます。

これらの課題に対処するために、Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) を使用して、オープンソースの Apache Kafka Connect アプリケーションを AWSに移行することをお勧めします。Amazon MSK Connect は、Kafka Connect を使用することで、Apache Kafka クラスターと外部システム (データベース、検索インデックス、ファイルシステムなど) との間のデータのストリーミングを簡素化します。

Amazon MSK Connect に移行する利点をいくつかご紹介します。
+ **運用上のオーバーヘッドの排除** – Amazon MSK Connect は、Apache Kafka Connect クラスターのパッチ適用、プロビジョニング、およびスケーリングに関連する運用上の負担を軽減します。Amazon MSK Connect は、Connect クラスターの状態を継続的にモニタリングし、ワークロードの中断を引き起こすことなく、パッチ適用とバージョンアップグレードを自動化します。
+ **Connect タスクの自動再起動** – Amazon MSK Connect は、失敗したタスクを自動的に復旧するため、本番環境の中断を減らすことができます。タスクの失敗は、Kafka の TCP 接続制限に違反するなどの一時的なエラーや、新しいワーカーがシンクコネクタのコンシューマーグループに参加したときのタスクのリバランス実行によって発生する可能性があります。
+ **自動水平スケーリングおよび垂直スケーリング** – Amazon MSK Connect により、コネクタアプリケーションが自動的にスケーリングして、より高いスループットをサポートできるようになります。Amazon MSK Connect はユーザーに代わってスケーリングを管理します。ユーザーは、自動スケーリンググループのワーカー数と使用率のしきい値のみを指定する必要があります。Amazon MSK Connect `UpdateConnector` API オペレーションを使用して、vCPU を 1～8 個の範囲で垂直方向にスケールアップまたはスケールダウンすることで、可変スループットのサポートが可能になります。
+ **プライベートネットワーク接続** — Amazon MSK Connect は、 AWS PrivateLink とプライベート DNS 名を使用してソースシステムとシンクシステムにプライベートに接続します。

# MSK Connect の使用開始
<a name="msk-connect-getting-started"></a>

これは、 AWS マネジメントコンソール を使用して MSK クラスターを作成し、クラスターから S3 バケットにデータを送信するシンクコネクタを作成するstep-by-stepチュートリアルです。

**Topics**
+ [MSK Connect に必要なリソースを設定する](mkc-tutorial-setup.md)
+ [カスタムプラグインを作成する](mkc-create-plugin.md)
+ [クライアントマシンと Apache Kafka トピックを作成する](mkc-create-topic.md)
+ [コネクタの作成](mkc-create-connector.md)
+ [MSK クラスターにデータを送信する](mkc-send-data.md)

# MSK Connect に必要なリソースを設定する
<a name="mkc-tutorial-setup"></a>

このステップでは、この入門シナリオに必要な次のリソースを作成します。
+ Amazon S3 バケットは、コネクタからデータを受信する宛先として機能します。
+ データの送信先となる MSK クラスター。次に、コネクタはこのクラスターからデータを読み取り、宛先の S3 バケットに送信します。
+ 送信先の S3 バケットへの書き込むアクセス許可を含む IAM ポリシー。
+ コネクタが宛先 S3 バケットに書き込むことを可能にする IAM ロール。作成した IAM ポリシーを、このロールに追加します。
+ クラスターとコネクタを備えた Amazon VPC から Amazon S3 にデータを送信できるようにする Amazon VPC エンドポイント。

**S3 バケットを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) で Amazon S3 コンソールを開きます。

1. **[バケットを作成]** を選択します。

1. バケットの名前には、`amzn-s3-demo-bucket-mkc-tutorial` などのわかりやすい名前を入力します。

1. 下にスクロールして、**Create bucket** (バケットの作成) を選択します。

1. バケットのリストで、新しく作成されたバケットを選択します。

1. **Create folder** (フォルダの作成) を選択します。

1. フォルダの名前として `tutorial` と入力し、下にスクロールして **Create folder** (フォルダの作成) を選択します。

**クラスターを作成するには**

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインの **MSK Clusters** (MSK クラスター) で、**Clusters** (クラスター) を選択します。

1. **Create cluster** (クラスターの作成) を選択します。

1. **[作成方法]** では、**[カスタム作成]** を選択してください。

1. クラスター名には **mkc-tutorial-cluster** と入力します。

1. **クラスタータイプ** では、 **プロビジョンド** を選択します。

1. [**次へ**] を選択します。

1. **[ネットワーク]** で Amazon VPC を選択します。次に、使用するアベイラビリティーゾーンとサブネットを選択します。このチュートリアルの後半で必要になるため、選択した Amazon VPC とサブネットの ID を覚えておいてください。

1. [**次へ**] を選択します。

1. **Access control methods** (アクセス制御方法) で、**Unauthenticated access** (認証されていないアクセス) のみが選択されていることを確認します。

1. **Encryption** (暗号化) で、**Plaintext** (プレーンテキスト) のみが選択されていることを確認します。

1. ウィザードを続行し、**[クラスターの作成]** を選択します。そのクラスターの詳細ページが表示されます。そのページで、**[適用されたセキュリティグループ]** の下のセキュリティグループ ID を探します。このチュートリアルの後半で必要になるため、その ID を覚えておいてください。

**S3 バケットへの書き込み権限を持つ IAM ポリシーを作成する**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションペインで [**Policies**] (ポリシー) を選択します。

1. [**Create policy**] (ポリシーの作成) を選択します。

1. **ポリシーエディター**で、**JSON** タブを選択し、エディタウィンドウの JSON を次の JSON に置き換えます。

   次の例では、*<amzn-s3-demo-bucket-my-tutorial>* を自身の S3 バケット名に置き換えてください。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   安全なポリシーの記述方法については、「[IAM アクセスコントロール](iam-access-control.md)」を参照してください。

1. [**次へ**] を選択します。

1. **[レビューと作成]** ページで、以下の操作を実行します。

   1. **ポリシー名**にわかりやすい名前 (**mkc-tutorial-policy** など) を入力します。

   1. **このポリシーで定義されているアクセス許可**で、ポリシーで定義されたアクセス許可を確認および/または編集します。

   1. (オプション) ポリシーの識別、整理、検索を簡単にするには、キーと値のペアとして**新規タグを追加**します。たとえば、**Environment** と **Test** のキーと値のペアを使用してポリシーにタグを追加します。

      タグの使用の詳細については、*IAM ユーザーガイド*の[「 AWS Identity and Access Management リソースのタグ](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)」を参照してください。

1. [**Create policy**] (ポリシーの作成) を選択します。

**送信先バケットに書き込みことができる IAM ロールを作成するには**

1. IAM コンソールのナビゲーション ペインで、**ロール** を選択し、次に **ロールの作成** を選択します。

1. **[信頼されたエンティティを選択]** ページで、以下の操作を実行してください。

   1. **信頼できるエンティティタイプ** で、**AWS のサービス** を選択します。

   1. ［**サービスまたはユースケース**］で、［**S3**］を選択します。

   1. [**Use case**] (ユースケース) で、[**S3**] を選択します。

1. [**次へ**] を選択します。

1. **[アクセス許可を追加]** ページで、以下を実行します。

   1. **権限ポリシー**の下にある検索ボックスに、このチュートリアル用に以前に作成したポリシーの名前を入力します。例えば、**mkc-tutorial-policy** などです。次に、ポリシー名の左側にある、チェックボックスを選択してください。

   1. (オプション) [アクセス許可の境界](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html)を設定します。このアドバンスド機能は、サービスロールで使用できますが、サービスにリンクされたロールではありません。アクセス許可の境界の設定については、*IAM ユーザーガイド*の「[ロールの作成とポリシーのアタッチ (コンソール)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html)」を参照してください。

1. [**次へ**] を選択します。

1. **[名前を付けて、レビューし、作成する]** ページで、以下の操作を実行します。

   1. **ロール名**に、わかりやすい名前 (**mkc-tutorial-role** など) を入力します。
**重要**  
ロールに名前を付けるときは、次のことに注意してください。  
ロール名は 内で一意である必要があり AWS アカウント、大文字と小文字を区別することはできません。  
例えば、**PRODROLE** と **prodrole** の両方の名前でロールを作成することはできません。ロール名がポリシーまたは ARN の一部として使用される場合、ロール名は大文字と小文字が区別されます。ただし、サインインプロセスなど、コンソールにロール名がユーザーに表示される場合、ロール名は大文字と小文字が区別されません。
他のエンティティがロールを参照する可能性があるため、ロールを作成した後にロール名を編集することはできません。

   1. (オプション) **[説明]** にロールの説明を入力します。

   1. (オプション) ロールの使用事例とアクセス許可を編集するには、[**ステップ 1: 信頼されたエンティティを選択**] または [**ステップ 2: アクセス権限を追加**] のセクションで [**編集**] を選択します。

   1. (オプション) ロールの識別、整理、検索を簡単にするには、キーと値のペアとして**新規タグを追加**します。たとえば、**ProductManager** と **John** のキーと値のペアを使用してロールにタグを追加します。

      タグの使用の詳細については、*IAM ユーザーガイド*の[「 AWS Identity and Access Management リソースのタグ](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)」を参照してください。

1. ロールを確認したら、**[ロールを作成]** を選択します。

**MSK Connect がそのロールを引き受けることができるようにするには**

1. IAM コンソールの左側のペインの**Access management** (アクセス管理) で、**Roles** (ロール) を選択します。

1. `mkc-tutorial-role` を見つけて選択します。

1. ロールの **Summary** (概要) で、**Trust relationships** (信頼関係) タブを選択します。

1. **Edit trust relationship** (信頼関係の編集) を選択します。

1. 既存の信頼ポリシーを次の JSON に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. **信頼ポリシーの更新** を選択します。

**クラスターの VPC から Amazon S3 への Amazon VPC エンドポイントを作成するには**

1.  Amazon VPC コンソール[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)を開きます。

1. 左側のペインで、**Endpoints** (エンドポイント) を選択します。

1. **Create endpoint** (エンドポイントの作成) を選択します。

1. **Service Name** (サービス名) で、**com.amazonaws.us-east-1.s3** サービスと**Gateway** (ゲートウェイ) タイプを選択します。

1. クラスターの VPC を選択してから、クラスターのサブネットに関連付けられているルートテーブルの左側にあるボックスを選択します。

1. **Create endpoint** (エンドポイントの作成) を選択します。

**次のステップ**

[カスタムプラグインを作成する](mkc-create-plugin.md)

# カスタムプラグインを作成する
<a name="mkc-create-plugin"></a>

プラグインには、コネクタのロジックを定義するコードが含まれています。このステップでは、Lenses Amazon S3 Sink Connector のコードを含むカスタムプラグインを作成します。後のステップで、MSK コネクタを作成するときに、そのコードがこのカスタム プラグインにあることを指定します。同じプラグインを使用して、構成が異なる複数の MSK コネクタを作成できます。

**カスタムプラグインを作成するには**

1. [S3 コネクタ](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)をダウンロードします。

1. アクセスできる S3 バケットに ZIP ファイルをアップロードします。Amazon S3 にファイルをアップロードする方法については、Amazon S3 ユーザーガイドの[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)を参照してください。

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) で Amazon MSK コンソールを開きます。

1. 左側のペインで **MSK Connect** を展開し、**Custom plugins** (カスタムプラグイン) を選択します。

1. **Create custom plugin** (カスタムプラグインの作成) を選択します。

1. **Browse S3** (S3 の参照) を選択します。

1. バケットのリストで、ZIP ファイルをアップロードしたバケットを見つけて、そのバケットを選択します。

1. バケット内のオブジェクトのリストで、ZIP ファイルの左側にあるラジオボタンを選択し、**選択** というラベル付けたボタンを選択します。

1. カスタムプラグイン名に `mkc-tutorial-plugin` と入力し、**Create custom plugin** (カスタムプラグインの作成) を選択します。

カスタムプラグインの作成が完了するまで AWS に数分かかる場合があります。作成プロセスが完了すると、ブラウザウィンドウの上部にあるバナーに次のメッセージが表示されます。

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**次のステップ**

[クライアントマシンと Apache Kafka トピックを作成する](mkc-create-topic.md)

# クライアントマシンと Apache Kafka トピックを作成する
<a name="mkc-create-topic"></a>

このステップでは、Apache Kafka クライアントインスタンスとして使用する Amazon EC2 インスタンスを作成します。次に、このインスタンスを使用して、クラスター上にトピックを作成します。

**クライアントマシンを作成するには**

1. Amazon EC2 コンソールの [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) を開いてください。

1. **[Launch Instances]** (インスタンスの起動) を選択してください。

1. クライアントマシンの **[名前]** (**mkc-tutorial-client** など) を入力します。

1. **[Amazon マシンイメージ (AMI) のタイプ]** については、**[Amazon Linux 2 AMI (HVM) - カーネル 5.10、SSD ボリューム タイプ]** を選択したままにします。

1. **[t2.xlarge]** インスタンスタイプを選択します。

1. **[キーペア (ログイン)]** で、**[新しいキーペアの作成]** を選択します。**[キーペア名]** に **mkc-tutorial-key-pair** を入力し、**[キーペアのダウンロード]** を選択します。既存のキーペアを使用することもできます。

1. **[インスタンスを起動]** を選択します。

1. [**インスタンスの表示**] を選択します。次に、**[セキュリティグループ]** 列で、新しいインスタンスに関連付けられているセキュリティグループを選択します。セキュリティグループの ID をコピーし、後で使用できるように保存します。

**新しく作成されたクライアントがクラスターにデータを送信するのを許可するには**

1. Amazon VPC コンソール[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)を開きます。

1. 左側のペインの**SECURITY** (セキュリティ) で、**Security Groups** (セキュリティグループ) を選択します。**Security group ID** (セキュリティグループID) 列で、クラスターのセキュリティグループを見つけます。[MSK Connect に必要なリソースを設定する](mkc-tutorial-setup.md) でクラスターを作成したときに、このセキュリティグループのIDを保存しました。行の左側にあるボックスを選択して、このセキュリティグループを選択します。他のセキュリティグループが同時に選択されていないことを確認してください。

1. 画面の下半分で、**Inbound rules** (インバウンドルール) タブを選択します。

1. **Edit inbound rules** (インバウンドルールの編集) を選択します。

1. 画面の左下で、**Add rule** (ルールの追加) を選択します。

1. 新しいルールで、**Type** (タイプ) 列の **All traffic** (すべてのトラフィック) を選択します。**[ソース]** 列の右側のフィールドに、クライアントマシンのセキュリティグループの ID を入力します。これは、クライアントマシンを作成した後に保存したセキュリティグループ ID です。

1. **Save rules** (ルールの保存) を選択します。これで、MSK クラスターは、前の手順で作成したクライアントからのすべてのトラフィックを受け入れます。

**トピックを作成する**

1. Amazon EC2 コンソール[https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)を開きます。

1. インスタンスのテーブルで `mkc-tutorial-client` を選択します。

1. 画面上部の**Connect** (接続) を選択し、指示に従ってインスタンスに接続します。

1. 次のコマンドを実行して、クライアントインスタンスに Java をインストールします。

   ```
   sudo yum install java-1.8.0
   ```

1. 次のコマンドを実行して、Apache Kafka をダウンロードします。

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**注記**  
このコマンドで使用されているもの以外のミラーサイトを使用する場合は、[Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) ウェブサイトで別のサイトを選択できます。

1. 前のステップで TAR ファイルをダウンロードしたディレクトリで次のコマンドを実行します。

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. **kafka\$12.12-2.2.1** ディレクトリに移動します。

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインで **Clusters** (クラスター) を選択してから、`mkc-tutorial-cluster` という名前を選択します。

1. **View client information** (ライアント情報の表示) を選択します。

1. **プレーンテキスト**の接続文字列をコピーします。

1. **[Done]** (完了) をクリックします。

1. クライアントインスタンス (`mkc-tutorial-client`) で次のコマンドを実行し、*bootstrapServerString* を、クラスターのクライアント情報を表示したときに保存した値に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   コマンドが成功すると、次のメッセージが表示されます : `Created topic mkc-tutorial-topic.`

**次のステップ**

[コネクタの作成](mkc-create-connector.md)

# コネクタの作成
<a name="mkc-create-connector"></a>

この手順では、 AWS マネジメントコンソールを使用してコネクタを作成する方法について説明します。

**コネクタを作成するには**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. 左側のペインで、**MSK Connect** を展開し、**Connectors** (コネクタ) を選択します。

1. **Create connector** (コネクタの作成) を選択します。

1. プラグインのリストで、`mkc-tutorial-plugin` を選択し、**Next** を選択します。

1. コネクタ名に `mkc-tutorial-connector` と入力します。

1. クラスターのリストから、`mkc-tutorial-cluster` を選択します。

1. **コネクタネットワーク設定**セクションで、ネットワークタイプに次のいずれかを選択します。
   + **IPv4** (デフォルト) - IPv4 経由の送信先への接続のみ
   + **デュアルスタック -** IPv4 と IPv6 の両方を介した送信先への接続用 (サブネットに IPv4 と IPv6 CIDR ブロックが関連付けられている場合にのみ使用可能)

1. 次の構成をコピーして、コネクタ構成フィールドに貼り付けます。

   region は、コネクタ AWS リージョン を作成する のコードに置き換えてください。同様に、Amazon S3 バケット名 *<amzn-s3-demo-bucket-my-tutorial>* を次の例のバケット名に置き換えます。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. **Access permissions** (アクセス許可) で `mkc-tutorial-role` を選択します。

1. **Next** (次へ) を選択します。**Security** (セキュリティ) ページで、もう一度 **Next** (次へ) を選択します。

1. **Logs** (ログ) ページで、**Next** (次へ) を選択します。

1. **確認と作成**ページで、コネクタ設定を確認し、**コネクタの作成**を選択します。

**次のステップ**

[MSK クラスターにデータを送信する](mkc-send-data.md)

# MSK クラスターにデータを送信する
<a name="mkc-send-data"></a>

このステップでは、前に作成した Apache Kafka トピックにデータを送信し、宛先 S3 バケットで同じデータを探します。

**MSK クラスターにデータを送信するには**

1. クライアントインスタンスの Apache Kafka インストールの `bin` フォルダに、次の内容の `client.properties` という名前のテキストファイルを作成します。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. 次のコマンドを実行して、コンソールプロデューサーを作成します。*BootstrapBrokerString* を前のコマンドを実行したときに取得した値に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. 必要なメッセージを入力して、**Enter** キーを押します。このステップを 2、3 回繰り返します。行を入力して **Enter** キーを押すたびに、その行は個別のメッセージとして Apache Kafka クラスターに送信されます。

1. 宛先の Amazon S3 バケットを調べて、前のステップで送信したメッセージを見つけます。

# コネクタを理解する
<a name="msk-connect-connectors"></a>

コネクタは、ストリーミングデータをデータソースから Apache Kafka クラスターに継続的にコピーするか、クラスターからデータシンクにデータを継続的にコピーすることにより、外部システムと Amazon サービスを Apache Kafka と統合します。コネクタは、データを宛先に配信する前に、変換、フォーマット変換、データのフィルタリングなどの軽量ロジックを実行することもできます。ソースコネクタはデータソースからデータをプルしてこのデータをクラスターにプッシュし、シンクコネクタはクラスターからデータをプルしてこのデータをデータシンクにプッシュします。

次の図表は、コネクタのアーキテクチャを示しています。ワーカーは、コネクタロジックを実行する Java 仮想マシン (JVM) プロセスです。各ワーカーは、並列スレッドで実行され、データをコピーする作業を行う一連のタスクを作成します。タスクは状態を保存しないため、復元力のあるスケーラブルな Data Pipeline を提供するために、いつでも開始、停止、または再開できます。

![\[コネクタクラスターのアーキテクチャを示す図表。\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/images/mkc-worker-architecture.png)


# コネクタ容量を理解する
<a name="msk-connect-capacity"></a>

コネクタの総容量は、コネクタが持つワーカーの数、およびワーカーごとの MSK 接続ユニット (MCU) の数によって異なります。各 MCU は、1 vCPU のコンピューティングと 4 GiB のメモリを表します。MCU メモリは、使用中のヒープメモリではなく、ワーカーインスタンスの合計メモリに関係します。

MSK Connect ワーカーは、カスタマー提供のサブネット内の IP アドレスを消費します。各ワーカーは、カスタマー提供のサブネットの 1 つに含まれている 1 つの IP アドレスを使用します。CreateConnector リクエストに応答して提供されたサブネットに使用可能な IP アドレスが十分にあり、指定した容量を満たしていることを確認する必要があります。特に、ワーカー数が変動する可能性のあるコネクタを自動スケーリングする場合は注意が必要です。

コネクタを作成するには、次の 2 つの容量モードのいずれかを選択する必要があります。
+ プロビジョニング済み: コネクタの容量要件がわかっている場合は、このモードを選択します。次の 2 つの値を指定します。
  + ワーカー数
  + ワーカーあたりの MCU の数。
+ オートスケーリング: コネクタの容量要件が可変である場合、またはアドバンスにそれらを知らない場合は、このモードを選択します。自動スケーリングモードを使用すると、Amazon MSK Connect はコネクタの `tasks.max` プロパティを、コネクタで実行されているワーカーの数とワーカーあたりの MCU の数に比例した値で上書きします。

  次の 3 つの値のセットを指定します。
  + ワーカーの最小数と最大数。
  + CPU 使用率のスケールインおよびスケールアウトのパーセンテージ。これは、`CpuUtilization` メトリクスによって決定されます。コネクタの `CpuUtilization` メトリクスがスケールアウトのパーセンテージを超えると、MSK Connect はコネクタで実行されているワーカーの数を増やします。`CpuUtilization` メトリクスがスケールインのパーセンテージを下回ると、MSK Connect はワーカーの数を減らします。ワーカーの数は、コネクタの作成時に指定した最小数と最大数の範囲内に常に留まります。
  + ワーカーあたりの MCU の数。
  + (オプション) *Auto Scaling タスクの最大数* - Auto Scaling オペレーション中にコネクタに割り当てられたタスクの最大数。このパラメータを使用すると、タスク作成の上限を設定できるため、Kafka トピックパーティションに関連するリソース使用率と並列処理をより詳細に制御できます。

ワーカーの詳細については、「」を参照してください。最大自動スケーリングタスク数[MSK Connect ワーカーを理解する](msk-connect-workers.md)の詳細については、「」を参照してください[Auto Scaling タスクの最大数を理解する](msk-connect-max-autoscaling-task-count.md)。MSK Connect メトリクスについては、「[Amazon MSK Connect のモニタリング](mkc-monitoring-overview.md)」を参照してください。

# Auto Scaling タスクの最大数を理解する
<a name="msk-connect-max-autoscaling-task-count"></a>

`maxAutoscalingTaskCount` パラメータは、Amazon MSK Connect でコネクタの自動スケーリングに使用できるオプションの容量フィールドです。このパラメータを使用すると、コネクタの自動スケーリングオペレーション中に作成できるタスクの最大数の上限を設定できるため、リソースの使用率とパフォーマンスをより細かく制御できます。

自動スケーリングキャパシティモードを使用すると、Amazon MSK Connect はコネクタの `tasks.max`プロパティを、ワーカーあたりのワーカー数と MCUs 数に比例した値で自動的に上書きします。`maxAutoscalingTaskCount` パラメータは、コネクタ用に作成されたタスクの最大数を制限するための追加の設定可能なオプションを提供します。

この機能は、Kafka クラスター内のトピックパーティションの数に関連して並列処理のレベルを制御する場合に特に便利です。この制限を設定することで、パフォーマンスを最適化し、自動的に計算されたタスク数がワークロード要件を超えた場合に発生する可能性のある非効率的なタスク分散を防ぐことができます。

## 設定要件
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

`maxAutoscalingTaskCount` パラメータは、次の要件を満たしている必要があります。

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

この要件により、ワーカーごとに少なくとも 1 つのタスクを維持することで、効率的なリソース使用率が確保されます。コネクタの機能を最適化するために、システムはこの最小値を適用します。

を指定すると`maxAutoscalingTaskCount`、コネクタの作成直後とそれ以降のすべてのスケーリングイベント中に制限が適用されます。Auto Scaling オペレーション中にワーカー数が増減するにつれて、システムは引き続きこの制限を尊重します。この`tasks.max`値は、ワーカーあたりのワーカー数と MCUsの数に比例して調整されますが、設定された`maxAutoscalingTaskCount`値を超えることはありません。

このパラメータを指定しない場合、コネクタは制限なしで標準計算を使用します `tasks.max = workerCount × mcuCount × tasksPerMcu` ( tasksPerMcu は 2)。

## maxAutoscalingTaskCount を使用するタイミング
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

以下のシナリオ`maxAutoscalingTaskCount`では、 の使用を検討してください。
+ *パーティション数の制限*: Kafka トピックのパーティション数が自動計算されたタスク数よりも少ない場合、制限を設定すると、実行する作業がないアイドルタスクの作成が防止されます。
+ *パフォーマンスの最適化*: 特定のタスク数がワークロードに最適なスループットを提供することを確認したら、最大タスク数を制限して一貫したパフォーマンスを維持できます。
+ *リソース管理*: 実行中のワーカー数に関係なく、コネクタの最大並列処理とリソース消費量を制御する場合。

## 例
<a name="msk-connect-max-autoscaling-task-count-example"></a>

次の設定のコネクタの場合:

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

がない場合`maxAutoscalingTaskCount`、4 人のワーカーにスケーリングすると、コネクタは 64 個のタスク (4 人のワーカー x 8 MCUs x MCU 1 MCU あたり 2 個のタスク) を作成します。を 15 `maxAutoscalingTaskCount`に設定すると、コネクタは 15 個のタスクのみを作成します。これは、Kafka トピックのパーティションが 15 個以下の場合に適切です。

# デュアルスタックネットワークタイプを設定する
<a name="msk-connect-dual-stack"></a>

Amazon MSK Connect は、新しいコネクタのデュアルスタックネットワークタイプをサポートしています。デュアルスタックネットワークを使用すると、コネクタは IPv4 と IPv6 の両方の宛先に接続できます。IPv6 接続はデュアルスタックモード (IPv4 \$1 IPv6) でのみ使用できることに注意してください。IPv6-onlyネットワークはサポートされていません。

デフォルトでは、新しいコネクタは IPv4 ネットワークタイプを使用します。デュアルスタックネットワークタイプのコネクタを作成するには、次のセクションで説明する前提条件を満たしていることを確認してください。デュアルスタックネットワークタイプを使用してコネクタを作成すると、そのネットワークタイプを変更できないことに注意してください。ネットワークタイプを変更するには、コネクタを削除して再作成する必要があります。

Amazon MSK Connect は、IPv6 と IPv4 の両方を介したサービス API エンドポイント接続もサポートしています。API コールに IPv6 接続を使用するには、デュアルスタックエンドポイントを使用する必要があります。MSK Connect サービスエンドポイントの詳細については、[「Amazon MSK Connect エンドポイントとクォータ](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html)」を参照してください。

## デュアルスタックネットワークタイプを使用するための前提条件
<a name="dual-stack-prerequisites"></a>

コネクタにデュアルスタックネットワークタイプを設定する前に、コネクタの作成時に指定するすべてのサブネットに IPv6 と IPv4 の両方の CIDR ブロックが割り当てられていることを確認してください。

## デュアルスタックネットワークタイプの使用に関する考慮事項
<a name="dual-stack-considerations"></a>
+ IPv6 サポートは現在、IPv6 専用ではなく、デュアルスタックモード (IPv4 \$1 IPv6) でのみ使用できます。 IPv6-only
+ デュアルスタックが有効になっているコネクタは、IPv4 と IPv6 の両方を MSK とシンクまたはソースデータシステムに接続できます。
+ コネクタの作成後にネットワークタイプを変更することはできません。ネットワークタイプを変更するには、コネクタを削除して再作成する必要があります。
+ コネクタの作成時に指定されたすべてのサブネットは、デュアルスタックネットワークタイプでコネクタの作成を成功させるには、デュアルスタックをサポートしている必要があります。
+ デュアルスタックサブネットを使用していてもネットワークタイプが指定されていない場合、コネクタは下位互換性のためにデフォルトで IPv4-onlyになります。
+ 既存のコネクタの場合、ネットワークタイプを更新することはできません。ネットワークタイプを変更するには、コネクタを削除して再作成する必要があります。
+ デュアルスタックネットワークを使用しても追加コストは発生しません

# コネクタを作成する
<a name="mkc-create-connector-intro"></a>

この手順では、 AWS マネジメントコンソールを使用してコネクタを作成する方法について説明します。

**を使用したコネクタの作成 AWS マネジメントコンソール**

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) でAmazon MSK コンソールを開きます。

1. 左側のペインの **MSK Connect** で、**Connectors** (コネクタ) を選択します。

1. **Create connector** (コネクタの作成) を選択します。

1. 既存のカスタムプラグインを使用してコネクタを作成するか、最初に新しいカスタムプラグインを作成するかを選択できます。カスタムプラグインとその作成方法については、「[カスタムプラグインの作成](msk-connect-plugins.md)」を参照してください。この手順では、使用するカスタムプラグインがあると仮定します。カスタムプラグインのリストで、使用するプラグインを見つけ、左側のボックスを選択して、**Next** (次へ) を選択します。

1. 名前と、オプションで説明を入力します。

1. 接続するクラスターを選択します。

1. **コネクタネットワーク設定**セクションで、ネットワークタイプとして次のいずれかを選択します。
   + **IPv4** (デフォルト) - IPv4 経由の送信先への接続のみ
   + **デュアルスタック -** IPv4 と IPv6 の両方を介した送信先への接続用 (サブネットに IPv4 と IPv6 CIDR ブロックが関連付けられている場合にのみ使用可能)

1. コネクタ構成を指定します。指定する必要のある構成パラメータは、作成するコネクタのタイプによって異なります。ただし、`connector.class` パラメータや`tasks.max` パラメータなど、一部のパラメータはすべてのコネクタに共通です。以下は、[Confluent Amazon S3 Sink Connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)の設定例です。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. 次に、コネクタ容量を設定します。プロビジョニングモードとオートスケーリングの 2 つの容量モードから選択できます。これら 2 つのオプションの詳細については、「[コネクタ容量を理解する](msk-connect-capacity.md)」を参照してください。

1. (オプション) **Maximum Autoscaling Task Count** セクションで、Maximum Autoscaling Task Count フィールドを使用して、Autoscaling オペレーション中にコネクタに割り当てるタスクの最大数を入力します。値は、少なくともワーカーの最大数と等しくなければなりません。値を指定しない場合、コネクタは制限なしで標準計算を使用します。詳細については、「[Auto Scaling タスクの最大数を理解する](msk-connect-max-autoscaling-task-count.md)」を参照してください。

1. デフォルトのワーカー構成またはカスタムワーカー構成のいずれかを選択します。カスタムワーカー構成の作成については、「[MSK Connect ワーカーを理解する](msk-connect-workers.md)」を参照してください。

1. 次に、サービス実行ロールを指定します。これは、MSK Connect が引き受けることができ、必要な AWS リソースにアクセスするために必要なすべてのアクセス許可をコネクタに付与する IAM ロールである必要があります。これらの権限は、コネクタのロジックによって異なります。このロールを作成する方法については、「[サービス実行ロールを理解する](msk-connect-service-execution-role.md)」を参照してください。

1. **Next** (次へ) を選択し、セキュリティ情報を確認してから、もう一度 **Next** (次へ) を選択します。

1. 必要なログオプションを指定し、**Next** (次へ) を選択します。ログ作成の詳細については、「[MSK Connect のロギング](msk-connect-logging.md)」を参照してください。

1. **確認と作成**ページで、コネクタ設定を確認し、**コネクタの作成**を選択します。

MSK Connect API を使用してコネクタを作成するには、[CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html)を参照してください。

`UpdateConnector` API を使用して コネクタ の設定を変更できます。詳細については、「[コネクタを更新する](mkc-update-connector.md)」を参照してください。

# コネクタを更新する
<a name="mkc-update-connector"></a>

この手順では、 AWS マネジメントコンソールを使用して既存の MSK Connect コネクタの設定を更新する方法について説明します。

**を使用したコネクタ設定の更新 AWS マネジメントコンソール**

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) でAmazon MSK コンソールを開きます。

1. 左側のペインの **MSK Connect** で、**Connectors** (コネクタ) を選択します。

1. 既存のコネクタを選択します。

1. **コネクタ設定の編集**を選択します。

1. コネクタ設定を更新します。`connector.class` を UpdateConnector を使用して上書きすることはできません。次の例は、Confluent Amazon S3 Sink コネクタの設定例を示しています。

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. [**Submit**] を選択してください。

1. その後、コネクタの **Operations** タブでオペレーションの現在の状態をモニタリングできます。

MSK Connect API を使用して コネクタ の設定を更新するには、[UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html) を参照してください。

# コネクタからの接続
<a name="msk-connect-from-connectors"></a>

以下のベストプラクティスにより、Amazon MSK Connect への接続パフォーマンスを向上させることができます。

## Amazon VPC ピアリングまたは Transit Gateway の IP が重複しないようにする
<a name="CIDR-ip-ranges"></a>

Amazon VPC ピアリングまたは Transit Gateway を Amazon MSK Connect と共に使用している場合は、ピアリングされた VPC リソースに到達するのに以下の CIDR 範囲内の IP を使用するようにコネクタを設定しないでください。
+ "10.99.0.0/16"
+ "192.168.0.0/16"
+ "172.21.0.0/16"

# カスタムプラグインの作成
<a name="msk-connect-plugins"></a>

プラグインは、コネクタロジックを定義するコードを含む AWS リソースです。JAR ファイル (または 1 つ以上の JAR ファイルを含む ZIP ファイル) を S3 バケットにアップロードし、プラグインを作成するときにバケットの場所を指定します。プラグインが作成されると、MSK Connect はその時点で S3 オブジェクトの内容をコピーします。S3 オブジェクトへのリンクは保持されないため、オブジェクトへのそれ以降の変更はプラグインまたはそのコネクタには影響しません。コネクタを作成するときは、MSK Connect で使用するプラグインを指定します。プラグインとコネクタの関係は one-to-manyです。同じプラグインから 1 つ以上のコネクタを作成できます。

**注記**  
カスタムプラグインは更新できません。新しいバージョンのプラグインコードを使用するには、プラグインを参照するすべてのコネクタを削除し、プラグインを削除してから再作成します。

**カスタムプラグインの依存関係パッケージ**  
プラグインに必要なすべての JAR ファイルと依存関係を含めることをお勧めします。コネクタを次のいずれかとしてパッケージ化します。  
プラグインに必要なすべての JAR ファイルと依存関係を含む ZIP ファイル。
プラグインとその依存関係のすべてのクラスファイルを含む単一の uber JAR。
プラグインの依存関係をバンドルしないと、ランタイム環境の可用性や互換性に影響し、予期しないエラーが発生する可能性があります。

コネクタのコードを開発する方法については、Apache Kafka ドキュメントの[コネクタ開発ガイド](https://kafka.apache.org/documentation/#connect_development)を参照してください。

**を使用したカスタムプラグインの作成 AWS マネジメントコンソール**

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) で Amazon MSK コンソールを開きます。

1. 左側のペインの **MSK Connect** で、**Custom plugins** (カスタムプラグイン) を選択します。

1. **Create custom plugin** (カスタムプラグインの作成) を選択します。

1. **Browse S3** (S3 の参照) を選択します。

1. S3 バケットのリストで、プラグインの JAR または ZIP ファイルを含むバケットを選択します。

1. オブジェクトのリストで、プラグインの JAR または ZIP ファイルの左側にあるボックスを選択し、**Choose** (選択) を選択します。

1. **Create custom plugin** (カスタムプラグインの作成) を選択します。

MSK Connect API を使用してカスタムプラグインを作成するには、[CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html)を参照してください。

# MSK Connect ワーカーを理解する
<a name="msk-connect-workers"></a>

ワーカーは、コネクタロジックを実行する Java 仮想マシン (JVM) プロセスです。各ワーカーは、並列スレッドで実行され、データをコピーする作業を行う一連のタスクを作成します。タスクは状態を保存しないため、復元力のあるスケーラブルな Data Pipeline を提供するために、いつでも開始、停止、または再開できます。スケーリングイベントまたは予期しない障害によるワーカー数の変更は、残りのワーカーによって自動的に検出され、残りのワーカーのセット全体でタスクのバランスを取り直すように調整されます。Connect ワーカーは、Apache Kafka のコンシューマーグループを使用して、調整とリバランスを行います。

コネクタの容量要件が可変するか、見積もりが難しい場合は、MSK Connect に、指定した下限と上限の間で必要に応じてワーカー数をスケールさせることができます。または、コネクタロジックを実行するワーカーの正確な数を指定することもできます。詳細については、「[コネクタ容量を理解する](msk-connect-capacity.md)」を参照してください。

**MSK Connect ワーカーによる IP アドレスの消費**  
MSK Connect ワーカーは、カスタマー提供のサブネット内の IP アドレスを消費します。各ワーカーは、カスタマー提供のサブネットの 1 つに含まれている 1 つの IP アドレスを使用します。CreateConnector リクエストに応答して提供されたサブネットに使用可能な IP アドレスが十分にあり、指定した容量を満たしていることを確認する必要があります。特に、ワーカー数が変動する可能性のあるコネクタを自動スケーリングする場合は注意が必要です。

## デフォルトのワーカー構成
<a name="msk-connect-default-worker-config"></a>

MSK Connect は、次のデフォルトのワーカー構成を提供します。

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# サポートされているワーカー設定プロパティ
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect は、デフォルトのワーカー構成を提供します。コネクタで使用するカスタムワーカー設定を作成することもできます。次のリストには、Amazon MSK Connect がサポートする、またはサポートしないワーカー設定プロパティに関する情報が含まれています。
+ `key.converter` プロパティと `value.converter` プロパティが必要です。
+ MSK Connect は、次の `producer.` 設定プロパティをサポートしています。

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect は、次の `consumer.` 設定プロパティをサポートしています。

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ 次のプロパティを除いて、`producer.` または `consumer.` プレフィックスでスタートしないすべての設定プロパティが許可されます。

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

ワーカー設定プロパティとその表現については、Apache Kafka ドキュメントの「[Kafka Connect Config](https://kafka.apache.org/documentation/#connectconfigs)」を参照してください。

# カスタムワーカー設定の作成
<a name="msk-connect-create-custom-worker-config"></a>

この手順では、 AWS マネジメントコンソールを使用してカスタムワーカー設定を作成する方法について説明します。

**を使用したカスタムワーカー設定の作成 AWS マネジメントコンソール**

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) で Amazon MSK コンソールを開きます。

1. 左側のペインの **MSK Connect** で、**Worker configurations** (ワーカー構成) を選択します。

1. **Create worker configuration** (ワーカー構成の作成) を選択します。

1. 名前とオプションの説明を入力し、設定するプロパティと値を追加します。

1. **Create worker configuration** (ワーカー構成の作成) を選択します。

MSK Connect API を使用してワーカー構成を作成するには、[CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html)を参照してください。

# `offset.storage.topic` を使用してソースコネクタオフセットを管理する
<a name="msk-connect-manage-connector-offsets"></a>

このセクションでは、「オフセットストレージトピック」を使用してソースコネクタオフセットを管理するのに役立つ情報を提供します。オフセットストレージトピックは、Kafka Connect がコネクタとタスク設定のオフセットを保存するために使用する内部トピックです。

## 考慮事項
<a name="msk-connect-manage-connector-offsets-considerations"></a>

ソースコネクタオフセットを管理するときは、次の点を考慮してください。
+ オフセットストレージトピックを指定するには、ワーカー設定の `offset.storage.topic` の値として、コネクタオフセットが保存される Kafka トピックの名前を指定します。
+ コネクタ設定を変更する場合は注意が必要です。ソースコネクタがキーオフセットレコードに対して設定の値を使用している場合、設定値を変更すると、コネクタが意図しない動作をする可能性があります。プラグインのドキュメントを参照することをお勧めします。
+ **デフォルトのパーティション数のカスタマイズ** — `offset.storage.topic` の追加によるワーカー設定のカスタマイズに加えて、オフセットとステータスストレージのトピックのパーティション数をカスタマイズできます。内部トピックのデフォルトパーティションは次のとおりです。
  + `config.storage.topic`: 1、設定不可、単一パーティションのトピックである必要がある
  + `offset.storage.topic`: 25、`offset.storage.partitions` を指定することで設定可能
  + `status.storage.topic`: 5、`status.storage.partitions` を指定することで設定可能
+ **トピックの手動削除** — Amazon MSK Connect は、コネクタをデプロイするたびに新しい Kafka Connect 内部トピック (トピック名が `__amazon_msk_connect` で始まる) を作成します。`offset.storage.topic` などの内部トピックはコネクタ間で再利用される可能性があるため、削除されたコネクタにアタッチされた古いトピックは自動的に削除されません。ただし、MSK Connect によって作成された未使用の内部トピックは手動で削除できます。内部トピックには `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 形式に従って名前が付けられます。

  正規表現 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` を使用して内部トピックを削除できます。実行中のコネクタによって現在使用されている内部トピックは削除しないでください。
+ **MSK Connect によって作成された内部トピックに同じ名前を使用する** — オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。ワーカー設定を使用して `offset.storage.topic` プロパティを設定し、`offset.storage.topic` に同じ名前を割り当て、異なるコネクタ間で再利用することができます。この設定については、「[コネクタオフセットを管理する](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)」で説明しています。MSK Connect では、異なるコネクタが `config.storage.topic` と `status.storage.topic` を共有することはできません。これらのトピックは、MSKC で新しいコネクタを作成するたびに作成されます。`__amazon_msk_connect_<status|configs>_connector_name_connector_id` 形式に従って自動的に名前が付けられるため、作成するコネクタによって名前が異なります。

# デフォルトのオフセットストレージトピックを使用する
<a name="msk-connect-default-offset-storage-topic"></a>

デフォルトでは、Amazon MSK Connect は、作成したコネクタごとに Kafka クラスターに新しいオフセットストレージトピックを生成します。MSK は、コネクタ ARN の一部を使用してデフォルトのトピック名を作成します。例えば、`__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`。

# カスタムオフセットストレージトピックを使用する
<a name="msk-connect-set-offset-storage-topic"></a>

ソースコネクタ間のオフセットの連続性を提供するために、デフォルトトピックの代わりに任意のオフセットストレージトピックを使用できます。オフセットストレージトピックを指定すると、前のコネクタの最後のオフセットから読み取りを再開するソースコネクタを作成するといったタスクを実行しやすくなります。

オフセットストレージトピックを指定するには、コネクタを作成する前にワーカー設定で `offset.storage.topic` プロパティの値を指定します。オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。カスタムオフセットストレージトピックを作成する場合は、トピック設定で [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) を `compact` に設定する必要があります。

**注記**  
*シンク*コネクタの作成時にオフセットストレージトピックを指定すると、トピックがまだ存在しない場合は MSK Connect によってそのトピックが作成されます。ただし、このトピックはコネクタオフセットの保存には使用されません。  
代わりに、シンクコネクタオフセットは Kafka コンシューマーグループプロトコルを使用して管理されます。各シンクコネクタは `connect-{CONNECTOR_NAME}` という名前のグループを作成します。コンシューマーグループが存在する限り、同じ `CONNECTOR_NAME` 値で連続して作成されるシンクコネクタは、最後にコミットされたオフセットから継続されます。

**重要**  
オフセットの継続性を維持しながら既存のコネクタ設定を更新する場合は、UpdateConnector API を使用します。詳細については、「[コネクタを更新する](mkc-update-connector.md)」を参照してください。

**Example : ソースコネクタを再作成するときにオフセットストレージトピックを指定する**  
オフセットの継続性を維持しながらコネクタを削除して再作成する必要がある場合は、ワーカー設定でオフセットストレージトピックを指定できます。たとえば、変更データキャプチャ (CDC) コネクタがあり、CDC ストリーム内の場所を失うことなく再作成するとします。次のステップでこのタスクのやり方を説明します。  

1. クライアントマシンで、次のコマンドを実行してコネクタのオフセットストレージトピックの名前を検索します。`<bootstrapBrokerString>` をクラスターのブートストラップブローカー文字列に置き換えます。ブートストラップブローカー文字列を取得する手順については、「[Amazon MSK クラスターのブートストラップブローカーを取得する](msk-get-bootstrap-brokers.md)」を参照してください。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   次の出力は、デフォルトの内部コネクタトピックを含むすべてのクラスタートピックのリストを示しています。この例では、既存の CDC コネクタは MSK Connect によって作成された[デフォルトのオフセットストレージトピック](msk-connect-default-offset-storage-topic.md)を使用します。オフセットストレージトピックが `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2` と呼ばれるのはこれが理由です。

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk) で Amazon MSK コンソールを開きます。

1. **[コネクタ]** リストからコネクタを選択します。**[コネクタ設定]** フィールドの内容をコピーして保存し、内容を変更して新しいコネクタを作成できるようにします。

1. コネクタを削除するには、**[削除]** を選択します。テキスト入力フィールドにコネクタ名を入力して、削除を確定します。

1. 実際のシナリオに合った値を使用してカスタムワーカー設定を作成します。手順については、「[カスタムワーカー設定の作成](msk-connect-create-custom-worker-config.md)」を参照してください。

   ワーカー設定では、以下の設定のように、以前に `offset.storage.topic` の値として取得したオフセットストレージトピックの名前を指定する必要があります。

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**重要**  
新しいコネクタには古いコネクタと同じ名前を付ける必要があります。

   前のステップで設定したワーカー設定を使用して、新しいコネクタを作成します。手順については、「[コネクタを作成する](mkc-create-connector-intro.md)」を参照してください。

# チュートリアル: 設定プロバイダーを用いた機密情報の外部化
<a name="msk-connect-config-provider"></a>

この例は、オープンソースの設定プロバイダーを使用して Amazon MSK Connect の機密情報を外部化する方法を示しています。設定プロバイダーを使用すると、コネクタまたはワーカー設定でプレーンテキストの代わりに変数を指定でき、コネクタで実行されているワーカーは実行時にこれらの変数を解決します。これにより、認証情報やその他のシークレットがプレーンテキストで保存されるのを防ぐことができます。この例の設定プロバイダーは、 AWS Secrets Manager、Amazon S3、Systems Manager (SSM) からの設定パラメータの取得をサポートしています。[ステップ 2](#msk-connect-config-providers) では、設定するサービスの機密情報の保存と取得をセットアップする方法を確認できます。

## 考慮事項
<a name="msk-connect-config-providers-considerations"></a>

Amazon MSK Connect で MSK 設定プロバイダーを使用する際には、次の点を考慮してください。
+ 設定プロバイダーを使用するときは、IAM サービス実行ロールに適切なアクセス許可を割り当てます。
+ ワーカー設定で設定プロバイダーを定義し、コネクタ設定でその実装を定義します。
+ プラグインで機密設定値をシークレットとして定義していない場合、機密設定値がコネクタログに表示される可能性があります。Kafka Connect は、未定義の設定値を他のプレーンテキスト値と同じように扱います。詳細については[シークレットがコネクタログに表示されないようにする](msk-connect-logging.md#msk-connect-logging-secrets)を参照してください。
+ デフォルトでは、コネクタが設定プロバイダーを使用するときに、MSK Connect はコネクタを頻繁に再起動します。この再起動動作を無効にするには、コネクタ設定で `config.action.reload` の値を `none` に設定します。

## カスタムプラグインを作成して S3 にアップロードする
<a name="msk-connect-config-providers-create-custom-plugin"></a>

 カスタムプラグインを作成するには、ローカルマシンで以下のコマンドを実行して、コネクタと msk-config-provider を含む zip ファイルを作成します。

**ターミナルウィンドウと Debezium をコネクタとして使用してカスタムプラグインを作成するには**

CLI を使用して、 AWS S3 AWS バケットへのアクセスを許可する認証情報を持つスーパーユーザーとしてコマンドを実行します。CLI のインストールとセットアップの詳細については、 AWS [「 ユーザーガイド」の「 CLI AWS の開始方法](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)」を参照してください。 *AWS Command Line Interface *Amazon S3 で AWS CLI を使用する方法については、 *AWS Command Line Interface ユーザーガイド*の「 [CLI での Amazon S3 AWS の使用](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html)」を参照してください。

1. ターミナルウィンドウで、以下のコマンドを使用して `custom-plugin` という名前のフォルダをワークスペースに作成します。

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. 次のコマンドを使用して、[Debezium サイト](https://debezium.io/releases/)から **MySQL コネクタプラグイン**の最新の安定版リリースをダウンロードします。

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connectormysql/
   2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   次のコマンドを使用して、ダウンロードした gzip ファイルを `custom-plugin` フォルダに解凍します。

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. 次のコマンドを使用して、[MSK 設定プロバイダーの zip ファイル](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip)をダウンロードします。

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   次のコマンドを使用して、ダウンロードした zip ファイルを `custom-plugin` フォルダに解凍します。

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. 上記のステップで取得した MSK 設定プロバイダーとカスタムコネクタの内容を、`custom-plugin.zip` という名前を付けた単一のファイルに圧縮します。

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. このファイルを後で参照できるように S3 にアップロードします。

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. Amazon MSK コンソールの **[MSK Connect]** セクションから **[カスタムプラグイン]** を選択し、次に **[カスタムプラグインの作成]** を選択して **s3:<*S3\$1URI\$1BUCKET\$1LOCATION*>** S3 バケットを参照し、アップロードしたカスタムプラグインの ZIP ファイルを選択します。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/images/s3-object-browser.png)

1. プラグイン名には **debezium-custom-plugin** と入力します。オプションで説明を入力し、**[カスタムプラグインの作成]** を選択します。  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/images/create-custom-plugin.png)

## さまざまなプロバイダーのパラメータとアクセス許可を設定する
<a name="msk-connect-config-providers"></a>

次の 3 つのサービスでパラメータ値を設定できます。
+ Secrets Manager
+ Systems Manager Parameter Store
+ S3 - Simple Storage Service

以下のタブのいずれかを選択すると、そのサービスのパラメータ設定および関連するアクセス許可の設定方法が表示されます。

------
#### [ Configure in Secrets Manager ]

**Secrets Manager でパラメータ値を設定するには**

1. [Secrets Manager コンソール](https://console.aws.amazon.com/secretsmanager/)を開きます。

1. 認証情報またはシークレットを保存する新しいシークレットを作成します。手順については、「 *AWS Secrets Manager ユーザーガイド*[」の「 AWS Secrets Manager シークレット](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)の作成」を参照してください。

1. シークレットの ARN をコピーします。

1. 以下のサンプルポリシーの Secrets Manager のアクセス許可を[サービス実行ロール](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)に追加します。サンプル ARN である `arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234` をシークレットの ARN に置き換えてください。

1. ワーカー設定とコネクタの指示を追加します。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "secretsmanager:GetResourcePolicy",
                       "secretsmanager:GetSecretValue",
                       "secretsmanager:DescribeSecret",
                       "secretsmanager:ListSecretVersionIds"
                   ],
                   "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
                   ]
               }
           ]
       }
   ```

1. Secrets Manager 設定プロバイダーを使用するには、ステップ 3 のワーカー設定テキストボックスに次のコード行をコピーします。

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. Secrets Manager 設定プロバイダーで、ステップ 4 のコネクタ設定にある次のコード行をコピーします。

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

上記のステップは、他の設定プロバイダーでも使用できます。

------
#### [ Configure in Systems Manager Parameter Store ]

**Systems Manager のパラメータストアでパラメータ値を設定するには**

1. [ Systems Manager コンソール](https://console.aws.amazon.com/systems-manager/)を開きます。

1. ナビゲーションペインで、**[パラメータストア]** を選択します。

1. Systems Manager に保存する新しいパラメータを作成します。手順については、「 AWS Systems Manager ユーザーガイド[」の「Systems Manager パラメータの作成 (コンソール)](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html)」を参照してください。

1. パラメータの ARN をコピーします。

1. 以下のサンプルポリシーの Secrets Manager のアクセス許可を[サービス実行ロール](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)に追加します。*<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>* をパラメータの ARN で置き換えます。  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": [
                       "ssm:GetParameterHistory",
                       "ssm:GetParametersByPath",
                       "ssm:GetParameters",
                       "ssm:GetParameter"
                   ],
                   "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
               }
           ]
       }
   ```

1. パラメータストア設定プロバイダーを使用するには、ステップ 3 のワーカー設定テキストボックスに次のコード行をコピーします。

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. パラメータストア設定プロバイダーで、ステップ 5 のコネクタ設定にある次のコード行をコピーします。

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   上記の 2 つのステップを他の設定プロバイダーにバンドルすることもできます。

------
#### [ Configure in Amazon S3 ]

**Amazon S3 のオブジェクト/ファイルを設定するには**

1. [Amazon S3 コンソール](https://console.aws.amazon.com/s3/)を開きます。

1. オブジェクトを S3 のバケットにアップロードします。手順については、「[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)」を参照してください。

1. オブジェクトの ARN をコピーします。

1. 以下のサンプルポリシーの Amazon S3 オブジェクト読み取りアクセス許可を[サービス実行ロール](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html)に追加します。サンプル ARN である `arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>` をシークレットの ARN に置き換えてください。  
****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": "s3:GetObject",
                   "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
               }
           ]
       }
   ```

1. Amazon S3 設定プロバイダーを使用するには、ステップ 3 のワーカー設定テキストボックスに次のコード行をコピーします。

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. Amazon S3 設定プロバイダーで、次のコード行をステップ 4 のコネクタ設定にコピーします。

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   上記の 2 つのステップを他の設定プロバイダーにバンドルすることもできます。

------

## 設定プロバイダーに関する情報を使用してカスタムワーカー設定を作成します。
<a name="msk-connect-config-providers-create-custom-config"></a>

1. **[Amazon MSK Connect]** セクションで **[ワーカー設定]** を選択します。

1. **[ワーカー設定の作成]** を選択します。

1. [ワーカー設定名] テキストボックスに `SourceDebeziumCustomConfig` を入力します。説明はオプションです。

1. 必要なプロバイダーに基づいて関連する設定コードをコピーし、**[ワーカー設定]** テキストボックスに貼り付けます。

1. 3 つのプロバイダーすべてのワーカー設定の例を以下に示します。

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. [ワーカー設定の作成] をクリックします。

## コネクタを作成する
<a name="msk-connect-config-providers-create-connector"></a>

1. 「[Create a new connector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)」の手順に従って新しいコネクタを作成します。

1. [カスタムプラグインを作成して S3 にアップロードする](#msk-connect-config-providers-create-custom-plugin) で S3 バケットにアップロードした `custom-plugin.zip` ファイルをカスタムプラグインのソースとして選択します。

1. 必要なプロバイダーに基づいて関連する設定コードをコピーし、[コネクタの設定] フィールドに貼り付けます。

1. 3 つのプロバイダーすべてのコネクタ設定の例を以下に示します。

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. **[ワーカー設定]** ドロップダウンから **[カスタム設定を使用]** を選択し、**[SourceDebeziumCustomConfig]** を選択します。

1. 「[Create connector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html)」の残りの手順に従います。

# MSK Connect の IAM のロールとポリシー
<a name="msk-connect-iam"></a>

このセクションでは、 AWS 環境内で Amazon MSK Connect を安全にデプロイおよび管理するための適切な IAM ポリシーとロールを設定するのに役立ちます。以下のセクションでは、MSK Connect で使用することが必要でありサービス実行ロールについて説明します。これには、IAM 認証 MSK クラスターに接続するときに必要な信頼ポリシーと追加のアクセス許可が含まれます。このページでは、MSK Connect 機能へのフルアクセスを許可するための包括的な IAM ポリシーの例と、サービスで使用できる AWS マネージドポリシーの詳細も示します。

**Topics**
+ [サービス実行ロールを理解する](msk-connect-service-execution-role.md)
+ [MSK Connect の IAM ポリシーの例](mkc-iam-policy-examples.md)
+ [サービス間での混乱した代理問題を防止する](cross-service-confused-deputy-prevention.md)
+ [AWS MSK Connect の マネージドポリシー](mkc-security-iam-awsmanpol.md)
+ [MSK Connect のサービスにリンクされたロールの使用](mkc-using-service-linked-roles.md)

# サービス実行ロールを理解する
<a name="msk-connect-service-execution-role"></a>

**注記**  
Amazon MSK Connect は、[サービスにリンクされたロール](mkc-using-service-linked-roles.md)をサービス実行ロールとして使用することをサポートしていません。サービス実行ロールは別途作成する必要があります。カスタム IAM ロールを作成する手順については、*IAM ユーザーガイド*の[「 AWS サービスにアクセス許可を委任するロールの作成](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)」を参照してください。

MSK Connect でコネクタを作成するときは、それで使用する AWS Identity and Access Management (IAM) ロールを指定する必要があります。MSK Connect が継承できるように、サービス実行ロールには次の信頼ポリシーが必要です。このポリシーの条件コンテキストキーの詳細については、「[サービス間での混乱した代理問題を防止する](cross-service-confused-deputy-prevention.md)」を参照してください。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------

コネクタで使用する Amazon MSK クラスターが IAM 認証を使用するクラスターである場合は、次の許可ポリシーをコネクタのサービス実行ロールに追加する必要があります。クラスターの UUID を確認方法と、トピック ARN を作成する方法については、[認可ポリシーリソース](kafka-actions.md#msk-iam-resources) を参照してください。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:000000000001:cluster/testClusterName/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300a0000-0000-0003-000a-00000000000b-6/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------

コネクタの種類によっては、 AWS リソースへのアクセスを許可するアクセス許可ポリシーをサービス実行ロールにアタッチする必要がある場合もあります。例えば、コネクタが S3 バケットにデータを送信する必要がある場合、サービス実行ロールには、そのバケットへの書き込み許可を付与する許可ポリシーが必要です。テストの目的で、`arn:aws:iam::aws:policy/AmazonS3FullAccess` のように、フルアクセスを提供する事前に構築された IAM ポリシーの 1 つを使用できます。ただし、セキュリティ上の理由から、コネクタが AWS ソースからの読み取りまたは AWS シンクへの書き込みを許可する最も制限の厳しいポリシーを使用することをお勧めします。

# MSK Connect の IAM ポリシーの例
<a name="mkc-iam-policy-examples"></a>

管理者以外のユーザーにすべての MSK Connect 機能へのフルアクセスを許可するには、次のようなポリシーをユーザーの IAM ロールにアタッチします。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# サービス間での混乱した代理問題を防止する
<a name="cross-service-confused-deputy-prevention"></a>

混乱した代理問題は、アクションを実行する許可を持たないエンティティが、より特権のあるエンティティにアクションを実行するように強制できるセキュリティの問題です。では AWS、サービス間のなりすましにより、混乱した代理問題が発生する可能性があります。サービス間でのなりすましは、1 つのサービス (*呼び出し元サービス*) が、別のサービス (*呼び出し対象サービス*) を呼び出すときに発生する可能性があります。呼び出し元サービスは、本来ならアクセスすることが許可されるべきではない方法でその許可を使用して、別のお客様のリソースに対する処理を実行するように操作される場合があります。これを防ぐため、 AWS では、アカウントのリソースへのアクセス権が付与されたサービスプリンシパルで、すべてのサービスのデータを保護するために役立つツールを提供しています。

リソース ポリシーで [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) および [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) のグローバル条件コンテキストキーを使用して、MSK Connect が別のサービスに付与するアクセス許可をそのリソースに制限することをお勧めします。`aws:SourceArn` 値にアカウント ID が含まれていない (例: Amazon S3 バケット ARN にアカウント ID が含まれていない) 場合、アクセス許可を制限するため、両方のグローバル条件コンテキストキーを使用する必要があります。同じポリシーステートメントでこれらのグローバル条件コンテキストキーの両方を使用し、アカウント ID に`aws:SourceArn` の値が含まれていない場合、`aws:SourceAccount` 値と `aws:SourceArn` 値の中のアカウントには、同じアカウント ID を使用する必要があります。クロスサービスのアクセスにリソースを 1 つだけ関連付けたい場合は、`aws:SourceArn` を使用します。そのアカウント内のリソースをクロスサービスの使用に関連付けることを許可する場合は、`aws:SourceAccount` を使用します。

MSK Connect の場合、`aws:SourceArn` の値は MSK コネクタである必要があります。

混乱した代理問題から保護するための最も効果的な方法は、リソースの完全な ARN を指定して `aws:SourceArn` グローバル条件コンテキストキーを使用することです。リソースの完全な ARN が不明な場合や、複数のリソースを指定する場合は、`aws:SourceArn` グローバルコンテキスト条件キーを使用して、ARN の未知部分をワイルドカード (`*`) で表します。例えば、*arn:aws:kafkaconnect:us-east-1:123456789012:connector/\$1* は、米国東部 (バージニア北部) リージョンの ID 123456789012 のアカウントに属するすべてのコネクタを表します。

次の例では、MSK Connect で `aws:SourceArn` および `aws:SourceAccount` グローバル条件コンテキストキーを使用して、混乱した代理問題を回避する方法を示します。*123456789012* および arn:aws:kafkaconnect:*us-east-1*:*123456789012*:connector/*my-S3-Sink-Connector*/*abcd1234-5678-90ab-cdef-1234567890ab* を AWS アカウント および コネクタ情報に置き換えます。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": " kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
        "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------

# AWS MSK Connect の マネージドポリシー
<a name="mkc-security-iam-awsmanpol"></a>

 AWS 管理ポリシーは、 によって作成および管理されるスタンドアロンポリシーです AWS。 AWS 管理ポリシーは、多くの一般的なユースケースにアクセス許可を付与するように設計されているため、ユーザー、グループ、ロールにアクセス許可の割り当てを開始できます。

 AWS 管理ポリシーは、すべての AWS お客様が使用できるため、特定のユースケースに対して最小特権のアクセス許可を付与しない場合があることに注意してください。ユースケースに固有の[カスタマー管理ポリシー](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies)を定義して、アクセス許可を絞り込むことをお勧めします。

 AWS 管理ポリシーで定義されているアクセス許可は変更できません。が AWS マネージドポリシーで定義されたアクセス許可 AWS を更新すると、ポリシーがアタッチされているすべてのプリンシパル ID (ユーザー、グループ、ロール) に影響します。 AWS は、新しい が起動されるか、新しい API オペレーション AWS のサービス が既存のサービスで使用できるようになったときに、 AWS マネージドポリシーを更新する可能性が高くなります。

詳細については、「**IAM ユーザーガイド」の「[AWS マネージドポリシー](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies)」を参照してください。

## AWS マネージドポリシー: AmazonMSKConnectReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

このポリシーは、MSK Connect リソースを一覧表示および説明するために必要なアクセス許可をユーザーに付与します。

`AmazonMSKConnectReadOnlyAccess` ポリシーを IAM アイデンティティにアタッチできます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS マネージドポリシー: KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

このポリシーは、タグ `AmazonMSKConnectManaged:true` を持つネットワークインターフェイスを作成および管理するために必要なアクセス許可を MSK Connect サービスに付与します。これらのネットワークインターフェースにより、MSK Connect ネットワークは Apache Kafka クラスターやソースまたはシンクなどの Amazon VPC 内のリソースにアクセスできます。

KafkaConnectServiceRolePolicy を IAM エンティティにアタッチすることはできません。このポリシーは、MSK Connect がユーザーに代わってアクションを実行できるようにするサービスにリンクされたロールに関連付けられています。

------
#### [ JSON ]

****  

```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## MSK Connect AWS 管理ポリシーの更新
<a name="security-iam-awsmanpol-updates"></a>

このサービスがこれらの変更の追跡を開始してからの MSK Connect の AWS マネージドポリシーの更新に関する詳細を表示します。


| 変更 | 説明 | 日付 | 
| --- | --- | --- | 
|  MSK Connect の更新された読み取り専用ポリシー  |  MSK Connect は、AmazonMSKConnectReadOnlyAccess ポリシーを更新して、リストオペレーションの制限を削除しました。  | 2021 年 10 月 13 日 | 
|  MSK Connect が変更の追跡をスタートしました  |  MSK Connect は AWS 、管理ポリシーの変更の追跡を開始しました。  | 2021 年 9 月 14 日 | 

# MSK Connect のサービスにリンクされたロールの使用
<a name="mkc-using-service-linked-roles"></a>

Amazon MSK Connect は AWS Identity and Access Management (IAM)[ サービスにリンクされたロール](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role)を使用します。サービスにリンクされたロールは、MSK Connect に直接リンクされている一意のタイプの IAM ロールです。サービスにリンクされたロールは MSK Connect によって事前定義されており、サービスがユーザーに代わって他の AWS サービスを呼び出すために必要なすべてのアクセス許可が含まれています。

サービスにリンクされたロールにより、必要な許可を手動で追加する必要がないため、MSK Connect の設定が簡単になります。MSK Connect は、サービスにリンクされたロールの許可を定義します。特に定義されていない限り、MSK Connect のみがそのロールを引き受けることができます。定義されたアクセス許可には、信頼ポリシーとアクセス許可ポリシーが含まれ、そのアクセス許可ポリシーを他の IAM エンティティに添付することはできません。

サービスリンクロールをサポートする他のサービスについては、[IAM と連携するAWS のサービス](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html)を参照して、**[サービスにリンクされたロール]** 列が **[はい]** になっているサービスを探してください。そのサービスのサービスにリンクされたロールのドキュメントを表示するには、リンク付きの**はい**を選択します。

## MSK Connect のサービスにリンクされたロールのアクセス許可
<a name="slr-permissions"></a>

MSK Connect は、**AWSServiceRoleForKafkaConnect** という名前のサービスにリンクされたロールを使用します – Amazon MSK Connect がユーザーに代わって Amazon リソースにアクセスできるようにします。

AWSServiceRoleForKafkaConnect サービスにリンクされたロールは、`kafkaconnect.amazonaws.com` サービスを信頼してロールを引き受けます。

ロールが使用する権限ポリシーについては、「[AWS マネージドポリシー: KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy)」を参照してください。

サービスにリンクされたロールの作成、編集、削除を IAM エンティティ (ユーザー、グループ、ロールなど) に許可するには、アクセス許可を設定する必要があります。詳細については、「*IAMユーザーガイド*」の[サービスにリンクされたロールのアクセス許可](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions)を参照してください。

## MSK Connect のサービスにリンクされたロールの作成
<a name="create-slr"></a>

サービスリンクロールを手動で作成する必要はありません。 AWS マネジメントコンソール、 AWS CLI、または AWS API でコネクタを作成すると、MSK Connect によってサービスにリンクされたロールが作成されます。

このサービスにリンクされたロールを削除してから再度作成する必要がある場合は、同じプロセスを使用してアカウントにロールを再作成できます。コネクタを作成すると、MSK Connect はサービスにリンクされたロールを再度作成します。

## MSK Connect のサービスにリンクされたロールの編集
<a name="edit-slr"></a>

MSK Connect では、AWSServiceRoleForKafkaConnect サービスにリンクされたロールを編集することはできません。サービスにリンクされたロールを作成した後は、さまざまなエンティティがロールをリファレンスする可能性があるため、ロールの名前を変更することはできません。ただし、IAM を使用したロール記述の編集はできます。詳細については、「*IAM ユーザーガイド*」の[サービスにリンクされたロールの編集](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role)を参照してください。

## MSK Connect のサービスにリンクされたロールの削除
<a name="delete-slr"></a>

IAM コンソール、、 AWS CLI または AWS API を使用して、サービスにリンクされたロールを手動で削除できます。これを行うには、最初にすべての MSK Connect コネクタを手動で削除する必要があります。次に、ロールを手動で削除できます。詳細については、「*IAM ユーザーガイド*」の[サービスにリンクされたロールの削除](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role)を参照してください。

## MSK Connect のサービスにリンクされたロールがサポートされているリージョン
<a name="slr-regions"></a>

MSK Connect は、サービスが利用可能なすべてのリージョンでサービスにリンクされたロールの使用をサポートします。詳細については、「[AWS リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html)」を参照してください。

# Amazon MSK Connect のインターネットアクセスを有効にする
<a name="msk-connect-internet-access"></a>

Amazon MSK Connect のコネクタがインターネットにアクセスする必要がある場合は、次の Amazon Virtual Private Cloud (VPC) 設定を使用してそのアクセスを有効にすることをお勧めします。
+ コネクタをプライベートサブネットで設定します。
+ パブリックサブネットの VPC 用にパブリック [NAT ゲートウェイ](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)または [NAT インスタンス](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html)を作成します。詳細については、「Amazon Virtual Private Cloudユーザーガイド」の「[Connect subnets to the internet or other VPCs using NAT devices](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)」ページを参照してください。
+ プライベートサブネットから NAT ゲートウェイまたはインスタンスへのアウトバウンドトラフィックを許可します。

# Amazon MSK Connect 用の NAT ゲートウェイを設定する
<a name="msk-connect-internet-access-private-subnets-example"></a>

次のステップは、NAT ゲートウェイをセットアップしてコネクタのインターネットアクセスを有効にする方法を示しています。プライベートサブネットにコネクタを作成する前に、次のステップを完了する必要があります。

## NAT ゲートウェイを設定するための完全な前提条件
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

以下があることを確認します。
+ クラスターに関連付けられている Amazon Virtual Private Cloud (VPC) の ID。例えば、*vpc-123456ab* などです。
+ VPC 内のプライベートサブネットの ID。例えば、*subnet-a1b2c3de*、*subnet-f4g5h6ij* などです。コネクタにはプライベートサブネットを設定する必要があります。

## コネクタのインターネットアクセスを有効にするための手順
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**コネクタのインターネットアクセスを有効にするには**

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) で Amazon Virtual Private Cloud コンソールを開きます。

1. わかりやすい名前を付けて NAT ゲートウェイのパブリックサブネットを作成し、サブネット ID を書き留めます。詳細な手順については、「[VPC にサブネットを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)」を参照してください。

1. VPC がインターネットと通信できるようにインターネットゲートウェイを作成し、ゲートウェイ ID を書き留めます。VPC にインターネットゲートウェイをアタッチします。手順については、「[インターネットゲートウェイの作成とアタッチ](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)」を参照してください。

1. プライベートサブネット内のホストがパブリックサブネットにアクセスできるように、パブリック NAT ゲートウェイをプロビジョニングします。NAT ゲートウェイを作成するときに、前に作成したパブリックサブネットを選択します。手順については、「[NAT ゲートウェイの作成](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)」を参照してください。

1. ルートテーブルを設定します。この設定を完了するには、合計で 2 つのルートテーブルが必要です。VPC と同時に自動的に作成されたメインのルートテーブルが既にあるはずです。このステップでは、パブリックサブネット用の追加のルートテーブルを作成します。

   1. 次の設定を使用して VPC のメインルートテーブルを変更し、プライベートサブネットがトラフィックを NAT ゲートウェイにルーティングするようにします。手順については、*Amazon Virtual Private Cloud**ユーザーガイド*の[ルートテーブルの操作](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) を参照してください。  
**プライベート MSKC ルートテーブル**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. 「[カスタムルートテーブルを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)」の手順に従って、パブリックサブネットのルートテーブルを作成します。テーブルを作成するときは、そのテーブルがどのサブネットに関連付けられているかを識別しやすいように、**[名前タグ]** フィールドにわかりやすい名前を入力します。例えば、**パブリック MSKC** と入力します。

   1. 以下の設定を使用して**パブリック MSKC** のルートテーブルを設定します。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# プライベート DNS ホスト名を理解する
<a name="msk-connect-dns"></a>

MSK Connect のプライベート DNS ホスト名のサポートにより、パブリックドメイン名またはプライベートドメイン名を参照するようにコネクタを設定できます。サポートは VPC DHCP オプションセットで指定されている DNS サーバーによって異なります。

DHCP オプションセットは、EC2 インスタンスが VPC で VPC ネットワーク経由で通信するために使用するネットワーク構成のグループです。各 VPC にはデフォルトの DHCP オプションセットがありますが、例えば、VPC 内のインスタンスで Amazon が提供する DNS サーバーではない別の DNS サーバーを使用してドメイン名解決を行う場合は、カスタム DHCP オプションセットを作成できます。「[Amazon VPC の DHCP オプションセット](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html)」を参照してください。

プライベート DNS 解決機能が MSK Connect に組み込まれる前は、コネクタでは顧客のコネクタからの DNS クエリにサービス VPC DNS リゾルバーが使用されていました。コネクタが、顧客の VPC DHCP オプションセットで定義されている DNS サーバーを DNS 解決に使用することはありませんでした。

コネクタが参照できるのは、顧客のコネクタ設定内のホスト名またはパブリックに解決可能なプラグインのホスト名のみでした。プライベートホストゾーンで定義されたプライベートホスト名を解決したり、別の顧客ネットワークの DNS サーバーを使用したりすることはできませんでした。

顧客が自社の VPC 内のデータベース、データウェアハウス、Secrets Manager などのシステムをインターネットアクセス不可とすることを選択した場合、プライベート DNS がなければ MSK コネクタと連携できません。顧客は、企業のセキュリティ体制に準拠するためにプライベート DNS ホスト名を使用することがよくあります。

# コネクタの VPC DHCP オプションセットを設定する
<a name="msk-connect-dns-config-dhcp"></a>

コネクタは、コネクタの作成時に VPC DHCP オプションセットで定義されている DNS サーバーを自動的に使用します。コネクタを作成する前に、コネクタの DNS ホスト名の解決要件に応じて VPC DHCP オプションセットを設定してください。

プライベート DNS ホスト名機能が MSK Connect で使用できるようになる前に作成したコネクタは、以前の DNS 解決設定を変更不要で引き続き使用します。

パブリックで解決可能な DNS ホスト名解決のみがコネクタで必要な場合は、セットアップを簡単にするために、コネクタの作成時にアカウントのデフォルト VPC を使用することをお勧めします。アマゾンが提供する DNS サーバーまたは Amazon Route 53 Resolver の詳細については、「Amazon VPC ユーザーガイド」の「[Amazon DNS サーバー](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS)」を参照してください。

プライベート DNS ホスト名を解決する必要がある場合は、コネクタの作成時に渡される VPC の DHCP オプションが正しく設定されていることを確認してください。詳細については、「Amazon VPC ユーザーガイド」の「[DHCP オプションセットの使用](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html)」を参照してください。

プライベート DNS ホスト名解決用に DHCP オプションセットを設定する場合、コネクタが DHCP オプションセットで設定したカスタム DNS サーバーにアクセスできることを確認してください。アクセスできない場合、コネクタの作成は失敗します。

VPC DHCP オプションセットをカスタマイズすると、その VPC でその後作成されるコネクタでは、オプションセットで指定した DNS サーバーが使用されます。コネクタを作成した後にオプションセットを変更すると、コネクタは数分以内に新しいオプションセットの設定を採用します。

# VPC の DNS 属性を設定する
<a name="msk-connect-dns-attributes"></a>

「Amazon VPC ユーザーガイド」の「[VPC 内の DNS 属性](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support)」と「[DNS ホスト名](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames)」の説明に従って、VPC DNS 属性が正しく設定されていることを確認します。

インバウンドとアウトバウンドのリゾルバーエンドポイントを使用して他のネットワークを VPC に接続してコネクタと連携させる方法については、「Amazon Route 53 開発者ガイド」の「[VPC とネットワークの間における DNS クエリの解決](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html)」を参照してください。

# コネクタ作成の失敗を処理する
<a name="msk-connect-dns-failure-handling"></a>

このセクションでは、DNS 解決に関連して発生する可能性のあるコネクタ作成の失敗と、問題を解決するための推奨処置について説明します。


| 失敗 | 推奨されるアクション | 
| --- | --- | 
|  DNS 解決クエリが失敗した場合、またはコネクタから DNS サーバーにアクセスできない場合、コネクタの作成は失敗します。  |  DNS 解決クエリの失敗によるコネクタ作成の失敗は、CloudWatch ログに記録されます (コネクタでこれらのログを設定している場合)。 DNS サーバーの設定を確認し、コネクタから DNS サーバーへのネットワーク接続を確認します。  | 
|  コネクタの実行中に VPC DHCP オプションセットの DNS サーバー設定を変更すると、コネクタからの DNS 解決クエリが失敗する可能性があります。DNS 解決に失敗すると、コネクタタスクの一部が失敗状態になる可能性があります。  |  DNS 解決クエリの失敗によるコネクタ作成の失敗は、CloudWatch ログに記録されます (コネクタでこれらのログを設定している場合)。 失敗したタスクは自動的に再開され、コネクタが復旧するはずです。そうならない場合は、サポートに連絡して失敗したコネクタのタスクを再開するか、コネクタを再作成してください。  | 

# MSK Connect のセキュリティ
<a name="msk-connect-security"></a>

Amazon VPC と Amazon MSK-Connect 互換 APIs 間のトラフィックが Amazon ネットワークから出るのを防ぐために AWS PrivateLink、 を搭載したインターフェイス VPC エンドポイントを使用できます。インターフェイス VPC エンドポイントには、インターネットゲートウェイ、NAT デバイス、VPN 接続、または AWS Direct Connect 接続は必要ありません。詳細については、「[インターフェイス VPC エンドポイントで Amazon MSK API を使用する](privatelink-vpc-endpoints.md)」を参照してください。

# MSK Connect のロギング
<a name="msk-connect-logging"></a>

MSK Connect は、コネクタのデバッグに使用できるログイベントを書き込むことができます。コネクタを作成するときに、次のログの宛先を 0 個以上指定できます。
+ Amazon CloudWatch Logs: MSK Connect がコネクタのログイベントを送信するロググループを指定します。ロググループを作成する方法については、「CloudWatch Logs ユーザーガイド」の「[ロググループの作成](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group)」を参照してください。
+ Amazon S3: MSK Connect がコネクタのログイベントを送信する S3 バケットを指定します。S3 バケットを作成する方法については、「Amazon S3 ユーザーガイド」の「[バケットの作成](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)」を参照してください。
+ Amazon Data Firehose: MSK Connect がコネクタのログイベントを送信する配信ストリームを指定します。配信ストリームを作成する方法については、「*Firehose ユーザーガイド*」の「[Amazon Data Firehose 配信ストリームの作成](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)」を参照してください。

ロギングの設定について詳しくは、「Amazon CloudWatch Logs ユーザーガイド」の「[特定の AWS サービスからのログの記録を有効にする](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html)」を参照してください。

MSK Connect は、次のタイプのログイベントを発行します。


****  

| レベル | 説明 | 
| --- | --- | 
| INFO | スタートアップとシャットダウン時の対象となるランタイムイベント。 | 
| WARN | エラーではないが、望ましくない、または予期しないランタイムの状況。 | 
| FATAL | 早期終了の原因となる重大なエラー。 | 
| ERROR | 致命的ではない予期しない状態とランタイムエラー。 | 

以下は、CloudWatch Logs に送信されるログイベントの例です。

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```

## シークレットがコネクタログに表示されないようにする
<a name="msk-connect-logging-secrets"></a>

**注記**  
プラグインで機密設定値をシークレットとして定義していない場合、機密設定値がコネクタログに表示される可能性があります。Kafka Connect は、未定義の設定値を他のプレーンテキスト値と同じように扱います。

プラグインがプロパティをシークレットとして定義する場合、Kafka Connect はコネクタログからプロパティの値を削除します。例えば、以下のコネクタログは、プラグインによって `aws.secret.key` が `PASSWORD` タイプとして定義されている場合、その値が `[hidden]` に置き換えられることを示しています。

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

シークレットがコネクタログファイルに表示されないようにするには、プラグイン開発者は Kafka Connect の列挙定数 [https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) を使用して機密プロパティを定義する必要があります。プロパティがタイプ `ConfigDef.Type.PASSWORD` の場合、値がプレーンテキストとして送信された場合でも、Kafka Connect はその値をコネクタログから除外します。

# Amazon MSK Connect のモニタリング
<a name="mkc-monitoring-overview"></a>

モニタリングは、MSK Connect やその他の AWS ソリューションの信頼性、可用性、パフォーマンスを維持する上で重要な部分です。Amazon CloudWatch は、 AWS リソースと で実行しているアプリケーションを AWS リアルタイムでモニタリングします。メトリクスを収集および追跡し、カスタマイズされたダッシュボードを作成し、指定されたメトリクスが指定したしきい値に達したときに通知またはアクションを実行するアラームを設定できます。例えば、CloudWatch でコネクタの CPU 使用率やその他のメトリクスを追跡できるため、必要に応じて容量を増やすことができます。詳細については、「[Amazon CloudWatch ユーザーガイド](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)」を参照してください。

次の API 操作を使用することができます。
+ `DescribeConnectorOperation`: コネクタ更新オペレーションのステータスをモニタリングします。
+ `ListConnectorOperations`: コネクタで実行された以前の更新を追跡します。

次の表は、MSK Connect が`ConnectorName` ディメンションで CloudWatch に送信するメトリクスを示しています。MSK Connect は、これらのメトリクスをデフォルトで追加コストなしで提供します。CloudWatch はこれらのメトリクスを 15 か月間保持するため、履歴情報にアクセスして、コネクタのパフォーマンスをより正確に把握できます。また、特定のしきい値を監視するアラームを設定し、それらのしきい値に達したときに通知を送信したり、アクションを実行したりすることもできます。詳細については、「[Amazon CloudWatch ユーザーガイド](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)」を参照してください。


| メトリクス名 | 説明 | 
| --- | --- | 
| CpuUtilization | システムおよびユーザーによる CPU 消費のパーセンテージ。 | 
| ErroredTaskCount | エラーが発生したタスクの数。 | 
| MemoryUtilization | 現在使用中の Java 仮想マシン (JVM) ヒープメモリだけでなく、ワーカーインスタンスの合計メモリに占める割合。JVM は通常、メモリを運用システムに解放しません。そのため、JVM ヒープサイズ (MemoryUtilization) は通常、最小ヒープサイズから始まり、徐々に増加して約 80 ～ 90% の安定した最大値になります。JVM ヒープ使用量は、コネクタの実際のメモリ使用量の変化に合わせて増減する場合があります。 | 
| RebalanceCompletedTotal | このコネクタによって完了したリバランスの総数。 | 
| RebalanceTimeAvg | コネクタがリバランスに費やした平均時間 (ミリ秒単位)。 | 
| RebalanceTimeMax | コネクタがリバランスに費やした最大時間 (ミリ秒単位)。 | 
| RebalanceTimeSinceLast |  このコネクタが最新のリバランスを完了してからのミリ秒単位の時間。  | 
| RunningTaskCount | コネクタで実行中のタスクの数。 | 
| SinkConsumerByteRate | 変換がデータに適用されるまでに Kafka Connect フレームワークのシンクコンシューマーが消費した 1 秒あたりの平均バイト数。 | 
| SinkRecordReadRate | Apache Kafka または Amazon MSK クラスターから読み取られた 1 秒あたりの平均レコード数。 | 
| SinkRecordSendRate | 変換から出力され、宛先に送信される 1 秒あたりの平均レコード数。この数には、フィルタリングされたレコードは含まれていません。 | 
| SourceRecordPollRate | 生成またはポーリングされた 1 秒あたりの平均レコード数。 | 
| SourceProducerByteRate | 変換がデータに適用された後、Kafka Connect フレームワークのソースプロデューサーによって 1 秒あたりに生成された平均バイト数。 | 
| SourceRecordWriteRate | 変換から出力され、Apache Kafka または Amazon MSK クラスターに書き込まれる 1 秒あたりの平均レコード数。 | 
| TaskStartupAttemptsTotal | コネクタが試行したタスクの起動の総数。このメトリクスを使用して、タスクのスタートアップの異常を識別できます。 | 
| TaskStartupSuccessPercentage | 成功したタスクの平均パーセンテージは、コネクタでスタートされます。このメトリクスを使用して、タスクのスタートアップ試行の異常を識別できます。 | 
| WorkerCount | コネクタで実行されているワーカーの数。 | 
| BytesInPerSec | ワーカー間の通信のために Kafka Connect フレームワークに転送されたメタデータのバイト数。 | 
| BytesOutPerSec | ワーカー間の通信のために Kafka Connect フレームワークから転送されたメタデータのバイト数。 | 

# Amazon MSK Connect リソースの設定例
<a name="msk-connect-examples"></a>

このセクションには、一般的なサードパーティー製コネクタや設定プロバイダーなどの Amazon MSK Connect リソースのセットアップに役立つ例が含まれています。

**Topics**
+ [Amazon S3 シンクコネクタを設定する](mkc-S3sink-connector-example.md)
+ [MSK Connect の EventBridge Kafka シンクコネクタを設定する](mkc-eventbridge-kafka-connector.md)
+ [Debezium ソースコネクタ (設定プロバイダー付き) を使用する](mkc-debeziumsource-connector-example.md)

# Amazon S3 シンクコネクタを設定する
<a name="mkc-S3sink-connector-example"></a>

この例では、Confluent [Amazon S3 シンクコネクタ](https://www.confluent.io/hub/confluentinc/kafka-connect-s3)と を使用して AWS CLI MSK Connect で Amazon S3 シンクコネクタを作成する方法を示します。

1. 次の JSON をコピーして、新しいファイルに貼り付けます。プレースホルダー文字列を、Amazon MSK クラスターのブートストラップサーバー接続文字列とクラスターのサブネットおよびセキュリティグループ ID に対応する値に置き換えます。サービス実行ロールの設定方法については、「[MSK Connect の IAM のロールとポリシー](msk-connect-iam.md)」を参照してください。

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. 前のステップで JSON ファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   以下は、コマンドを正常に実行したときに得られる出力の例です。

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# MSK Connect の EventBridge Kafka シンクコネクタを設定する
<a name="mkc-eventbridge-kafka-connector"></a>

このトピックでは、MSK Connect 用の [EventBridge Kafka シンクコネクタ](https://github.com/awslabs/eventbridge-kafka-connector)を設定する方法について説明します。このコネクタを使用すると、MSK クラスターから EventBridge [イベントバス](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html) にイベントを送信できます。このトピックでは、必要なリソースを作成し、Kafka と EventBridge 間のシームレスなデータフローを可能にするようにコネクタを設定するプロセスについて説明します。

**Topics**
+ [前提条件](#mkc-eb-kafka-prerequisites)
+ [MSK Connect に必要なリソースをセットアップする](#mkc-eb-kafka-set-up-resources)
+ [コネクタを作成する](#mkc-eb-kafka-create-connector)
+ [Kafka にメッセージを送信する](#mkc-eb-kafka-send-json-encoded-messages)

## 前提条件
<a name="mkc-eb-kafka-prerequisites"></a>

コネクタをデプロイする前に、次のリソースがあることを確認してください。
+ **Amazon MSK クラスター**: Kafka メッセージを生成および消費するためのアクティブな MSK クラスター。
+ **Amazon EventBridge イベントバス**: Kafka トピックからイベントを受信するための EventBridge イベントバス。
+ **IAM ロール**: MSK Connect と EventBridge コネクタに必要なアクセス許可を持つ IAM ロールを作成します。
+ MSK Connect または [VPC インターフェース エンドポイント](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html) から [パブリック インターネットへのアクセス](msk-connect-internet-access.md) (MSK クラスターの VPC およびサブネット内に作成された EventBridge 用)。これにより、NAT ゲートウェイを必要とせずにパブリックインターネットを経由することを回避できます。
+ Amazon EC2 インスタンスや [AWS CloudShell](https://aws.amazon.com/cloudshell/) などの [クライアントマシン](create-serverless-cluster-client.md) を使用して、トピックを作成し Kafka にレコードを送信します。

## MSK Connect に必要なリソースをセットアップする
<a name="mkc-eb-kafka-set-up-resources"></a>

コネクタの IAM ロールを作成してから、コネクタを作成します。また、EventBridge ルールを作成して、EventBridge イベントバスに送信された Kafka イベントをフィルタリングします。

**Topics**
+ [コネクタの IAM ロール](#mkc-eb-kafka-iam-role-connector)
+ [受信イベントの EventBridge ルール](#mkc-eb-kafka-create-rule)

### コネクタの IAM ロール
<a name="mkc-eb-kafka-iam-role-connector"></a>

コネクタに関連付ける IAM ロールには、EventBridge へのイベントの送信を許可する [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html) アクセス許可が必要です。次の IAM ポリシーの例では、`example-event-bus` という名前のイベントバスにイベントを送信する許可を付与します。次の例のリソース ARN をイベントバスの ARN に置き換えてください。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

さらに、コネクタの IAM ロールに次の信頼ポリシーが含まれていることを確認する必要があります。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### 受信イベントの EventBridge ルール
<a name="mkc-eb-kafka-create-rule"></a>

受信イベントを[https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)と呼ばれるイベントデータ基準と照合する[ルール](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)を作成します。イベントパターンを使用すると、受信イベントをフィルタリングする基準を定義し、特定のルールをトリガーし、その後指定された[ターゲット](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)にルーティングするイベントを決定できます。次のイベントパターンの例は、EventBridge イベントバスに送信された Kafka イベントと一致します。

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

以下は、Kafka シンクコネクタを使用して Kafka から EventBridge に送信されるイベントの例です。

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

EventBridge コンソールで、このパターン例を使用してイベントバスに[ルールを作成し](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html)、CloudWatch Logs グループなどのターゲットを指定します。EventBridge コンソールは、CloudWatch Logs グループに必要なアクセスポリシーを自動的に設定します。

## コネクタを作成する
<a name="mkc-eb-kafka-create-connector"></a>

次のセクションでは、 AWS マネジメントコンソールを使用して [EventBridge Kafka シンクコネクタ](https://github.com/awslabs/eventbridge-kafka-connector) を作成してデプロイします。

**Topics**
+ [ステップ 1: コレクタをダウンロードする](#mkc-eb-kafka-download-connector)
+ [ステップ 1: Amazon S3 バケットを作成する](#mkc-eb-kafka-s3-bucket-create)
+ [ステップ 3: MSK Connect でプラグインを作成する](#mkc-eb-kafka-create-plugin)
+ [ステップ 4: コネクタを作成する](#mkc-eb-kafka-create-connector)

### ステップ 1: コレクタをダウンロードする
<a name="mkc-eb-kafka-download-connector"></a>

EventBridge Kafka コネクタの [GitHub リリースページ](https://github.com/awslabs/eventbridge-kafka-connector/releases)から最新の EventBridge コネクタシンク JAR をダウンロードします。たとえば、バージョン v1.4.1 をダウンロードするには、JAR ファイルリンク `kafka-eventbridge-sink-with-dependencies.jar` を選択してコネクタをダウンロードします。次に、ファイルをマシン上の任意の場所に保存します。

### ステップ 1: Amazon S3 バケットを作成する
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. MSK Connect で使用する JAR ファイルを Amazon S3 に保存するには、 を開き AWS マネジメントコンソール、Amazon S3 を選択します。

1. Amazon S3 コンソールで、**バケットの作成**を選択し、一意のバケット名を入力します。例えば、**amzn-s3-demo-bucket1-eb-connector**。

1. Amazon S3 バケットに適したリージョンを選択します。MSK クラスターがデプロイされているリージョンと一致することを確認してください。

1. **バケット設定**では、デフォルトの選択を保持するか、必要に応じて調整します。

1. [**バケットを作成**] を選択します。

1. JAR ファイルを Amazon S3 バケットにアップロードします。

### ステップ 3: MSK Connect でプラグインを作成する
<a name="mkc-eb-kafka-create-plugin"></a>

1. を開き AWS マネジメントコンソール、**MSK Connect** に移動します。

1. 左側のナビゲーションペインで **[カスタムプラグイン]** を選択します。

1. **プラグインの作成**を選択し、**プラグイン名**を入力します。例えば、**eventbridge-sink-plugin**。

1. **カスタムプラグインの場所**の場合は、**S3 オブジェクト URL **を貼り付けます。

1. プラグインのオプションの説明を追加します。

1. **プラグインの作成** を選択します。

プラグインを作成したら、それを使用して MSK Connect で EventBridge Kafka コネクタを設定およびデプロイできます。

### ステップ 4: コネクタを作成する
<a name="mkc-eb-kafka-create-connector"></a>

コネクタを作成する前に、コネクタエラーを避けるために必要な Kafka トピックを作成することをお勧めします。トピックを作成するには、クライアントマシンを使用します。

1. MSK コンソールの左側のペインで[**コネクタ**] を選択し、[**コネクタの作成**]を選択します。

1. プラグインのリストで、**eventbridge-sink-plugin** を選択し、**Next** を選択します。

1. コネクタ名には、 **EventBridgeSink** と入力します。

1. クラスターリストから、 お客様の MSK クラスターを選択します。

1. <a name="connector-ex"></a>以下のコネクタ設定をコピーし、**コネクタ設定**フィールドに貼り付けてください

   必要に応じて、次の設定のプレースホルダーを置き換えます。
   + MSK クラスターにパブリックインターネットアクセスがある場合は、`aws.eventbridge.endpoint.uri` を削除します。
   + PrivateLink を使用して MSK から EventBridge に安全に接続する場合は、`https://` の後の DNS 部分を、前に作成した EventBridge の (オプション) VPC インターフェイスエンドポイントの正しいプライベート DNS 名に置き換えます。
   + 次の設定の EventBridge イベントバス ARN をイベントバスの ARN に置き換えます。
   + リージョン固有の値を更新します。

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   コネクタ設定の詳細については、「[eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector)」を参照してください。

   必要に応じて、ワーカーと自動スケーリングの設定を変更します。また、ドロップダウンから利用可能な最新の (推奨) Apache Kafka Connect バージョンを使用することをお勧めします。**アクセス許可**で、前に作成したロールを使用します。また、オブザーバビリティとトラブルシューティングのために CloudWatch へのログ記録を有効にすることをお勧めします。必要に応じて、タグなどの他のオプション設定を調整します。次に、コネクタをデプロイし、ステータスが実行中状態になるまで待ちます。

## Kafka にメッセージを送信する
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

Apache Avro や JSON などのメッセージエンコーディングを設定するには、 `value.converter` と、オプションで Kafka Connect で使用できる`key.converter`設定を使用して、さまざまなコンバーターを指定します。

このトピックの [connector example](#connector-ex) は、`org.apache.kafka.connect.json.JsonConverter` の使用で示されているように、`value converter` 用の JSON エンコードされたメッセージで動作するように設定されています。コネクタが実行中状態になったら、クライアントマシンから `msk-eventbridge-tutorial` Kafka トピックにレコードを送信します。

# Debezium ソースコネクタ (設定プロバイダー付き) を使用する
<a name="mkc-debeziumsource-connector-example"></a>

この例は、MySQL 互換の [Amazon Aurora](https://aws.amazon.com/rds/aurora/) データベースをソースとして Debezium MySQL コネクタプラグインを使用する方法を示しています。この例では、 AWS Secrets Managerのデータベースの認証情報を外部化するために、オープンソースの [AWS Secrets Manager Config プロバイダー](https://github.com/jcustenborder/kafka-config-provider-aws)も設定しています。設定プロバイダーの詳細については、「[チュートリアル: 設定プロバイダーを用いた機密情報の外部化](msk-connect-config-provider.md)」を参照してください。

**重要**  
Debezium MySQL コネクタプラグインは [1 つのタスクのみをサポート](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max)し、Amazon MSK Connect の自動スケーリングキャパシティモードでは動作しません。代わりにプロビジョニングキャパシティモードを使用し、`workerCount` をコネクタ設定の値と等しい値に設定してください。MSK Connect のキャパシティモードの詳細については、「[コネクタ容量を理解する](msk-connect-capacity.md)」を参照してください。

# Debezium ソースコネクタを使用するための完全な前提条件
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

コネクタは、 の外部 AWS Secrets Manager にある などのサービスとやり取りできるように、インターネットにアクセスできる必要があります Amazon Virtual Private Cloud。このセクションの手順は、インターネットアクセスを有効にするための次のタスクを実行するのに役立ちます。
+ NAT ゲートウェイをホストし、VPC 内のインターネットゲートウェイにトラフィックをルーティングするパブリックサブネットを設定します。
+ プライベートサブネットのトラフィックを NAT ゲートウェイに送るデフォルトルートを作成します。

詳細については、「[Amazon MSK Connect のインターネットアクセスを有効にする](msk-connect-internet-access.md)」を参照してください。

**前提条件**

インターネットアクセスを有効にするには、以下のものが必要です。
+ クラスターに関連付けられている Amazon Virtual Private Cloud (VPC) の ID。例えば、*vpc-123456ab* などです。
+ VPC 内のプライベートサブネットの ID。例えば、*subnet-a1b2c3de*、*subnet-f4g5h6ij* などです。コネクタにはプライベートサブネットを設定する必要があります。

**コネクタのインターネットアクセスを有効にするには**

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) で Amazon Virtual Private Cloud コンソールを開きます。

1. わかりやすい名前を付けて NAT ゲートウェイのパブリックサブネットを作成し、サブネット ID を書き留めます。詳細な手順については、「[VPC にサブネットを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet)」を参照してください。

1. VPC がインターネットと通信できるようにインターネットゲートウェイを作成し、ゲートウェイ ID を書き留めます。VPC にインターネットゲートウェイをアタッチします。手順については、「[インターネットゲートウェイの作成とアタッチ](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway)」を参照してください。

1. プライベートサブネット内のホストがパブリックサブネットにアクセスできるように、パブリック NAT ゲートウェイをプロビジョニングします。NAT ゲートウェイを作成するときに、前に作成したパブリックサブネットを選択します。手順については、「[NAT ゲートウェイの作成](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating)」を参照してください。

1. ルートテーブルを設定します。この設定を完了するには、合計で 2 つのルートテーブルが必要です。VPC と同時に自動的に作成されたメインのルートテーブルが既にあるはずです。このステップでは、パブリックサブネット用の追加のルートテーブルを作成します。

   1. 次の設定を使用して VPC のメインルートテーブルを変更し、プライベートサブネットがトラフィックを NAT ゲートウェイにルーティングするようにします。手順については、*Amazon Virtual Private Cloud**ユーザーガイド*の[ルートテーブルの操作](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) を参照してください。  
**プライベート MSKC ルートテーブル**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. 「[カスタムルートテーブルを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing)」の手順に従って、パブリックサブネットのルートテーブルを作成します。テーブルを作成するときは、そのテーブルがどのサブネットに関連付けられているかを識別しやすいように、**[名前タグ]** フィールドにわかりやすい名前を入力します。例えば、**パブリック MSKC** と入力します。

   1. 以下の設定を使用して**パブリック MSKC** のルートテーブルを設定します。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

Amazon MSK Connect のインターネットアクセスが有効になり、コネクタを作成する準備が整いました。

# Debezium ソースコネクタを作成する
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

この手順では、Debezium ソースコネクタを作成する方法について説明します。

1. 

**カスタムプラグインを作成する**

   1. [Debezium](https://debezium.io/releases/) サイトから最新の安定版リリース用の MySQL コネクタプラグインをダウンロードしてください。ダウンロードした Debezium リリースバージョン (バージョン 2.x、または古いシリーズ 1.x) を書き留めます。この手順の後半で、Debezium のバージョンに基づいてコネクタを作成します。

   1. [AWS Secrets Manager 設定プロバイダー](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws)をダウンロードして解凍します。

   1. 以下のアーカイブを同じディレクトリに置きます。
      + `debezium-connector-mysql` フォルダ
      + `jcusten-border-kafka-config-provider-aws-0.1.1` フォルダ

   1. 前のステップで作成したディレクトリを ZIP ファイルに圧縮し、その ZIP ファイルを S3 バケットにアップロードします。手順については、*Amazon S3 ユーザーガイド*の[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) を参照してください。

   1. 次の JSON をコピーして、ファイルに貼り付けます。例えば、`debezium-source-custom-plugin.json`。*<example-custom-plugin-name>* をプラグインに付けたい名前に置き換え、*<amzn-s3-demo-bucket-arn>* を ZIP ファイルをアップロードした Amazon S3 バケットの ARN に置き換え、 `<file-key-of-ZIP-object>` を S3 にアップロードした ZIP オブジェクトのファイルキーに置き換えてください。

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. JSON ファイルを保存したフォルダから次の AWS CLI コマンドを実行して、プラグインを作成します。

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      以下のような出力が表示されます。

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. 次のコマンドを実行して、プラグインの状態を確認します。状態は `CREATING` から `ACTIVE` に変わります。ARN プレースホルダーを前のコマンドの出力で取得した ARN に置き換えます。

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**データベース認証情報のシークレットを設定 AWS Secrets Manager および作成する**

   1. [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/) から Secrets Manager コンソールを開きます。

   1. データベースのサインイン認証情報を保存する新しいシークレットを作成します。手順については、*AWS Secrets Manager* ユーザーガイドの[シークレットを作成する](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html)を参照してください。

   1. シークレットの ARN をコピーします。

   1. 以下のサンプルポリシーの Secrets Manager のアクセス許可を [サービス実行ロールを理解する](msk-connect-service-execution-role.md) に追加します。*<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>* をシークレットの ARN で置き換えます。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      IAM のアクセス許可を追加する手順については、「*IAM ユーザーガイド*」の「[IAM ID のアクセス許可の追加と削除](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)」を参照してください。

1. 

**設定プロバイダーに関する情報を使用してカスタムワーカー設定を作成します。**

   1. 次のワーカー設定プロパティをファイルにコピーして、プレースホルダー文字列をシナリオに対応する値に置き換えます。 AWS Secrets Manager 設定プロバイダーの設定プロパティの詳細については、プラグインのドキュメントの「[SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)」を参照してください。

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. 次の AWS CLI コマンドを実行して、カスタムワーカー設定を作成します。

      以下の値を置き換えます:
      + *<my-worker-config-name>* - カスタムワーカー設定のわかりやすい名前
      + *<encoded-properties-file-content-string>* - 前のステップでコピーしたプレーンテキストプロパティの base64 でエンコードされたバージョン

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**コネクタを作成する**

   1. Debezium のバージョン (2.x または 1.x) に対応する次の JSON をコピーして、新しいファイルに貼り付けます。`<placeholder>` 文字列をシナリオに対応する値に置き換えます。サービス実行ロールの設定方法については、「[MSK Connect の IAM のロールとポリシー](msk-connect-iam.md)」を参照してください。

      この設定では、データベースの認証情報を指定するのにプレーンテキストではなく `${secretManager:MySecret-1234:dbusername}` のような変数を使用していることに注意してください。`MySecret-1234` をシークレットの名前に置き換えてから、取得したいキーの名前を入力します。また、`<arn-of-config-provider-worker-configuration>` をカスタムワーカー設定の ARN に置き換える必要があります。

------
#### [ Debezium 2.x ]

      Debezium 2.x バージョンでは、次の JSON をコピーして、新しいファイルに貼り付けます。*<placeholder>* 文字列をシナリオに対応する値に置き換えます。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      Debezium 1.x バージョンでは、次の JSON をコピーして、新しいファイルに貼り付けます。*<placeholder>* 文字列をシナリオに対応する値に置き換えます。

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. 前のステップで JSON ファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      以下は、コマンドを正常に実行したときに得られる出力の例です。

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# Debezium コネクタ設定を更新する
<a name="mkc-debeziumsource-connector-update"></a>

Debezium コネクタの設定を更新するには、次の手順に従います。

1. 次の JSON をコピーし、新しいファイルに貼り付けます。`<placeholder>` 文字列をシナリオに対応する値に置き換えます。

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. 前のステップで JSON ファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   コマンドを正常に実行したときの出力の例を次に示します。

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. 次のコマンドを実行して、オペレーションの現在の状態をモニタリングできるようになりました。

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

詳細なステップを含む Debezium コネクタの例については、「[Amazon MSK Connect の紹介 – マネージドコネクタを使用した Apache Kafka クラスターとの間のデータのストリーミング](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/)」を参照してください。

# Amazon MSK Connect への移行
<a name="msk-connect-migrating"></a>

このセクションでは、Apache Kafka コネクタアプリケーションを Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) に移行する方法について説明します。Amazon MSK Connect に移行する利点の詳細については、「[Amazon MSK Connect を使用する利点](msk-connect.md#msk-connect-benefits)」を参照してください。

このセクションでは、Kafka Connect と Amazon MSK Connect で使用される状態管理のトピックだけでなく、ソースコネクタとシンクコネクタの移行手順についても説明します。

# Kafka Connect で使用される内部トピックを理解する
<a name="msk-connect-kafka-connect-topics"></a>

分散モードで実行されている Apache Kafka Connect アプリケーションは、Kafka クラスターとグループメンバーの内部トピックを使用して、その状態を保存します。以下は、Kafka Connect アプリケーションで使用される内部トピックに対応する設定値です。
+ `config.storage.topic` で指定された設定トピック

  Kafka Connect は、ユーザーが開始したすべてのコネクタとタスクの設定を設定トピックに保存します。ユーザーがコネクタの設定を更新するたびに、またはコネクタが再設定の要求を送信するたびに (コネクタがより多くのタスクを開始できることを検出した場合など)、このトピックにレコードが出力されます。このトピックは圧縮が有効になっているため、常に各エンティティの最後の状態を維持します。
+ `offset.storage.topic` で指定されたオフセットトピック

  Kafka Connect は、ソースコネクタのオフセットをオフセットトピックに保存します。設定トピックと同様に、オフセットトピックは圧縮が有効になっています。このトピックは、外部システムから Kafka にデータを生成するソースコネクタのソース位置のみを記述するために使用されます。Kafka からデータを読み込んで外部システムに送信するシンクコネクタは、通常の Kafka コンシューマーグループを使用してコンシューマーオフセットを保存します。
+ `status.storage.topic` で指定されたステータストピック

  Kafka Connect は、コネクタとタスクの現在の状態をステータストピックに保存します。このトピックは、REST API のユーザーがクエリを実行するデータを一元化する場所として使用されます。このトピックを使用すると、ユーザーは任意のワーカーにクエリを実行し、実行中のすべてのプラグインのステータスを取得できます。設定トピックやオフセットのトピックと同様に、ステータストピックも圧縮が有効になっています。

これらのトピックに加えて、Kafka Connect は Kafka のグループメンバー API を幅広く活用しています。グループはコネクタ名にちなんで命名されます。例えば、file-sink という名前のコネクタの場合、グループは connect-file-sink という名前になります。グループ内の各コンシューマーは、単一のタスクにレコードを提供します。これらのグループとそのオフセットは、`Kafka-consumer-group.sh` などの通常のコンシューマーグループのツールを使用して取得できます。各シンクコネクタに対して、Connect ランタイムは Kafka からレコードを抽出する通常のコンシューマーグループを実行します。

# Amazon MSK Connect アプリケーションの状態管理
<a name="msk-connect-state-management"></a>

デフォルトでは、Amazon MSK Connect は、Amazon MSK コネクタごとに Kafka クラスターに 3 つの別個のトピックを作成し、コネクタの設定、オフセット、およびステータスを保存します。デフォルトのトピック名は、次のような構造になっています。
+ \$1\$1msk\$1connect\$1configs\$1*connector-name*\$1*connector-id*
+ \$1\$1msk\$1connect\$1status\$1*connector-name*\$1*connector-id*
+ \$1\$1msk\$1connect\$1offsets\$1*connector-name*\$1*connector-id*

**注記**  
ソースコネクタ間のオフセットの連続性を提供するために、デフォルトトピックの代わりに任意のオフセットストレージトピックを使用できます。オフセットストレージトピックを指定すると、前のコネクタの最後のオフセットから読み取りを再開するソースコネクタを作成するといったタスクを実行しやすくなります。オフセットストレージトピックを指定するには、コネクタを作成する前に Amazon MSK Connect ワーカー設定で [https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets) プロパティの値を指定します。

# ソースコネクタを Amazon MSK Connect に移行する
<a name="msk-connect-migrate-source-connectors"></a>

ソースコネクタは、外部システムから Kafka にレコードをインポートする Apache Kafka Connect アプリケーションです。このセクションでは、 で実行されているオンプレミスまたはセルフマネージド Kafka Connect クラスターを実行している Apache Kafka Connect ソースコネクタアプリケーションを AWS Amazon MSK Connect に移行するプロセスについて説明します。

Kafka Connect ソースコネクタアプリケーションは、設定プロパティ `offset.storage.topic` に設定された値で名前が付けられたトピックにオフセットを保存します。以下は、`movies` と `shows` という 2 つの異なるテーブルからデータをインポートする 2 つのタスクを実行している JDBC コネクタのオフセットメッセージの例です。テーブル movies からインポートされた最新の行のプライマリ ID は `18343` です。テーブル shows からインポートされた最新の行のプライマリ ID は `732` です。

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```

ソースコネクタを Amazon MSK Connect に移行するには、次の手順を実行します。

1. オンプレミスまたはセルフマネージド型の Kafka Connect クラスターからコネクタライブラリをプルして、Amazon MSK Connect [カスタムプラグイン](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)を作成します。

1. Amazon MSK Connect [ワーカープロパティ](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)を作成し、プロパティ `key.converter`、`value.converter`、および `offset.storage.topic` を、既存の Kafka Connect クラスターで実行されている Kafka コネクタに設定された値と同じ値に設定します。

1. 既存の Kafka Connect クラスターに `PUT /connectors/connector-name/pause` リクエストを送信して、既存のクラスターのコネクタアプリケーションを一時停止します。

1. コネクタアプリケーションのすべてのタスクが完全に停止していることを確認します。タスクを停止するには、既存の Kafka Connect クラスターに `GET /connectors/connector-name/status` リクエストを送信するか、プロパティ `status.storage.topic` に設定されているトピック名からのメッセージを消費します。

1. 既存のクラスターからコネクタ設定を取得します。コネクタ設定を取得するには、既存のクラスターに `GET /connectors/connector-name/config/` リクエストを送信するか、プロパティ `config.storage.topic` に設定されているトピック名からのメッセージを消費します。

1. 既存のクラスターと同じ名前の新しい [Amazon MSK コネクタ](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)を作成します。ステップ 1 で作成したコネクタカスタムプラグイン、ステップ 2 で作成したワーカープロパティ、およびステップ 5 で抽出したコネクタ設定を使用して、このコネクタを作成します。

1. Amazon MSK コネクタのステータスが `active` の場合、ログを表示して、コネクタがソースシステムからデータのインポートを開始したことを確認します。

1. `DELETE /connectors/connector-name` リクエストを送信して、既存のクラスター内のコネクタを削除します。

# シンクコネクタを Amazon MSK Connect に移行する
<a name="msk-connect-migrate-sink-connectors"></a>

シンクコネクタは、Kafka から外部システムにデータをエクスポートする Apache Kafka Connect アプリケーションです。このセクションでは、 で実行されているオンプレミスまたはセルフマネージド Kafka Connect クラスターを実行している Apache Kafka Connect シンクコネクタアプリケーションを AWS Amazon MSK Connect に移行するプロセスについて説明します。

Kafka Connect シンクコネクタは Kafka グループメンバー API を使用し、一般的なコンシューマーアプリケーションと同じ `__consumer_offset` トピックにオフセットを保存します。この動作により、シンクコネクタのセルフマネージド型クラスターから Amazon MSK Connect への移行が簡素化されます。

シンクコネクタを Amazon MSK Connect に移行するには、次の手順を実行します。

1. オンプレミスまたはセルフマネージド型の Kafka Connect クラスターからコネクタライブラリをプルして、Amazon MSK Connect [カスタムプラグイン](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)を作成します。

1. Amazon MSK Connect [ワーカープロパティ](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config)を作成し、プロパティ `key.converter` と `value.converter` を、既存の Kafka Connect クラスターで実行されている Kafka コネクタに設定された値と同じ値に設定します。

1. 既存の Kafka Connect クラスターに `PUT /connectors/connector-name/pause` リクエストを送信して、既存のクラスターでコネクタアプリケーションを一時停止します。

1. コネクタアプリケーションのすべてのタスクが完全に停止していることを確認します。タスクを停止するには、既存の Kafka Connect クラスターに `GET /connectors/connector-name/status` リクエストを送信するか、プロパティ `status.storage.topic` に設定されているトピック名からのメッセージを消費します。

1. 既存のクラスターからコネクタ設定を取得します。コネクタ設定を取得するには、既存のクラスターに `GET /connectors/connector-name/config` リクエストを送信するか、プロパティ `config.storage.topic` に設定されているトピック名からのメッセージを消費します。

1. 既存のクラスターと同じ名前の新しい [Amazon MSK コネクタ](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html)を作成します。ステップ 1 で作成したコネクタカスタムプラグイン、ステップ 2 で作成したワーカープロパティ、およびステップ 5 で抽出したコネクタ設定を使用して、このコネクタを作成します。

1. Amazon MSK コネクタのステータスが `active` の場合、ログを表示して、コネクタがソースシステムからデータのインポートを開始したことを確認します。

1. `DELETE /connectors/connector-name` リクエストを送信して、既存のクラスター内のコネクタを削除します。

# Amazon MSK Connect の問題のトラブルシューティング
<a name="msk-connect-troubleshooting"></a>

次の情報は、MSK Connect の使用時に発生する問題を解決するために役立ちます。[AWS re:Post](https://repost.aws/) に問題を投稿することもできます。

**パブリックインターネット上でホストされているリソースにコネクタがアクセスできない**  
「[Amazon MSK Connect のインターネットアクセスを有効にする](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html)」を参照してください。

**コネクタで実行中のタスクの数が tasks.max で指定されたタスクの数と等しくない**  
コネクタが使用するタスクの数が指定された tasks.max 設定よりも少ない理由は次のとおりです。
+ コネクタの実装によっては、使用できるタスクの数が制限されている場合があります。例えば、MySQL 用の Debezium コネクタは 1 つのタスクしか使用できません。
+ 自動スケーリングキャパシティモードを使用する場合、Amazon MSK Connect は、コネクタで実行されているワーカーの数とワーカーあたりの MCU の数に比例する値でコネクタの tasks.max プロパティを上書きします。オプションの `maxAutoscalingTaskCount`パラメータを設定している場合、`tasks.max`値はこの制限を超えません。詳細については、[「最大自動スケーリングタスク数を理解する](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count)」を参照してください。
+ シンクコネクタの場合、並列処理レベル (タスクの数) はトピックパーティションの数を超えることはできません。tasks.max をそれより大きい値に設定することはできますが、1 つのパーティションが同時に複数のタスクによって処理されることはありません。
+ Kafka Connect 2.7.x では、デフォルトのコンシューマーパーティションのアサイナーは `RangeAssignor` です。このアサイナーの動作は、各トピックの最初のパーティションを単一のコンシューマーに、各トピックの 2 番目のパーティションを単一のコンシューマーに割り当てるというものです。つまり、`RangeAssignor` を使用するシンクコネクタのアクティブなタスクの最大数は、消費されている単一のトピックに含まれるパーティションの最大数と同じになります。これがユースケースに適さない場合は、`consumer.partition.assignment.strategy` プロパティをより適切なコンシューマーパーティションのアサイナーに設定した[ワーカー設定を作成](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)する必要があります。「[Kafka 2.7 Interface ConsumerPartitionAssignor: *All Known Implementing Classes*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html)」を参照してください。