

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# サインイン認証情報を使用したクラスターへの接続
<a name="msk-password-tutorial-connect"></a>

シークレットを作成してクラスターに関連付けると、クライアントをクラスターに接続できます。次の手順は、SASL/SCRAM 認証を使用するクラスターにクライアントを接続する方法を示しています。また、トピックの例に対して生成および消費する方法も示します。

**Topics**
+ [SASL/SCRAM 認証を使用してクライアントをクラスターに接続する](#w2aab9c13c29c17c13c11b9b7)
+ [接続の問題のトラブルシューティング](#msk-password-tutorial-connect-troubleshooting)

## SASL/SCRAM 認証を使用してクライアントをクラスターに接続する
<a name="w2aab9c13c29c17c13c11b9b7"></a>

1.  AWS CLI がインストールされているマシンで次のコマンドを実行します。{{cluster-ARN}} を自身のクラスター ARN に置き換えます。

   ```
   aws kafka get-bootstrap-brokers --cluster-arn {{clusterARN}}
   ```

   このコマンドの JSON 結果から、`BootstrapBrokerStringSaslScram` という名前の文字列に関連付けられた値を保存します。この値は後のステップで使用します。

1. クライアントマシンで、シークレットに保存されているユーザー認証情報を含む JAAS 設定ファイルを作成します。例えば、ユーザー **alice** の場合、次の内容を含む `users_jaas.conf` という名前のファイルを作成します。

   ```
   KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="alice"
      password="alice-secret";
   };
   ```

1. 次のコマンドを使用して、JAAS 設定ファイルを `KAFKA_OPTS` 環境パラメータとしてエクスポートします。

   ```
   export KAFKA_OPTS=-Djava.security.auth.login.config={{<path-to-jaas-file>}}/users_jaas.conf
   ```

1. `/tmp` ディレクトリに `kafka.client.truststore.jks` という名前のファイルを作成します。

1. (オプション)次のコマンドを使用して、 JVM `cacerts` フォルダから JDK キーストアファイルを、前の手順で作成した`kafka.client.truststore.jks`ファイルにコピーします。{{JDKFolder}} は、インスタンス上の JDK フォルダの名前に置き換えてください。例えば、JDK フォルダには `java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64` という名前が付いている場合があります。

   ```
   cp /usr/lib/jvm/{{JDKFolder}}/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. Apache Kafka のインストール済み環境の `bin` ディレクトリに、次の内容を含む `client_sasl.properties` という名前のクライアントプロパティファイルを作成します。このファイルは、SASL メカニズムとプロトコルを定義します。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=SCRAM-SHA-512
   ```

1. 例となるトピックを作成するには、次のコマンドを実行します。{{BootstrapBrokerStringSaslScram}} を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   {{<path-to-your-kafka-installation>}}/bin/kafka-topics.sh --create --bootstrap-server {{BootstrapBrokerStringSaslScram}} --command-config {{<path-to-client-properties>}}/client_sasl.properties --replication-factor 3 --partitions 1 --topic ExampleTopicName
   ```

1. 作成したサンプルトピックにデータを生成するには、クライアントマシンで次のコマンドを実行します。{{BootstrapBrokerStringSaslScram}} を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   {{<path-to-your-kafka-installation>}}/bin/kafka-console-producer.sh --broker-list {{BootstrapBrokerStringSaslScram}} --topic {{ExampleTopicName}} --producer.config client_sasl.properties
   ```

1. 作成したトピックからデータを消費するには、クライアントマシンで次のコマンドを実行します。{{BootstrapBrokerStringSaslScram}} を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   {{<path-to-your-kafka-installation>}}/bin/kafka-console-consumer.sh --bootstrap-server {{BootstrapBrokerStringSaslScram}} --topic {{ExampleTopicName}} --from-beginning --consumer.config client_sasl.properties
   ```

## 接続の問題のトラブルシューティング
<a name="msk-password-tutorial-connect-troubleshooting"></a>

Kafka クライアントコマンドを実行する際、特に大規模なトピックやデータセットを操作する場合、Java ヒープメモリエラーが発生する可能性があります。これらのエラーは、Kafka ツールが Java アプリケーションとしてデフォルトのメモリ設定で実行されるため、ワークロードに対して不十分な場合があります。

`Out of Memory Java Heap` エラーを解決するには、`KAFKA_OPTS` 環境変数を変更してメモリ設定を含めることで、Java ヒープサイズを増やすことができます。

次の例では、ヒープの最大サイズを 1GB (`-Xmx1G`) に設定します。この値は、使用可能なシステムメモリと要件に基づいて調整できます。

```
export KAFKA_OPTS="-Djava.security.auth.login.config={{<path-to-jaas-file>}}/users_jaas.conf -Xmx1G"
```

大規模なトピックを消費する場合、メモリ使用量を抑えるために、`--from-beginning` の代わりに時間ベースまたはオフセットベースのパラメータの使用を検討してください。

```
{{<path-to-your-kafka-installation>}}/bin/kafka-console-consumer.sh --bootstrap-server {{BootstrapBrokerStringSaslScram}} --topic {{ExampleTopicName}} --max-messages 1000 --consumer.config client_sasl.properties
```