Using an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams - Amazon OpenSearch Service


Using an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams

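Use an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams to ingest data from multiple streams into Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates streaming ingestion infrastructure to provide a high-scale, low-latency way to continuously ingest stream records from Kinesis.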

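Amazon Kinesis Data Streams as a source

The following procedure shows how to set up an OpenSearch Ingestion pipeline that uses Amazon Kinesis Data Streams as the data source. This section covers the necessary prerequisites, such as creating an OpenSearch Service domain or OpenSearch Serverless collection, and walks through configuring the pipeline role and creating the pipeline.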

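Prerequisites

To set up your pipeline, you need one or more active Kinesis data streams. These streams must either be receiving records or be ready to receive records from other sources. For more information, see OpenSearch Ingestion overview.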
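Before you create the pipeline, you can confirm that each stream is ACTIVE. The following Python sketch assumes the AWS SDK for Python (boto3), whose Kinesis `describe_stream_summary` call reports the status in `StreamDescriptionSummary.StreamStatus`. The helper name `inactive_streams` is hypothetical.

```python
from typing import Iterable, List


def inactive_streams(kinesis_client, stream_names: Iterable[str]) -> List[str]:
    """Return the names of streams whose status is not ACTIVE.

    kinesis_client is assumed to be a boto3 Kinesis client, for example
    boto3.client("kinesis", region_name="us-east-1").
    """
    not_ready = []
    for name in stream_names:
        response = kinesis_client.describe_stream_summary(StreamName=name)
        status = response["StreamDescriptionSummary"]["StreamStatus"]
        # Kinesis reports CREATING, DELETING, ACTIVE, or UPDATING.
        if status != "ACTIVE":
            not_ready.append(name)
    return not_ready
```

For example, `inactive_streams(boto3.client("kinesis"), ["stream-1", "stream-2"])` returns an empty list when both (hypothetical) streams are ACTIVE. Kinesis continues to serve reads and writes while a stream is UPDATING, so you might choose to accept that status as well.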

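Set up the pipeline
  1. Create an OpenSearch Service domain or OpenSearch Serverless collection

    To create a domain or collection, see Getting started with OpenSearch Ingestion.

    To create an IAM role with the correct permissions to write data to the collection or domain, see Resource-based policies.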

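  2. Set up the pipeline role with permissions

    Set up the pipeline role that you want to use in your pipeline configuration, and add the following permissions to it. Replace the placeholder values with your own information.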

    JSON
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "allowReadFromStream",
          "Effect": "Allow",
          "Action": [
            "kinesis:DescribeStream",
            "kinesis:DescribeStreamConsumer",
            "kinesis:DescribeStreamSummary",
            "kinesis:GetRecords",
            "kinesis:GetShardIterator",
            "kinesis:ListShards",
            "kinesis:ListStreams",
            "kinesis:ListStreamConsumers",
            "kinesis:RegisterStreamConsumer",
            "kinesis:SubscribeToShard"
          ],
          "Resource": [
            "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
          ]
        }
      ]
    }

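    If server-side encryption is enabled on the stream, the following AWS KMS policy allows the pipeline to decrypt the records. Replace the placeholder values with your own information.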

    JSON
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "allowDecryptionOfCustomManagedKey",
          "Effect": "Allow",
          "Action": [
            "kms:Decrypt",
            "kms:GenerateDataKey"
          ],
          "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
        }
      ]
    }

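    For the pipeline to write data to the domain, the domain must have a domain-level access policy that allows the sts_role_arn pipeline role to access it.

    The following example is a domain access policy that allows the pipeline role created in the previous step (pipeline-role) to write data to the ingestion-domain domain (domain-name in the policy). Replace the placeholder values with your own information.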

    {
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
          },
          "Action": ["es:DescribeDomain", "es:ESHttp*"],
          "Resource": "arn:aws:es:region:account-id:domain/domain-name/*"
        }
      ]
    }
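  3. Create the pipeline

    Configure an OpenSearch Ingestion pipeline that specifies kinesis_data_streams as the source. The OpenSearch Ingestion console provides a ready-made blueprint for creating this type of pipeline. (Optional) To create the pipeline using the AWS CLI, you can use the blueprint named "AWS-KinesisDataStreamsPipeline". Replace the placeholder values with your own information.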

    YAML
    version: "2"
    kinesis-pipeline:
      source:
        kinesis_data_streams:
          acknowledgments: true
          codec:
            # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
            # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
            # json:
            #   key_name: "logEvents"
            #   These keys contain the metadata sent by CloudWatch Subscription Filters
            #   in addition to the individual log events:
            #   include_keys: [ 'owner', 'logGroup', 'logStream' ]
            newline:
          streams:
            - stream_name: "stream name"
              # Enable this if ingestion should start from the start of the stream.
              # initial_position: "EARLIEST"
              # checkpoint_interval: "PT5M"
              # Compression will always be gzip for CloudWatch, but will vary for other sources:
              # compression: "gzip"
            - stream_name: "stream name"
              # Enable this if ingestion should start from the start of the stream.
              # initial_position: "EARLIEST"
              # checkpoint_interval: "PT5M"
              # Compression will always be gzip for CloudWatch, but will vary for other sources:
              # compression: "gzip"
          # buffer_timeout: "1s"
          # records_to_accumulate: 100
          # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
          # consumer_strategy: "polling"
          # If consumer strategy is set to "polling", enable the polling config below.
          # polling:
          #   max_polling_records: 100
          #   idle_time_between_reads: "250ms"
          aws:
            # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
            # Provide the AWS Region of the data stream.
            region: "us-east-1"
      sink:
        - opensearch:
            # Provide an Amazon OpenSearch Service domain endpoint
            hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
            index: "index_${getMetadata(\"stream_name\")}"
            # Build a unique document ID from a combination of the available metadata attributes.
            document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
            aws:
              # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
              # Provide the AWS Region of the domain.
              region: "us-east-1"
              # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
              serverless: false
              # serverless_options:
              #   Specify a name here to create or update network policy for the serverless collection
              #   network_policy_name: "network-policy-name"
            # Enable the 'distribution_version' setting if the OpenSearch Service domain is of version Elasticsearch 6.x
            # distribution_version: "es6"
            # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
            # enable_request_compression: true/false
            # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
            dlq:
              s3:
                # Provide an S3 bucket
                bucket: "your-dlq-bucket-name"
                # Provide a key path prefix for the failed requests
                # key_path_prefix: "kinesis-pipeline/logs/dlq"
                # Provide the AWS Region of the bucket.
                region: "us-east-1"
                # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
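    Configuration options

    For Kinesis configuration options, see Configuration options in the OpenSearch documentation.

    Available metadata attributes
    • stream_name – The name of the Kinesis data stream from which the record was ingested

    • partition_key – The partition key of the Kinesis data stream record being ingested

    • sequence_number – The sequence number of the Kinesis data stream record being ingested

    • sub_sequence_number – The sub-sequence number of the Kinesis data stream record being ingested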

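  4. (Optional) Configure recommended compute units (OCUs) for the Kinesis Data Streams pipeline

    An OpenSearch pipeline with a Kinesis Data Streams source can also be configured to ingest stream records from more than one stream. For faster ingestion, we recommend that you add one additional compute unit for each stream that you add.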
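When one pipeline reads from several streams, the read policy from step 2 needs every stream ARN in its Resource list. The following Python sketch generates that policy with the same actions as the allowReadFromStream statement. The function name and the ARNs are hypothetical. You still need to attach the result to the pipeline role yourself.

```python
import json
from typing import Dict, List

# Actions copied from the allowReadFromStream statement in step 2.
KINESIS_READ_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamConsumer",
    "kinesis:DescribeStreamSummary",
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
    "kinesis:ListShards",
    "kinesis:ListStreams",
    "kinesis:ListStreamConsumers",
    "kinesis:RegisterStreamConsumer",
    "kinesis:SubscribeToShard",
]


def build_read_policy(stream_arns: List[str]) -> Dict:
    """Build an identity policy that lets the pipeline role read every given stream."""
    if not stream_arns:
        raise ValueError("at least one stream ARN is required")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "allowReadFromStream",
                "Effect": "Allow",
                "Action": list(KINESIS_READ_ACTIONS),
                # Drop duplicate ARNs while keeping the caller's order.
                "Resource": list(dict.fromkeys(stream_arns)),
            }
        ],
    }


if __name__ == "__main__":
    policy = build_read_policy([
        "arn:aws:kinesis:us-east-1:111122223333:stream/stream-1",
        "arn:aws:kinesis:us-east-1:111122223333:stream/stream-2",
    ])
    print(json.dumps(policy, indent=2))
```

One way to attach the policy is to pass `json.dumps(policy)` as the PolicyDocument argument of `boto3.client("iam").put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=...)`.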
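The OCU recommendation comes down to simple arithmetic: one extra OCU for each stream you add. The following Python sketch applies that rule to an existing OCU range. The baseline range and the helper name are assumptions. The returned keys match the MinUnits and MaxUnits parameters of the OpenSearch Ingestion CreatePipeline and UpdatePipeline APIs.

```python
def recommended_units(base_min: int, base_max: int, additional_streams: int) -> dict:
    """Hypothetical helper: add one OCU per additional stream to an existing range.

    base_min and base_max are the OCU bounds you already chose for the streams
    the pipeline reads today; additional_streams is how many streams you add.
    """
    if base_min < 1 or base_max < base_min:
        raise ValueError("expected 1 <= base_min <= base_max")
    if additional_streams < 0:
        raise ValueError("additional_streams must not be negative")
    # Raise both bounds so the extra capacity is available immediately,
    # not only after the pipeline scales up.
    return {
        "MinUnits": base_min + additional_streams,
        "MaxUnits": base_max + additional_streams,
    }


if __name__ == "__main__":
    # The pipeline currently runs with 1-4 OCUs, and two more streams are added.
    print(recommended_units(1, 4, 2))  # {'MinUnits': 3, 'MaxUnits': 6}
```

With boto3, you could then call `boto3.client("osis").update_pipeline(PipelineName="my-kinesis-pipeline", **recommended_units(1, 4, 2))`, where the pipeline name is hypothetical. If you prefer to let the pipeline scale up on demand rather than pay for the extra capacity at all times, raising only MaxUnits is a reasonable alternative.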

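Data consistency

OpenSearch Ingestion supports end-to-end acknowledgments to ensure data durability. When the pipeline reads stream records from Kinesis, it dynamically distributes the work of reading stream records based on the shards associated with the stream. After all records have been ingested into the OpenSearch domain or collection, the pipeline automatically checkpoints the stream when it receives the acknowledgment. This avoids reprocessing stream records that have already been acknowledged.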
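The following toy model in Python illustrates acknowledgment-gated checkpointing for a single shard. It is a simplified sketch, not the OpenSearch Ingestion implementation. It assumes one shard and one worker, and the class and function names are hypothetical. The real pipeline also exposes a checkpoint_interval setting.

```python
from typing import Dict, List, Optional


class ShardCheckpoint:
    """Toy model of acknowledgment-gated checkpointing for one shard.

    Not the OpenSearch Ingestion implementation; it only shows why the
    checkpoint moves only after the sink acknowledges a batch.
    """

    def __init__(self, records: List[str]) -> None:
        self.records = records
        self.last_acked: Optional[int] = None  # position of the last acknowledged record

    def next_batch(self, size: int) -> List[int]:
        # Resume right after the last checkpoint, or from the start of the shard.
        start = 0 if self.last_acked is None else self.last_acked + 1
        return list(range(start, min(start + size, len(self.records))))

    def acknowledge(self, batch: List[int]) -> None:
        # Called only after the sink confirms every record in the batch.
        if batch:
            self.last_acked = batch[-1]


def process(shard: ShardCheckpoint, sink: Dict[int, str], size: int, acked: bool) -> List[int]:
    """Write one batch to the sink; checkpoint only if the write was acknowledged."""
    batch = shard.next_batch(size)
    for pos in batch:
        sink[pos] = shard.records[pos]
    if acked:
        shard.acknowledge(batch)
    return batch


if __name__ == "__main__":
    shard, sink = ShardCheckpoint(["r0", "r1", "r2", "r3"]), {}
    print(process(shard, sink, 2, acked=False))  # [0, 1]  written, no acknowledgment
    print(process(shard, sink, 2, acked=True))   # [0, 1]  read again, then checkpointed
    print(process(shard, sink, 2, acked=True))   # [2, 3]
    print(process(shard, sink, 2, acked=True))   # []      nothing left to read
```

A batch that was written but never acknowledged is read again rather than lost, while acknowledged records are not read again. Because a re-read record can reach the sink twice, a deterministic document_id (as in the example pipeline) lets the second write replace the first document.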

To create an index based on the stream name, define the index in the opensearch sink section as "index_${getMetadata(\"stream_name\")}".
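OpenSearch Ingestion evaluates the getMetadata templates for each record. The following Python sketch reproduces the same string formatting so you can see which index name and document ID a record receives. The RecordMetadata class and the sample values are hypothetical. Because the ID depends only on the record's metadata, a record that is delivered twice maps to the same ID. Assuming the opensearch sink uses its default index action, the second write therefore replaces the first document instead of creating a duplicate.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class RecordMetadata:
    """Hypothetical holder for the available metadata attributes."""
    stream_name: str
    partition_key: str
    sequence_number: str
    sub_sequence_number: int


def index_for(meta: RecordMetadata) -> str:
    # Same string as index: "index_${getMetadata(\"stream_name\")}"
    return f"index_{meta.stream_name}"


def document_id_for(meta: RecordMetadata) -> str:
    # Same string as the document_id template in the example pipelines.
    return f"{meta.partition_key}_{meta.sequence_number}_{meta.sub_sequence_number}"


if __name__ == "__main__":
    # Sample values only.
    record = RecordMetadata("orders", "customer-42", "49590338271490256608559692538361571095921575989136588898", 0)
    documents: Dict[str, str] = {}
    for delivery in (record, record):  # the same record delivered twice
        documents[document_id_for(delivery)] = index_for(delivery)
    print(index_for(record))  # index_orders
    print(len(documents))     # 1
```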

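Amazon Kinesis Data Streams cross-account as a source

You can grant cross-account access to Amazon Kinesis Data Streams so that an OpenSearch Ingestion pipeline can use Kinesis data streams in another account as a source. Complete the following steps to enable cross-account access: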

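Configure cross-account access
  1. Set up a resource policy in the account that has the Kinesis stream

    Replace the placeholder values with your own information.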

    JSON
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "StreamReadStatementID",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
          },
          "Action": [
            "kinesis:DescribeStreamSummary",
            "kinesis:GetRecords",
            "kinesis:GetShardIterator",
            "kinesis:ListShards"
          ],
          "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
        },
        {
          "Sid": "StreamEFOReadStatementID",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
          },
          "Action": [
            "kinesis:DescribeStreamSummary",
            "kinesis:ListShards"
          ],
          "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
        }
      ]
    }
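  2. (Optional) Set up a consumer and a consumer resource policy

    This step is optional and is required only if you plan to use the enhanced fan-out consumer strategy to read stream records. For more information, see Develop enhanced fan-out consumers with dedicated throughput.

    1. Set up a consumer

      To reuse an existing consumer, you can skip this step. For more information, see RegisterStreamConsumer in the Amazon Kinesis Data Streams API Reference.

      In the following example CLI command, replace the placeholder values with your own information.

      Example CLI command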
      aws kinesis register-stream-consumer \
        --stream-arn "arn:aws:kinesis:region:account-id:stream/stream-name" \
        --consumer-name consumer-name
    2. Set up the consumer resource policy

      In the following statement, replace the placeholder values with your own information.

      JSON
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "ConsumerEFOReadStatementID",
            "Effect": "Allow",
            "Principal": {
              "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
            },
            "Action": [
              "kinesis:DescribeStreamConsumer",
              "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
          }
        ]
      }
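  3. Pipeline configuration

    For cross-account ingestion, add the following attributes under kinesis_data_streams for each stream:

    • stream_arn - The ARN of the stream in the account where the stream exists

    • consumer_arn - An optional attribute that you must specify if you use the default enhanced fan-out consumer strategy. Set this field to the actual consumer ARN.

    Replace the placeholder values with your own information.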

    YAML
    version: "2"
    kinesis-pipeline:
      source:
        kinesis_data_streams:
          acknowledgments: true
          codec:
            newline:
          streams:
            - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
              consumer_arn: "consumer arn"
              # Enable this if ingestion should start from the start of the stream.
              # initial_position: "EARLIEST"
              # checkpoint_interval: "PT5M"
            - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
              consumer_arn: "consumer arn"
              # initial_position: "EARLIEST"
          # buffer_timeout: "1s"
          # records_to_accumulate: 100
          # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
          # consumer_strategy: "polling"
          # If consumer strategy is set to "polling", enable the polling config below.
          # polling:
          #   max_polling_records: 100
          #   idle_time_between_reads: "250ms"
          aws:
            # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
            # Provide the AWS Region of the data stream.
            region: "us-east-1"
      sink:
        - opensearch:
            # Provide an Amazon OpenSearch Service domain endpoint
            hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
            index: "index_${getMetadata(\"stream_name\")}"
            # Map the document ID from the partition key, sequence number, and sub-sequence number metadata attributes
            document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
            aws:
              # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
              # Provide the AWS Region of the domain.
              region: "us-east-1"
              # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
              serverless: false
              # serverless_options:
              #   Specify a name here to create or update network policy for the serverless collection
              #   network_policy_name: network-policy-name
            # Enable the 'distribution_version' setting if the OpenSearch Service domain is of version Elasticsearch 6.x
            # distribution_version: "es6"
            # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
            # enable_request_compression: true/false
            # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
            dlq:
              s3:
                # Provide an Amazon S3 bucket
                bucket: "your-dlq-bucket-name"
                # Provide a key path prefix for the failed requests
                # key_path_prefix: "kinesis-pipeline/logs/dlq"
                # Provide the AWS Region of the bucket.
                region: "us-east-1"
                # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
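  4. OSI pipeline role for Kinesis Data Streams
    1. IAM policy

      Add the following policy to the pipeline role. Replace the placeholder values with your own information.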

      JSON
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "kinesis:DescribeStreamConsumer",
              "kinesis:SubscribeToShard"
            ],
            "Resource": [
              "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
            ]
          },
          {
            "Sid": "allowReadFromStream",
            "Effect": "Allow",
            "Action": [
              "kinesis:DescribeStream",
              "kinesis:DescribeStreamSummary",
              "kinesis:GetRecords",
              "kinesis:GetShardIterator",
              "kinesis:ListShards",
              "kinesis:ListStreams",
              "kinesis:ListStreamConsumers",
              "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
              "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
            ]
          }
        ]
      }
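    2. Trust policy

      To ingest data from the stream account, you need to establish a trust relationship between the pipeline ingestion role and the stream account. Add the following to the pipeline role. Replace the placeholder values with your own information.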

      JSON
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": {
              "AWS": "arn:aws:iam::111122223333:root"
            },
            "Action": "sts:AssumeRole"
          }
        ]
      }
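If you script the consumer setup from step 2 with the AWS SDK for Python (boto3), you can implement the "reuse an existing consumer" logic as shown in the following sketch. It assumes a boto3 Kinesis client with credentials for the stream account and uses the ListStreamConsumers and RegisterStreamConsumer APIs. The helper names are hypothetical.

```python
from typing import Optional


def find_consumer_arn(kinesis_client, stream_arn: str, consumer_name: str) -> Optional[str]:
    """Return the ARN of an already registered consumer with this name, or None."""
    kwargs = {"StreamARN": stream_arn}
    while True:
        page = kinesis_client.list_stream_consumers(**kwargs)
        for consumer in page.get("Consumers", []):
            if consumer["ConsumerName"] == consumer_name:
                return consumer["ConsumerARN"]
        token = page.get("NextToken")
        if not token:
            return None
        kwargs["NextToken"] = token  # fetch the next page of consumers


def get_or_register_consumer(kinesis_client, stream_arn: str, consumer_name: str) -> str:
    """Reuse an existing enhanced fan-out consumer, or register a new one."""
    existing = find_consumer_arn(kinesis_client, stream_arn, consumer_name)
    if existing is not None:
        return existing
    response = kinesis_client.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name
    )
    # A newly registered consumer starts in CREATING status.
    return response["Consumer"]["ConsumerARN"]
```

A newly registered consumer starts in CREATING status, so wait until DescribeStreamConsumer reports it as ACTIVE before you use its ARN as consumer_arn.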
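You can check the per-stream rules from step 3 before you create the pipeline: stream_arn is always required, and consumer_arn is required unless consumer_strategy is "polling". The following Python sketch works on the kinesis_data_streams section after it has been parsed into a dictionary (for example, with the third-party PyYAML package). The function name and the ARN pattern are assumptions, not part of OpenSearch Ingestion.

```python
import re
from typing import Dict, List

# arn:<partition>:kinesis:<region>:<12-digit account>:stream/<stream name>
_STREAM_ARN = re.compile(r"^arn:aws[\w-]*:kinesis:[a-z0-9-]+:\d{12}:stream/[A-Za-z0-9_.-]+$")


def check_cross_account_streams(source: Dict) -> List[str]:
    """Return problems found in a parsed kinesis_data_streams source section."""
    problems = []
    # Enhanced fan-out is the default; only "polling" makes consumer_arn unnecessary.
    fan_out = source.get("consumer_strategy") != "polling"
    for i, stream in enumerate(source.get("streams") or []):
        arn = stream.get("stream_arn")
        if not arn:
            problems.append(f"streams[{i}]: stream_arn is required for cross-account ingestion")
        elif not _STREAM_ARN.match(arn):
            problems.append(f"streams[{i}]: {arn!r} is not a Kinesis stream ARN")
        if fan_out and not stream.get("consumer_arn"):
            problems.append(f"streams[{i}]: consumer_arn is required with the enhanced fan-out strategy")
    return problems
```

Placeholders that have not been replaced, such as stream-account-id, fail the ARN check. This makes the function a quick guard against deploying the template unchanged.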