翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
での OpenSearch 取り込みパイプラインの使用 Amazon Managed Streaming for Apache Kafka
Kafka プラグイン
トピック
Amazon MSK の前提条件
OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。
-
「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「クラスターの作成」の手順に従って、Amazon MSK でプロビジョニングされたクラスターを作成します。ブローカータイプ では、
t3
タイプ以外のオプションを選択します。これらは Ingestion OpenSearch ではサポートされていないためです。 -
クラスターのステータスが Active になったら、「マルチ VPC 接続を有効にする」の手順に従います。
-
クラスターとパイプラインが同じ AWS アカウントにあるかどうかに応じて、「クラスターポリシーを MSK クラスターにアタッチする」のステップに従い、以下のポリシーのいずれかをアタッチします。このポリシーにより、Ingestion OpenSearch は Amazon MSK クラスター AWS PrivateLink への接続を作成し、Kafka トピックからデータを読み取ることができます。必ず独自の ARN で
resource
を更新してください。クラスターとパイプラインが同じ AWS アカウントにある場合は、次のポリシーが適用されます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:
us-east-1
:{account-id}
:cluster/cluster-name
/cluster-id
" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1
:{account-id}
:cluster/cluster-name
/cluster-id
" } ] }Amazon MSK クラスターがパイプライン AWS アカウント とは異なる にある場合は、代わりに次のポリシーをアタッチします。クロスアカウントアクセスは、プロビジョニングされた Amazon MSK クラスターでのみ可能で、Amazon MSK サーバーレスクラスターではできないことに注意してください。の AWS
principal
ARN は、パイプライン YAML 設定に提供するのと同じパイプラインロールの ARN である必要があります。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:
us-east-1
:{msk-account-id}
:cluster/cluster-name
/cluster-id
" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1
:{msk-account-id}
:cluster/cluster-name
/cluster-id
" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}
:role/pipeline-role
" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}
:cluster/cluster-name
/cluster-id
", "arn:aws:kafka:us-east-1:{msk-account-id}
:topic/cluster-name
/cluster-id
/*", "arn:aws:kafka:us-east-1:{msk-account-id}
:group/cluster-name
/*" ] } ] } -
「トピックの作成」の手順に従って Kafka トピックを作成します。
がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。の値はBootstrapServerString
3
、Amazon MSK クラスターのゾーン数に基づいて、2
または--replication-factor
である必要があります。--partitions
の値は少なくとも10
である必要があります。 -
「データの生成と消費」の手順に従って、データを生成して使用します。
がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。BootstrapServerString
Amazon MSK Serverless の前提条件
OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。
-
「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「MSK サーバーレスクラスターを作成する」の手順に従って、Amazon MSK サーバーレスクラスターを作成します。
-
クラスターのステータスがアクティブになったら、「クラスターポリシーを MSK クラスターにアタッチする」の手順に従って、次のポリシーをアタッチします。必ず独自の ARN で
resource
を更新してください。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:
us-east-1
:{account-id}
:cluster/cluster-name
/cluster-id
" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1
:{account-id}
:cluster/cluster-name
/cluster-id
" } ] }このポリシーにより、Ingestion OpenSearch は Amazon MSK Serverless クラスター AWS PrivateLink への接続を作成し、Kafka トピックからデータを読み取ることができます。このポリシーは、クラスターとパイプラインが同じ にある場合に適用されます。これは AWS アカウント、Amazon MSK Serverless がクロスアカウントアクセスをサポートしていないためです。
-
「トピックの作成」の手順に従って Kafka トピックを作成します。
が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URLs の 1 つであることを確認します。の値はBootstrapServerString
3
、Amazon MSK Serverless クラスターのゾーン数に基づいて、2
または--replication-factor
である必要があります。--partitions
の値は少なくとも10
である必要があります。 -
「データの生成と消費」の手順に従って、データを生成して使用します。繰り返しになりますが、
が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URLs の 1 つであることを確認してください。BootstrapServerString
ステップ 1: パイプラインロールを設定する
Amazon MSK プロビジョンドクラスターまたはサーバーレスクラスターを設定したら、パイプライン設定で使用するパイプラインロールに次の Kafka アクセス許可を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:
us-east-1
:{account-id}
:cluster/cluster-name
/cluster-id
" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1
:{account-id}
:topic/cluster-name
/cluster-id
/topic-name
" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1
:{account-id}
:group/cluster-name
/*" ] } ] }
ステップ 2: パイプラインを作成する
その後、Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。
version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "
topic-name
" group_id: "group-id
" aws: msk: arn: "arn:aws:kafka:{region}
:{account-id}
:cluster/cluster-name
/cluster-id
" region: "us-west-2
" sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index_name
" aws_sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" aws_region: "us-east-1
" aws_sigv4: true
事前設定された Amazon MSK ブループリントを使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。
ステップ 3: (オプション) AWS Glue スキーマレジストリを使用する
Amazon MSK OpenSearch で Ingestion を使用する場合、Schema AWS Glue Registry でホストされているスキーマに AVRO データ形式を使用できます。AWS Glue スキーマレジストリを使用すると、データストリームスキーマを一元的に検出、制御、および展開できます。
このオプションを使用するには、パイプライン設定で type
スキーマを有効にします。
schema: type: "aws_glue"
また、パイプラインロール AWS Glue で読み取りアクセス許可を に提供する必要があります。という AWS マネージドポリシーを使用できますAWSGlueSchemaRegistryReadonlyAccess。さらに、レジストリは、取り込みパイプラインと同じ OpenSearch AWS アカウント およびリージョンに存在する必要があります。
ステップ 4: (オプション) Amazon MSK パイプラインの推奨コンピューティングユニット (OCU) を設定する
各コンピューティングユニットには、トピックごとに 1 つのコンシューマーがあります。ブローカーは、特定のトピックについて、これらのコンシューマー間でパーティションのバランスを取ります。ただし、パーティションの数がコンシューマーの数よりも多い場合、Amazon MSK は各コンシューマーで複数のパーティションをホストします。 OpenSearch 取り込みには、CPU 使用率またはパイプライン内の保留中のレコード数に基づいてスケールアップまたはスケールダウンする自動スケーリングが組み込まれています。
最適なパフォーマンスを得るには、パーティションを多くのコンピューティングユニットに分散して並列処理を行います。トピックに多くのパーティションがある場合 (パイプラインあたりの最大数である 96 以上の OCU がある場合など)、1 ~ 96 個の OCU でパイプラインを設定することをお勧めします。これは、必要に応じて自動的にスケールするためです。トピックのパーティション数が少ない場合 (96 未満の場合など)、最大コンピューティングユニットをパーティションの数と同じにします。
パイプラインに複数のトピックがある場合は、最大コンピューティングユニットを設定する参照としてパーティション数が最も多いトピックを選択します。新しい OCU セットを含むパイプラインを同じトピックとコンシューマーグループに追加すると、スループットをほぼ直線的にスケールすることができます。