での OpenSearch 取り込みパイプラインの使用 Amazon Managed Streaming for Apache Kafka - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

での OpenSearch 取り込みパイプラインの使用 Amazon Managed Streaming for Apache Kafka

Kafka プラグインを使用して、Amazon Managed Streaming for Apache Kafka (Amazon MSK) から Ingestion OpenSearch パイプラインにデータを取り込むことができます。Amazon MSK を使用すると、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できます。 OpenSearch 取り込みでは AWS PrivateLink 、 を使用して Amazon MSK に接続します。Amazon MSK クラスターと Amazon MSK Serverless クラスターの両方からデータを取り込むことができます。2 つのプロセスの違いは、パイプラインを設定する前に実行する必要がある前提条件の手順だけです。

Amazon MSK の前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. 「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「クラスターの作成」の手順に従って、Amazon MSK でプロビジョニングされたクラスターを作成します。ブローカータイプ では、t3タイプ以外のオプションを選択します。これらは Ingestion OpenSearch ではサポートされていないためです。

  2. クラスターのステータスが Active になったら、「マルチ VPC 接続を有効にする」の手順に従います。

  3. クラスターとパイプラインが同じ AWS アカウントにあるかどうかに応じて、「クラスターポリシーを MSK クラスターにアタッチする」のステップに従い、以下のポリシーのいずれかをアタッチします。このポリシーにより、Ingestion OpenSearch は Amazon MSK クラスター AWS PrivateLink への接続を作成し、Kafka トピックからデータを読み取ることができます。必ず独自の ARN で resource を更新してください。

    クラスターとパイプラインが同じ AWS アカウントにある場合は、次のポリシーが適用されます。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    Amazon MSK クラスターがパイプライン AWS アカウント とは異なる にある場合は、代わりに次のポリシーをアタッチします。クロスアカウントアクセスは、プロビジョニングされた Amazon MSK クラスターでのみ可能で、Amazon MSK サーバーレスクラスターではできないことに注意してください。の AWS principal ARN は、パイプライン YAML 設定に提供するのと同じパイプラインロールの ARN である必要があります。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*" ] } ] }
  4. トピックの作成」の手順に従って Kafka トピックを作成します。BootstrapServerString がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。の値は3、Amazon MSK クラスターのゾーン数に基づいて、 2または --replication-factorである必要があります。--partitions の値は少なくとも 10 である必要があります。

  5. データの生成と消費」の手順に従って、データを生成して使用します。BootstrapServerString がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。

Amazon MSK Serverless の前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. 「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「MSK サーバーレスクラスターを作成する」の手順に従って、Amazon MSK サーバーレスクラスターを作成します。

  2. クラスターのステータスがアクティブになったら、「クラスターポリシーを MSK クラスターにアタッチする」の手順に従って、次のポリシーをアタッチします。必ず独自の ARN で resource を更新してください。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    このポリシーにより、Ingestion OpenSearch は Amazon MSK Serverless クラスター AWS PrivateLink への接続を作成し、Kafka トピックからデータを読み取ることができます。このポリシーは、クラスターとパイプラインが同じ にある場合に適用されます。これは AWS アカウント、Amazon MSK Serverless がクロスアカウントアクセスをサポートしていないためです。

  3. トピックの作成」の手順に従って Kafka トピックを作成します。BootstrapServerString が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URLs の 1 つであることを確認します。の値は3、Amazon MSK Serverless クラスターのゾーン数に基づいて、 2または --replication-factorである必要があります。--partitions の値は少なくとも 10 である必要があります。

  4. データの生成と消費」の手順に従って、データを生成して使用します。繰り返しになりますが、 BootstrapServerStringが Simple Authentication and Security Layer (SASL) IAM ブートストラップ URLs の 1 つであることを確認してください。

ステップ 1: パイプラインロールを設定する

Amazon MSK プロビジョンドクラスターまたはサーバーレスクラスターを設定したら、パイプライン設定で使用するパイプラインロールに次の Kafka アクセス許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*" ] } ] }

ステップ 2: パイプラインを作成する

その後、Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "group-id" aws: msk: arn: "arn:aws:kafka:{region}:{account-id}:cluster/cluster-name/cluster-id" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index_name" aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" aws_region: "us-east-1" aws_sigv4: true

事前設定された Amazon MSK ブループリントを使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

ステップ 3: (オプション) AWS Glue スキーマレジストリを使用する

Amazon MSK OpenSearch で Ingestion を使用する場合、Schema AWS Glue Registry でホストされているスキーマに AVRO データ形式を使用できます。AWS Glue スキーマレジストリを使用すると、データストリームスキーマを一元的に検出、制御、および展開できます。

このオプションを使用するには、パイプライン設定で type スキーマを有効にします。

schema: type: "aws_glue"

また、パイプラインロール AWS Glue で読み取りアクセス許可を に提供する必要があります。という AWS マネージドポリシーを使用できますAWSGlueSchemaRegistryReadonlyAccess。さらに、レジストリは、取り込みパイプラインと同じ OpenSearch AWS アカウント およびリージョンに存在する必要があります。

ステップ 4: (オプション) Amazon MSK パイプラインの推奨コンピューティングユニット (OCU) を設定する

各コンピューティングユニットには、トピックごとに 1 つのコンシューマーがあります。ブローカーは、特定のトピックについて、これらのコンシューマー間でパーティションのバランスを取ります。ただし、パーティションの数がコンシューマーの数よりも多い場合、Amazon MSK は各コンシューマーで複数のパーティションをホストします。 OpenSearch 取り込みには、CPU 使用率またはパイプライン内の保留中のレコード数に基づいてスケールアップまたはスケールダウンする自動スケーリングが組み込まれています。

最適なパフォーマンスを得るには、パーティションを多くのコンピューティングユニットに分散して並列処理を行います。トピックに多くのパーティションがある場合 (パイプラインあたりの最大数である 96 以上の OCU がある場合など)、1 ~ 96 個の OCU でパイプラインを設定することをお勧めします。これは、必要に応じて自動的にスケールするためです。トピックのパーティション数が少ない場合 (96 未満の場合など)、最大コンピューティングユニットをパーティションの数と同じにします。

パイプラインに複数のトピックがある場合は、最大コンピューティングユニットを設定する参照としてパーティション数が最も多いトピックを選択します。新しい OCU セットを含むパイプラインを同じトピックとコンシューマーグループに追加すると、スループットをほぼ直線的にスケールすることができます。