翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon Kinesis Data Streams で OpenSearch Ingestion パイプラインを使用する
Amazon Kinesis Data Streams で OpenSearch Ingestion パイプラインを使用して、複数のストリームから Amazon OpenSearch Service ドメインとコレクションにストリームレコードデータを取り込みます。OpenSearch Ingestion パイプラインにはストリーミング取り込みインフラストラクチャが組み込まれており、Kinesis からストリームレコードを継続的に取り込むための、高スケールで低レイテンシーな方法を提供します。
ソースとしての Amazon Kinesis Data Streams
次の手順では、Amazon Kinesis Data Streams をデータソースとして使用する OpenSearch Ingestion パイプラインを設定する方法について説明します。このセクションでは、OpenSearch Service ドメインまたは OpenSearch Serverless コレクションの作成、パイプラインロールの設定とパイプラインの作成の手順など、必要な前提条件について説明します。
前提条件
パイプラインを設定するには、1 つ以上のアクティブな Kinesis Data Streams が必要です。これらのストリームは、レコードを受信するか、他のソースからレコードを受信する準備ができている必要があります。詳細については、OpenSearch Ingestion の概要」を参照してください。
パイプラインを設定するには
-
OpenSearch Service ドメインまたは OpenSearch Serverless コレクションを作成する
ドメインまたはコレクションを作成するには、OpenSearch Ingestion の開始方法」を参照してください。
コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAM ロールを作成するには、「リソースベースのポリシー」を参照してください。
-
アクセス許可を持つパイプラインロールを設定する
パイプライン設定で使用するパイプラインロールを設定し、そのロールに次のアクセス許可を追加します。
プレースホルダー値
を、ユーザー自身の情報に置き換えます。ストリームでサーバー側の暗号化が有効になっている場合、次の AWS KMS ポリシーでは がレコードを復号化できるようにします。
プレースホルダー値
を、ユーザー自身の情報に置き換えます。パイプラインがドメインにデータを書き込むには、sts_role_arn パイプラインロールにドメインへのアクセスを許可するドメインレベルのアクセスポリシーが、このドメインに必要になります。
次の例は、前のステップ (
pipeline-role
) で作成したパイプラインロールがドメインにデータを書き込むことを許可するingestion-domain
ドメインアクセスポリシーです。プレースホルダー値
を、ユーザー自身の情報に置き換えます。{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
your-account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:AWS リージョン
:account-id
:domain/domain-name
/*" } ] } -
パイプラインの作成
Kinesis-data-streams をソースとして指定する OpenSearch Ingestion パイプラインを設定します。このようなパイプラインを作成するために、OpenSearch Ingestion Console で用意された設計図を見つけることができます。(オプション) を使用してパイプラインを作成するには AWS CLI、「」という名前のブループリントを使用できます
AWS-KinesisDataStreamsPipeline
。プレースホルダー値
を、ユーザー自身の情報に置き換えます。version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "
stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS リージョン of the Data Stream. region: "us-east-1
" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS リージョン of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
"設定オプション
Kinesis 設定オプションについては、OpenSearch ドキュメントの「設定オプション
」を参照してください。 使用可能なメタデータ属性
-
stream_name – レコードが取り込まれた Kinesis Data Streams の名前
-
partition_key – 取り込まれる Kinesis Data Streams レコードのパーティションキー
-
sequence_number – 取り込まれる Kinesis Data Streams レコードのシーケンス番号
-
sub_sequence_number – 取り込まれる Kinesis Data Streams レコードのサブシーケンス番号
-
-
(オプション) Kinesis Data Streams パイプラインの推奨コンピューティングユニット (OCUs) を設定する
OpenSearch Kinesis Data Streams ソースパイプラインは、複数のストリームからストリームレコードを取り込むように設定することもできます。取り込みを高速化するには、新しいストリームが追加されるごとにコンピューティングユニットを追加することをお勧めします。
データ整合性
OpenSearch Ingestion は、データの耐久性を確保するためにエンドツーエンドの確認応答をサポートしています。パイプラインが Kinesis からストリームレコードを読み取ると、ストリームに関連付けられたシャードに基づいてストリームレコードを読み取る作業が動的に分散されます。パイプラインは、OpenSearch ドメインまたはコレクション内のすべてのレコードを取り込んだ後に確認を受け取ると、ストリームを自動的にチェックポイントします。これにより、ストリームレコードの重複処理を回避できます。
ストリーム名に基づいてインデックスを作成するには、opensearch sink セクションのインデックスを "index_${getMetadata(\"stream_name\")}" として定義します。
ソースとしての Amazon Kinesis Data Streams クロスアカウント
OpenSearch Ingestion パイプラインがソースとして別のアカウントの Kinesis Data Streams にアクセスできるように、Amazon Kinesis Data Streams のアカウント間でアクセスを許可できます。クロスアカウントアクセスを有効にするには、次の手順を実行します。
クロスアカウントアクセスの設定
-
Kinesis ストリームがあるアカウントにリソースポリシーを設定する
プレースホルダー値
を、ユーザー自身の情報に置き換えます。 -
(オプション) コンシューマーおよびコンシューマーリソースポリシーの設定
これはオプションのステップであり、ストリームレコードの読み取りに拡張ファンアウトコンシューマー戦略を使用する場合にのみ必要です。詳細については、「専用のスループットで拡張ファンアウトコンシューマーを開発する」を参照してください。
-
コンシューマーのセットアップ
既存のコンシューマーを再利用するには、このステップをスキップできます。詳細については、Amazon Kinesis Data Streams API リファレンスのRegisterStreamConsumer」を参照してください。
次の CLI コマンドの例では、
プレースホルダー値を
独自の情報に置き換えます。例 CLI コマンドの例:
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:
AWS リージョン
:account-id
:stream/stream-name
" \ --consumer-nameconsumer-name
-
コンシューマーリソースポリシーの設定
次のステートメントで、
プレースホルダー値を
独自の情報に置き換えます。
-
-
パイプラインの設定
クロスアカウント取り込みの場合は、ストリームごとに
kinesis_data_streams
に次の属性を追加します。-
stream_arn
- ストリームが存在するアカウントに属するストリームの ARN -
consumer_arn
- これはオプションの属性であり、デフォルトの拡張ファンアウトコンシューマー戦略が選択されている場合は指定する必要があります。このフィールドの実際のコンシューマー ARN を指定します。プレースホルダー値
を、ユーザー自身の情報に置き換えます。
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:
region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333
:role/Example-Role
" # Provide the AWS リージョン of the domain. region: "us-east-1
" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS リージョン of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name
# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq
" # Provide the AWS リージョン of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" -
-
OSI パイプラインロール Kinesis Data Streams
-
IAM ポリシー
パイプラインロールに次のポリシーを追加します。
プレースホルダー値
を、ユーザー自身の情報に置き換えます。 -
信頼ポリシー
ストリームアカウントからデータを取り込むには、パイプライン取り込みロールとストリームアカウントの間に信頼関係を確立する必要があります。パイプラインロールに以下を追加します。
プレースホルダー値
を、ユーザー自身の情報に置き換えます。
-