将 OpenSearch 摄取管道与 Amazon Kinesis Data Streams 配合使用 - 亚马逊 OpenSearch 服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch 摄取管道与 Amazon Kinesis Data Streams 配合使用

使用 OpenSearch 采集管道和 Amazon Kinesis Data Streams,将来自多个流的流记录数据提取到亚马逊 OpenSearch 服务域和集合中。 OpenSearch Ingestion 管道整合了流媒体摄取基础架构,为持续从 Kinesis 摄取直播记录提供了一种高规模、低延迟的方式。

亚马逊 Kinesis Data Streams 作为来源

通过以下过程,您将学习如何设置使用 Amazon K OpenSearch inesis Data Streams 作为数据源的摄取管道。本节介绍必要的先决条件,例如创建 OpenSearch 服务域或 OpenSearch 无服务器集合,并逐步完成配置管道角色和创建管道的步骤。

先决条件

要设置管道,你需要一个或多个活跃的 Kinesis Data Streams。这些流必须要么正在接收记录,要么准备好接收来自其他来源的记录。有关更多信息,请参阅 OpenSearch 摄取概述

设置您的管道
  1. 创建 OpenSearch 服务域或 OpenSearch 无服务器集合

    要创建域名或集合,请参阅 OpenSearch Ingestion 入门

    要创建具有正确权限的 IAM 角色来访问向集合或域写入数据,请参阅基于资源的策略

  2. 为管道角色配置权限

    设置要在工作流配置中使用的管道角色,并向其添加以下权限。将 placeholder values 替换为您自己的信息。

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamConsumer", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name" ] } ] }

    如果在直播中启用了服务器端加密,则以下 AWS KMS 策略允许对记录进行解密。将 placeholder values 替换为您自己的信息。

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowDecryptionOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id" } ] }

    为使管道能够将数据写入域,域必须具有域级访问策略,以允许 sts_role_arn 管道角色访问域。

    以下示例是一个域访问策略,它允许在上一步中创建的管道角色 (pipeline-role) 向ingestion-domain域写入数据。将 placeholder values 替换为您自己的信息。

    { "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:AWS 区域:account-id:domain/domain-name/*" } ] }
  3. 创建管道

    配置一个将 K 指定inesis-data-streams为来源的 OpenSearch 摄取管道。您可以在 OpenSearch Ingestion 控制台上找到用于创建此类管道的现成蓝图。(可选)要使用创建管道 AWS CLI,您可以使用名为 “AWS-KinesisDataStreamsPipeline” 的蓝图。将 placeholder values 替换为您自己的信息。

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 区域 of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 区域 of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
    配置选项

    有关 Kinesis 配置选项,请参阅文档中的OpenSearch配置选项

    可用的元数据属性
    • stream_name — 从中提取记录的 Kinesis Data Streams 的名称

    • p@@ artition_key — 正在提取的 Kinesis Data Streams 记录的分区键

    • s@@ equence_n umber — 正在摄取的 Kinesis Data Streams 记录的序列号

    • sub_sequence_number — 正在摄取的 Kinesis Data Streams 记录的子序列号

  4. (可选)为 Kinesis Data Streams 管道配置推荐的计算单位 (OCUs)

    也可以将 OpenSearch Kinesis Data Streams 源管道配置为从多个流中提取直播记录。为了加快摄取速度,我们建议您为每个新添加的流添加额外的计算单元。

数据一致性

OpenSearch Ingestion 支持 end-to-end确认,以确保数据的持久性。当管道从 Kinesis 读取流记录时,它会根据与流关联的分片动态分配读取流记录的工作。Pipeline 在收录 OpenSearch 域或集合中的所有记录后收到确认后,将自动对流进行检查点。这将避免对直播记录进行重复处理。

要根据直播名称创建索引,请在 opensearch sink 部分中将索引定义为 “index_$ {getMetadata (\" stream_name\”)}”。

亚马逊 Kinesis Data Streams 跨账户作为来源

您可以使用 Amazon Kinesis Data Streams 跨账户授予访问权限 OpenSearch ,以便摄取管道可以访问另一个账户作为来源的 Kinesis Data Streams。完成以下步骤以启用跨账户访问权限:

配置跨账户访问
  1. 在拥有 Kinesis 直播的账户中设置资源策略

    placeholder values 替换为您自己的信息。

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "StreamReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name" }, { "Sid": "StreamEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name" } ] }
  2. (可选)设置消费者和消费者资源政策

    这是一个可选步骤,只有在您计划使用增强型 Fanout Consumer 策略读取直播记录时才需要执行此步骤。有关更多信息,请参阅开发具有专用吞吐量的增强型扇出消费者

    1. 设置消费者

      要重复使用现有消费者,可以跳过此步骤。有关更多信息,请参阅RegisterStreamConsumer亚马逊 Kinesis Data Streams API 参考》。

      在以下示例 CLI 命令中,placeholder values用您自己的信息替换。

      例 示例 CLI 命令:
      aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:AWS 区域:account-id:stream/stream-name" \ --consumer-name consumer-name
    2. 设置消费者资源政策

      在以下语句中,placeholder values用您自己的信息替换。

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Sid": "ConsumerEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name" } ] }
  3. 管道配置

    要进行跨账号提取,请在下方kinesis_data_streams为每个直播添加以下属性:

    • stream_arn-属于直播所在账号的直播的 arn

    • consumer_arn-这是一个可选属性,如果选择默认的增强型扇出消费者策略,则必须指定该属性。为此字段指定实际使用者 arn。将 placeholder values 替换为您自己的信息。

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 区域 of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 区域 of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: network-policy-name # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the AWS 区域 of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
  4. OSI 管道角色 Kinesis Data Streams
    1. IAM Policy

      将以下策略添加到管道角色。将 placeholder values 替换为您自己的信息。

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] } ] }
    2. 信任策略

      为了从直播账户提取数据,你需要在管道摄取角色和直播账户之间建立信任关系。将以下内容添加到管道角色中。将 placeholder values 替换为您自己的信息。

      JSON
      { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:root" }, "Action": "sts:AssumeRole" }] }