Confluent Kafka クラウドでの OpenSearch Ingestion パイプラインの使用 - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Confluent Kafka クラウドでの OpenSearch Ingestion パイプラインの使用

Confluent Kafka を Ingestion OpenSearch のソースとして使用して、Confluent Kafka クラスターから Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションにデータをストリーミングできます。 OpenSearch Ingestion は、パブリックネットワークスペースとプライベートネットワークスペースのセルフマネージド Kafka からのストリーミングデータの処理をサポートします。

Confluent パブリック Kafka クラウドへの接続

Ingestion OpenSearch パイプラインを使用して、パブリック設定の Confluent Kafka クラスターからデータをストリーミングできます (ブートストラップサーバーの DNS 名はパブリックに解決する必要があります)。これを行うには、取り込みパイプライン、ソースとしての Confluent Kafka OpenSearch クラスター、および送信先としての Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションが必要です。

データを移行するには、以下が必要です。

  • ソースとして機能する Confluent Kafka クラスター。クラスターには、移行するデータが含まれている必要があります。

  • 送信先として機能する Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクション。

  • Kafka クラスターでは、 の認証情報を使用して認証を有効にする必要があります AWS Secrets Manager。

要件

セルフマネージド OpenSearch 型または Elasticsearch ソースクラスターで AWS Secrets Manager ベース認証を有効にするには、

  • 「シークレットのローテーション」の手順に従って AWS Secrets Manager 、Confluent Kafka クラスターで認証を設定します。 AWS Secrets Manager

  • Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションに書き込むアクセス許可を持つパイプラインロールを IAM に作成します。また、 から認証情報を読み取るアクセス許可を指定する必要があります AWS Secrets Manager。これを実行するには:

    • リソースベースのポリシーを Amazon OpenSearch Service ドメインにアタッチするか、コレクションにデータアクセスポリシーをアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は自己管理 OpenSearch 型または Elasticsearch ソースクラスターから Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションにデータを書き込むことができます。

  • 設計図を参照して Ingestion OpenSearch パイプラインを作成します。

    これらのステップを完了すると、パイプラインはソースクラスターからのデータの処理を自動的に開始し、Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションの送信先に取り込みます。Ingestion OpenSearch パイプラインのさまざまなプロセッサを使用して、取り込まれたデータに対して変換を実行できます。

IAM ロールとアクセス許可

次のサンプルドメインアクセスポリシーでは、次のステップで作成するパイプラインロールが Amazon OpenSearch Service ドメインにデータを書き込むことを許可します。リソースは、必ず独自の ARN で更新してください。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

ネットワークインターフェイスを管理するには、次のアクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

以下は、 AWS Secrets Manager サービスからシークレットを読み取るために必要なアクセス許可です。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"] } ] }

Amazon OpenSearch Service ドメインに書き込むには、次のアクセス許可が必要です。

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }
パイプラインの作成

ポリシーをパイプラインロールにアタッチしたら、Confluent Kafka データ移行パイプラインの設計図を使用してパイプラインを作成します。このブループリントには、Kafka と送信先の間でデータを移行するためのデフォルト設定が含まれています。

  • 複数の Amazon OpenSearch Service ドメインをデータの送信先として指定できます。この機能を使用すると、受信データを複数の Amazon OpenSearch Servicedomain に条件付きでルーティングまたはレプリケーションできます。

  • ソース Confluent Kafka クラスターから Amazon OpenSearch Serverless VPC コレクションにデータを移行できます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。

  • confluent スキーマレジストリを使用して、 と confluent スキーマを定義できます。

次のサンプルパイプラインは、Confluent Kafka クラスターから Amazon OpenSearch Service ドメインにデータを取り込みます。

version: "2" kafka-pipeline: source: kafka: # Encryption is always required encryption: type: "ssl" topics: - name: "topic_4" group_id: "demoGroup" bootstrap_servers: # TODO: for public confluent kafka use public booststrap server dns - "<<bootstrap-server>>.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: "${{aws_secrets:confluent-kafka-secret:username}}" password: "${{aws_secrets:confluent-kafka-secret:password}}" # Schema is optional schema: type: confluent registry_url: https://<<registry-url>>.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: [ "https://<<opensearchdomain>>.us-west-2.es.amazonaws.com" ] index: "enterprise-confluent-demo" aws: sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" region: "<<aws-region>>" extension: aws: secrets: confluent-kafka-secret: secret_id: "enterprise-kafka-credentials" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" schema-secret: secret_id: "self-managed-kafka-schema" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role"

VPC 内の Confluent Kafka クラウドへの接続

Ingestion OpenSearch パイプラインを使用して、パブリック設定で Confluent Kafka クラスターからデータをストリーミングできます。これを行うには、Confluent Kafka OpenSearch をソースとして、Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションを送信先として Ingestion パイプラインを設定します。パイプラインは、kafka クラスターからのすべてのストリーミングデータを処理し、そのデータを宛先クラスターに取り込みます。

Confluent Kafka ネットワーク設定

OpenSearch Ingestion は、Confluent でサポートされているすべてのネットワークモードで設定された Confluent Kafka クラスターをサポートします。以下のネットワーク設定モードは、 OpenSearch 取り込みのソースとしてサポートされています。

  • AWS VPC ピアリング接続

  • AWS PrivateLink 専用クラスター用の

  • AWS PrivateLink for Enterprise クラスター

  • AWS Transit Gateway

Confluent マネージド Kafka は、Confluent クラウドからデータを取り込むためのソースとして使用できます。これを実現するには、Kafka をソースとして設定し、Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションをシンクとして設定するパイプラインを設定します。これにより、Kafka から指定された宛先へのデータの移行が容易になります。移行では、confluent レジストリの使用もサポートされます。レジストリはまったく使用されません。

データ移行を実行するには、次のリソースが必要です。

  • 移行する予定のデータを含む、ソースとして機能する Confluent Kafka クラスター。

  • Amazon OpenSearch Service ドメインやシンクとしての Amazon OpenSearch Serverless コレクションなどのターゲット送信先。

  • Confluent VPC にアクセスできる Amazon VPC の VPC ID。

  • Kafka クラスターでは、 の認証情報を使用して認証を有効にする必要があります AWS Secrets Manager。

要件

Kafka クラスターで取り込みを設定するには、以下が必要です。

  • Kafka クラスターで AWS Secrets Manager ベースの認証を有効にする必要があります。

  • Ingestion サービスで使用する VPC CIDR OpenSearch を指定する必要があります。

    • AWS マネジメントコンソールを使用してパイプラインを作成する場合は、Confluent Kafka OpenSearch をソースとして使用するには、VPC に Amazon Ingestion パイプラインもアタッチする必要があります。これを行うには、ネットワーク設定セクションを見つけ、VPC にアタッチチェックボックスを選択し、CIDR を選択するか、取り込みで使用する /24 CIDR OpenSearch を手動で入力します。Ingestion で使用するように選択した CIDR OpenSearch は、Confluent マネージド Kafka が実行されている VPC CIDR とは異なる必要があります。Confluent Kafka CIDR の詳細については、「」を参照してください。以下は、Ingestion Service がネットワーク接続を作成するために使用できるデフォルトの CIDR OpenSearch オプションです。

      • 10.99.20.0/24

      • 192.168.36.0/24

      • 172.21.56.0/24

  • Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションへのアクセス許可と、 からシークレットを読み取るアクセス許可を持つパイプラインロールを IAM で作成する必要があります AWS Secrets Manager。

    • リソースベースのポリシーを Amazon OpenSearch Servicedomain にアタッチするか、Amazon OpenSearch Serverless データアクセスポリシーをコレクションにアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は Kafka から Amazon OpenSearch Service ドメインまたは Amazon OpenSearch Serverless コレクションにデータを書き込むことができます。

  • AWS PrivateLink 接続可能な Confluent Kafka の場合は、 を設定します。

    VPC DHCP オプションDNS ホスト名DNS 解決を有効にする必要があります。

IAM ロールとアクセス許可

次のサンプルドメインアクセスポリシーでは、パイプラインロールが Amazon OpenSearch Service ドメインにデータを書き込むことを許可します。

注記

を独自の ARN resourceで更新する必要があります。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

次のサンプルは、ネットワークインターフェイスを管理するために必要なアクセス許可を提供します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ]

次のサンプルは、 からシークレットを読み取るために必要なアクセス許可を提供します AWS Secrets Manager。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"] } ] }

次のサンプルは、Amazon OpenSearch Service ドメインへの書き込みに必要なアクセス許可を提供します。

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }
パイプラインの作成

ポリシーをパイプラインロールにアタッチしたら、Confluent Kafka データ移行パイプラインの設計図を使用してパイプラインを作成できます。このブループリントには、Kafka と送信先の間でデータを移行するためのデフォルト設定が含まれています。

  • 複数の Amazon OpenSearch Service ドメインをデータの送信先として指定できます。この機能により、受信データを複数の Amazon OpenSearch Service に条件付きでルーティングまたはレプリケーションできます。

  • ソース Confluent Kafka クラスターから Amazon OpenSearch Serverless VPC コレクションにデータを移行できます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。

  • Confluent スキーマレジストリを使用して、 と Confluent スキーマを定義できます。

サンプルパイプライン設定

version: "2" kafka-pipeline: source: kafka: # Encryption is always required encryption: type: "ssl" topics: - name: "topic_4" group_id: "demoGroup" bootstrap_servers: # TODO: for public confluent kafka use public booststrap server dns - "<<bootstrap-server>>.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: "${{aws_secrets:confluent-kafka-secret:username}}" password: "${{aws_secrets:confluent-kafka-secret:password}}" # Schema is optional schema: type: confluent registry_url: https://<<registry-url>>.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: [ "https://<<opensearchdomain>>.us-west-2.es.amazonaws.com" ] index: "enterprise-confluent-demo" aws: sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" region: "<<aws-region>>" extension: aws: secrets: confluent-kafka-secret: secret_id: "enterprise-kafka-credentials" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" schema-secret: secret_id: "self-managed-kafka-schema" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role"