Amazon Kinesis Data Streams에서 OpenSearch Ingestion 파이프라인 사용 - Amazon OpenSearch Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon Kinesis Data Streams에서 OpenSearch Ingestion 파이프라인 사용

Amazon Kinesis Data Streams와 함께 OpenSearch Ingestion 파이프라인을 사용하여 여러 스트림의 스트림 레코드 데이터를 Amazon OpenSearch Service 도메인 및 컬렉션으로 수집합니다. OpenSearch Ingestion 파이프라인은 스트리밍 수집 인프라를 통합하여 Kinesis에서 스트림 레코드를 지속적으로 수집할 수 있는 대규모의 짧은 지연 시간을 제공합니다.

소스로서의 Amazon Kinesis Data Streams

다음 절차에서는 Amazon Kinesis Data Streams를 데이터 소스로 사용하는 OpenSearch Ingestion 파이프라인을 설정하는 방법을 알아봅니다. 이 섹션에서는 OpenSearch Service 도메인 또는 OpenSearch Serverless 컬렉션을 생성하고 파이프라인 역할을 구성하고 파이프라인을 생성하는 단계를 안내하는 등 필요한 사전 조건을 다룹니다.

사전 조건

파이프라인을 설정하려면 하나 이상의 활성 Kinesis Data Streams가 필요합니다. 이러한 스트림은 레코드를 수신하거나 다른 소스에서 레코드를 수신할 준비가 되어 있어야 합니다. 자세한 내용은 OpenSearch Ingestion 개요를 참조하세요.

파이프라인을 설정하려면
  1. OpenSearch Service 도메인 또는 OpenSearch Serverless 컬렉션 생성

    도메인 또는 컬렉션을 생성하려면 OpenSearch Ingestion 시작하기를 참조하세요.

    컬렉션 또는 도메인에 쓰기 데이터에 액세스할 수 있는 올바른 권한이 있는 IAM 역할을 생성하려면 리소스 기반 정책을 참조하세요.

  2. 권한을 사용하여 파이프라인 역할 구성

    파이프라인 구성에 사용할 파이프라인 역할을 설정하고 다음 권한을 추가합니다. 자리 표시자를 자신의 정보로 바꿉니다.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamConsumer", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name" ] } ] }

    스트림에서 서버 측 암호화가 활성화된 경우 다음 AWS KMS 정책은가 레코드를 해독하도록 허용합니다. 자리 표시자를 자신의 정보로 바꿉니다.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowDecryptionOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id" } ] }

    파이프라인이 도메인에 데이터를 쓰려면 도메인에 sts_role_arn 파이프라인 역할이 도메인에 액세스할 수 있도록 허용하는 도메인 수준 액세스 정책이 있어야 합니다.

    다음 예제는 이전 단계(pipeline-role)에서 생성된 파이프라인 역할이 도메인에 데이터를 쓸 수 있도록 허용하는 ingestion-domain 도메인 액세스 정책입니다. 자리 표시자를 자신의 정보로 바꿉니다.

    { "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:AWS 리전:account-id:domain/domain-name/*" } ] }
  3. 파이프라인 생성

    Kinesis-data-streams를 소스로 지정하여 OpenSearch Ingestion 파이프라인을 구성합니다. 이러한 파이프라인을 생성하기 위해 OpenSearch Ingestion 콘솔에서 사용 가능한 준비된 블루프린트를 찾을 수 있습니다. (선택 사항)를 사용하여 파이프라인을 생성하려면 "AWS-KinesisDataStreamsPipeline"라는 블루프린트를 사용할 AWS CLI수 있습니다. 자리 표시자를 자신의 정보로 바꿉니다.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 리전 of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 리전 of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
    구성 옵션

    Kinesis 구성 옵션은 OpenSearch 설명서의 구성 옵션을 참조하세요.

    사용 가능한 메타데이터 속성
    • stream_name - 레코드가 수집된 Kinesis Data Streams의 이름입니다.

    • partition_key - 수집 중인 Kinesis Data Streams 레코드의 파티션 키

    • sequence_number – 수집 중인 Kinesis Data Streams 레코드의 시퀀스 번호

    • sub_sequence_number – 수집 중인 Kinesis Data Streams 레코드의 하위 시퀀스 번호

  4. (선택 사항) Kinesis Data Streams 파이프라인에 대한 권장 컴퓨팅 유닛(OCUs) 구성

    OpenSearch Kinesis Data Streams 소스 파이프라인은 둘 이상의 스트림에서 스트림 레코드를 수집하도록 구성할 수도 있습니다. 수집 속도를 높이려면 추가된 새 스트림당 컴퓨팅 단위를 추가하는 것이 좋습니다.

데이터 일관성

OpenSearch Ingestion은 데이터 내구성을 보장하는 엔드 투 엔드 승인을 지원합니다. 파이프라인은 Kinesis에서 스트림 레코드를 읽을 때 스트림과 연결된 샤드를 기반으로 스트림 레코드를 읽는 작업을 동적으로 분산합니다. 파이프라인은 OpenSearch 도메인 또는 컬렉션의 모든 레코드를 수집한 후 승인을 받으면 스트림을 자동으로 체크포인트합니다. 이렇게 하면 스트림 레코드의 중복 처리가 방지됩니다.

스트림 이름을 기반으로 인덱스를 생성하려면 오픈서치 싱크 섹션의 인덱스를 "index_${getMetadata(\"stream_name\")}"로 정의합니다.

Amazon Kinesis Data Streams 교차 계정을 소스로 사용

OpenSearch Ingestion 파이프라인이 다른 계정의 Kinesis Data Streams에 소스로 액세스할 수 있도록 Amazon Kinesis Data Streams를 사용하여 계정 간에 액세스 권한을 부여할 수 있습니다. 교차 계정 액세스를 활성화하려면 다음 단계를 완료하세요.

교차 계정 액세스 구성
  1. Kinesis 스트림이 있는 계정에서 리소스 정책 설정

    자리 표시자를 자신의 정보로 바꿉니다.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "StreamReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name" }, { "Sid": "StreamEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name" } ] }
  2. (선택 사항) 소비자 및 소비자 리소스 정책 설정

    이는 선택적 단계이며 스트림 레코드 읽기에 향상된 팬아웃 소비자 전략을 사용할 계획인 경우에만 필요합니다. 자세한 내용은 전용 처리량으로 향상된 팬아웃 소비자 개발을 참조하세요.

    1. 소비자 설정

      기존 소비자를 재사용하려면이 단계를 건너뛸 수 있습니다. 자세한 내용은 Amazon Kinesis Data Streams API 참조의 RegisterStreamConsumer를 참조하세요. Amazon Kinesis

      다음 CLI 명령 예제에서 자리 표시자 값을 자신의 정보로 바꿉니다.

      예 CLI 명령 예제:
      aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:AWS 리전:account-id:stream/stream-name" \ --consumer-name consumer-name
    2. 소비자 리소스 정책 설정

      다음 문에서 자리 표시자 값을 자신의 정보로 바꿉니다.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Sid": "ConsumerEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name" } ] }
  3. 파이프라인 구성

    교차 계정 수집의 경우 각 스트림에 kinesis_data_streams 대해 아래에 다음 속성을 추가합니다.

    • stream_arn - 스트림이 있는 계정에 속한 스트림의 ARN

    • consumer_arn - 이는 선택적 속성이며 기본 향상된 팬아웃 소비자 전략을 선택한 경우 지정해야 합니다. 이 필드의 실제 소비자 ARN을 지정합니다. 자리 표시자를 자신의 정보로 바꿉니다.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 리전 of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the AWS 리전 of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: network-policy-name # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the AWS 리전 of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
  4. OSI 파이프라인 역할 Kinesis Data Streams
    1. IAM 정책

      파이프라인 역할에 다음 정책을 추가합니다. 자리 표시자를 자신의 정보로 바꿉니다.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] } ] }
    2. 신뢰 정책

      스트림 계정에서 데이터를 수집하려면 파이프라인 수집 역할과 스트림 계정 간에 신뢰 관계를 설정해야 합니다. 파이프라인 역할에 다음을 추가합니다. 자리 표시자를 자신의 정보로 바꿉니다.

      JSON
      { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:root" }, "Action": "sts:AssumeRole" }] }