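Using an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams
Use an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams to ingest stream records from multiple streams into Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates the streaming ingestion infrastructure to provide a high-scale, low-latency way to continuously ingest stream records from Kinesis.
Amazon Kinesis Data Streams as a source
The following procedure shows how to set up an OpenSearch Ingestion pipeline that uses Amazon Kinesis Data Streams as the data source. This section covers the necessary prerequisites, such as creating an OpenSearch Service domain or an OpenSearch Serverless collection, and walks through the steps to configure the pipeline role and create the pipeline.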
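Prerequisites
To set up your pipeline, you need one or more active Kinesis data streams. Each stream must either be receiving records already or be ready to receive records from other sources. For more information, see Overview of OpenSearch Ingestion.
To set up your pipeline
-
Create an OpenSearch Service domain or OpenSearch Serverless collection
To create a domain or a collection, see Getting started with OpenSearch Ingestion.
To create an IAM role with the correct permissions to write data to the collection or domain, see Resource-based policies.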
-
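Configure the pipeline role with permissions
Set up the pipeline role that you want to use in your pipeline configuration, and add to it the permissions required to read from your streams. Replace the placeholder values in the policy with your own information.
If server-side encryption is enabled on the streams, the role also needs an AWS KMS policy that allows it to decrypt the records. Replace the placeholder values in that policy with your own information.
For a pipeline to write data to a domain, the domain must have a domain-level access policy that allows the sts_role_arn pipeline role to access it.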
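The pipeline role permissions described above can be sketched with a small Python helper that assembles the two policy documents. This is a minimal sketch rather than the official policy text: the function names are hypothetical, and the list of Kinesis actions is an assumption about what a stream reader typically needs, so compare it with the current AWS documentation before using it.

```python
import json


def kinesis_read_policy(stream_arns):
    """Build an identity policy that lets the pipeline role read the given streams.

    Assumption: the action list is illustrative and may not match the exact
    set that OpenSearch Ingestion requires.
    """
    resources = []
    for arn in stream_arns:
        resources.append(arn)
        # Enhanced fan-out actions such as SubscribeToShard target consumer
        # resources, so the sketch also covers consumers under each stream.
        resources.append(f"{arn}/consumer/*")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowReadFromStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:ListStreamConsumers",
                    "kinesis:RegisterStreamConsumer",
                    "kinesis:SubscribeToShard",
                ],
                "Resource": resources,
            }
        ],
    }


def kms_decrypt_policy(key_arn):
    """Build the extra policy needed when server-side encryption is enabled."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowDecryptionOfStreamRecords",
                "Effect": "Allow",
                "Action": ["kms:Decrypt"],
                "Resource": key_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder ARN; replace with your own stream.
    stream = "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
    print(json.dumps(kinesis_read_policy([stream]), indent=2))
```

Generating the documents from code keeps the stream list in one place when a pipeline reads from several streams.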
The following example is a domain access policy that allows the pipeline role created in the previous step (pipeline-role) to write data to the ingestion-domain domain. Replace the placeholder values with your own information.
{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
      },
      "Action": ["es:DescribeDomain", "es:ESHttp*"],
      "Resource": "arn:aws:es:region:account-id:domain/domain-name/*"
    }
  ]
}
-
Create the pipeline
Configure an OpenSearch Ingestion pipeline that specifies Kinesis Data Streams as the source. You can find a ready-made blueprint for creating such a pipeline in the OpenSearch Ingestion console. (Optional) To create the pipeline using the AWS CLI, you can use the blueprint named "AWS-KinesisDataStreamsPipeline". Replace the placeholder values with your own information.
version: "2"
kinesis-pipeline:
  source:
    kinesis_data_streams:
      acknowledgments: true
      codec:
        # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
        # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
        # json:
          # key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # include_keys: [ 'owner', 'logGroup', 'logStream' ]
        newline:
      streams:
        - stream_name: "stream name"
          # Enable this if ingestion should start from the start of the stream.
          # initial_position: "EARLIEST"
          # checkpoint_interval: "PT5M"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          # compression: "gzip"
        - stream_name: "stream name"
          # Enable this if ingestion should start from the start of the stream.
          # initial_position: "EARLIEST"
          # checkpoint_interval: "PT5M"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          # compression: "gzip"
      # buffer_timeout: "1s"
      # records_to_accumulate: 100
      # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
      # consumer_strategy: "polling"
      # if consumer strategy is set to "polling", enable the polling config below.
      # polling:
        # max_polling_records: 100
        # idle_time_between_reads: "250ms"
      aws:
        # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
        sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
        # Provide the AWS Region of the Data Stream.
        region: "us-east-1"
  sink:
    - opensearch:
        # Provide an Amazon OpenSearch Service domain endpoint
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        index: "index_${getMetadata(\"stream_name\")}"
        # Ensure adding unique document id as a combination of the metadata attributes available.
        document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
          # Provide the AWS Region of the domain.
          region: "us-east-1"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection
            # network_policy_name: "network-policy-name"
        # Enable the 'distribution_version' setting if the OpenSearch Service domain is of version Elasticsearch 6.x
        # distribution_version: "es6"
        # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
        # enable_request_compression: true/false
        # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
        dlq:
          s3:
            # Provide an S3 bucket
            bucket: "your-dlq-bucket-name"
            # Provide a key path prefix for the failed requests
            # key_path_prefix: "kinesis-pipeline/logs/dlq"
            # Provide the region of the bucket.
            region: "us-east-1"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
Configuration options
For Kinesis configuration options, see Configuration options in the OpenSearch documentation.
Available metadata attributes
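-
stream_name: the name of the Kinesis data stream from which the record was ingested
-
partition_key: the partition key of the Kinesis data stream record being ingested
-
sequence_number: the sequence number of the Kinesis data stream record being ingested
-
sub_sequence_number: the subsequence number of the Kinesis data stream record being ingested
-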
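(Optional) Configure recommended compute units (OCUs) for the Kinesis Data Streams pipeline
An OpenSearch Ingestion pipeline with a Kinesis Data Streams source can also be configured to ingest stream records from more than one stream. For faster ingestion, we recommend that you add one additional compute unit for each new stream that you add.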
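One way to apply the sizing guidance above when creating the pipeline from the AWS CLI is sketched below. The helper names are hypothetical, and the formula (one base OCU plus one per additional stream) is only one reading of the recommendation, not an official rule. The sketch builds the argument list for `aws osis create-pipeline` and does not call AWS.

```python
import shlex


def recommended_min_ocus(stream_count, base_ocus=1):
    """Return base_ocus plus one extra OCU for each stream after the first.

    Assumption: base_ocus=1 and this formula are an interpretation of the
    "one additional compute unit per new stream" guidance.
    """
    if stream_count < 1:
        raise ValueError("a pipeline needs at least one stream")
    return base_ocus + (stream_count - 1)


def create_pipeline_command(name, config_path, stream_count, max_ocus=None):
    """Return the argv for `aws osis create-pipeline`, sized for the streams."""
    min_ocus = recommended_min_ocus(stream_count)
    if max_ocus is None:
        max_ocus = min_ocus
    if max_ocus < min_ocus:
        raise ValueError("max_ocus must be at least the recommended minimum")
    return [
        "aws", "osis", "create-pipeline",
        "--pipeline-name", name,
        "--min-units", str(min_ocus),
        "--max-units", str(max_ocus),
        # The pipeline YAML is read from a local file (hypothetical path).
        "--pipeline-configuration-body", f"file://{config_path}",
    ]


if __name__ == "__main__":
    argv = create_pipeline_command(
        "kinesis-pipeline", "pipeline.yaml", stream_count=3, max_ocus=4
    )
    print(shlex.join(argv))
```

Returning an argument list instead of a shell string lets the caller pass it directly to `subprocess.run` without quoting concerns.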
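Data consistency
OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When the pipeline reads stream records from Kinesis, it dynamically distributes the work of reading stream records based on the shards associated with the streams. The pipeline automatically checkpoints the streams after it receives an acknowledgement that all records have been ingested into the OpenSearch domain or collection. This avoids duplicate processing of stream records.
To create the index based on the stream name, define the index in the opensearch sink section as "index_${getMetadata(\"stream_name\")}".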
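To illustrate how the index and document_id expressions from the pipeline configuration resolve for a single record, the following Python sketch substitutes metadata values into the templates. It is not the pipeline's actual expression evaluator; the `resolve` function and the sample metadata values are hypothetical and exist only to show the resulting strings.

```python
import re

# Matches ${getMetadata("key")}, with or without the backslash-escaped quotes
# used when the expression sits inside a double-quoted YAML string.
_GET_METADATA = re.compile(r'\$\{getMetadata\(\\?"([^"\\]+)\\?"\)\}')


def resolve(template, metadata):
    """Replace each getMetadata(...) call with the record's metadata value."""
    return _GET_METADATA.sub(lambda m: str(metadata[m.group(1)]), template)


if __name__ == "__main__":
    # Hypothetical metadata for one Kinesis record.
    record_metadata = {
        "stream_name": "orders",
        "partition_key": "pk-1",
        "sequence_number": "4959",
        "sub_sequence_number": 0,
    }
    index = resolve('index_${getMetadata("stream_name")}', record_metadata)
    document_id = resolve(
        '${getMetadata("partition_key")}_${getMetadata("sequence_number")}'
        '_${getMetadata("sub_sequence_number")}',
        record_metadata,
    )
    print(index)        # index_orders
    print(document_id)  # pk-1_4959_0
```

Because the document ID is derived only from the record's own metadata, the same record should map to the same ID if it is delivered again, so a repeated write would be expected to replace the existing document rather than add a second one.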
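Amazon Kinesis Data Streams cross-account as a source
You can grant access across accounts with Amazon Kinesis Data Streams so that OpenSearch Ingestion pipelines can use Kinesis data streams in another account as a source. Complete the following steps to enable cross-account access:
Configure cross-account access
-
Set up a resource policy in the account that owns the Kinesis stream
Replace the placeholder values with your own information.
-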
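(Optional) Set up a consumer and a consumer resource policy
This step is optional and is required only if you plan to use the enhanced fan-out consumer strategy to read stream records. For more information, see Develop enhanced fan-out consumers with dedicated throughput.
-
Set up the consumer
To reuse an existing consumer, you can skip this step. For more information, see RegisterStreamConsumer in the Amazon Kinesis Data Streams API Reference.
In the following example CLI command, replace the placeholder values with your own information.
Example CLI command: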
aws kinesis register-stream-consumer \
  --stream-arn "arn:aws:kinesis:region:account-id:stream/stream-name" \
  --consumer-name consumer-name
-
Set up the consumer resource policy
In the following statement, replace the placeholder values with your own information.
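The resource policies in this procedure (one on the stream, one on the consumer) can be sketched in Python as follows. This is an illustration under stated assumptions, not the official policy text: the function names, the Sid values, the action lists, and the sample account ID 444455556666 are all hypothetical. The sketch also shows how the consumer ARN could be read from the JSON that `register-stream-consumer` prints.

```python
import json


def consumer_arn_from_register_output(cli_output):
    """Extract the consumer ARN from `register-stream-consumer` JSON output."""
    return json.loads(cli_output)["Consumer"]["ConsumerARN"]


def resource_policy(sid, pipeline_role_arn, actions, resource_arn):
    """Build a single-statement resource policy that trusts the pipeline role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": sid,
                "Effect": "Allow",
                "Principal": {"AWS": pipeline_role_arn},
                "Action": list(actions),
                "Resource": resource_arn,
            }
        ],
    }


def stream_resource_policy(pipeline_role_arn, stream_arn):
    # Assumption: read actions that a stream reader typically needs.
    actions = [
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
    ]
    return resource_policy("StreamReadAccess", pipeline_role_arn, actions, stream_arn)


def consumer_resource_policy(pipeline_role_arn, consumer_arn):
    # Assumption: actions used with an enhanced fan-out consumer.
    actions = ["kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard"]
    return resource_policy("ConsumerReadAccess", pipeline_role_arn, actions, consumer_arn)


if __name__ == "__main__":
    role = "arn:aws:iam::111122223333:role/Example-Role"
    # Illustrative output shape; real output contains additional fields.
    sample_output = json.dumps({
        "Consumer": {
            "ConsumerName": "consumer-name",
            "ConsumerARN": "arn:aws:kinesis:us-east-1:444455556666:"
                           "stream/stream-name/consumer/consumer-name:1525898737",
            "ConsumerStatus": "CREATING",
        }
    })
    consumer_arn = consumer_arn_from_register_output(sample_output)
    print(json.dumps(consumer_resource_policy(role, consumer_arn), indent=2))
```

The generated JSON would then be attached to the stream or consumer in the account that owns the stream, for example with the `aws kinesis put-resource-policy` command.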
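-
Pipeline configuration
For cross-account ingestion, add the following attributes under kinesis_data_streams for each stream:
-
stream_arn: the ARN of the stream, which belongs to the account where the stream exists
-
consumer_arn: an optional attribute that must be specified if the default enhanced fan-out consumer strategy is chosen. Specify the actual consumer ARN for this field.
Replace the placeholder values with your own information.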
version: "2"
kinesis-pipeline:
  source:
    kinesis_data_streams:
      acknowledgments: true
      codec:
        newline:
      streams:
        - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
          consumer_arn: "consumer arn"
          # Enable this if ingestion should start from the start of the stream.
          # initial_position: "EARLIEST"
          # checkpoint_interval: "PT5M"
        - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
          consumer_arn: "consumer arn"
          # initial_position: "EARLIEST"
      # buffer_timeout: "1s"
      # records_to_accumulate: 100
      # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
      # consumer_strategy: "polling"
      # if consumer strategy is set to "polling", enable the polling config below.
      # polling:
        # max_polling_records: 100
        # idle_time_between_reads: "250ms"
      aws:
        # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
        sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
        # Provide the AWS Region of the Data Stream.
        region: "us-east-1"
  sink:
    - opensearch:
        # Provide an OpenSearch Service domain endpoint
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        index: "index_${getMetadata(\"stream_name\")}"
        # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
        document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
          # Provide the AWS Region of the domain.
          region: "us-east-1"
          # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
          serverless: false
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection
            # network_policy_name: "network-policy-name"
        # Enable the 'distribution_version' setting if the OpenSearch Service domain is of version Elasticsearch 6.x
        # distribution_version: "es6"
        # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
        # enable_request_compression: true/false
        # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
        dlq:
          s3:
            # Provide an Amazon S3 bucket
            bucket: "your-dlq-bucket-name"
            # Provide a key path prefix for the failed requests
            # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
            # Provide the AWS Region of the bucket.
            region: "us-east-1"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
-
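OpenSearch Ingestion (OSI) pipeline role for Kinesis Data Streams
-
IAM policy
Add the following policy to the pipeline role. Replace the placeholder values with your own information.
-
Trust policy
To ingest data from the stream account, you need to establish a trust relationship between the pipeline ingestion role and the stream account. Add the following to the pipeline role. Replace the placeholder values with your own information.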
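As a rough illustration of the trust relationship described above, the following Python sketch builds a trust policy document for the pipeline role. The first statement reflects the osis-pipelines.amazonaws.com trust relationship mentioned in the pipeline configuration comments. The second statement, which trusts the root principal of the stream account, is an assumption about how the cross-account trust might be expressed. The function name is hypothetical.

```python
import json
import re


def pipeline_trust_policy(stream_account_id):
    """Build a trust policy for the pipeline role (illustrative only)."""
    if not re.fullmatch(r"\d{12}", stream_account_id):
        raise ValueError("AWS account IDs are 12 digits")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the OpenSearch Ingestion service assume the role.
                "Effect": "Allow",
                "Principal": {"Service": "osis-pipelines.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {
                # Assumption: trust the account that owns the stream.
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{stream_account_id}:root"},
                "Action": "sts:AssumeRole",
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical stream account ID; replace with your own.
    print(json.dumps(pipeline_trust_policy("444455556666"), indent=2))
```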
-