Usa una pipeline OpenSearch di ingestione con Amazon Kinesis Data Streams - OpenSearch Servizio Amazon

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Usa una pipeline OpenSearch di ingestione con Amazon Kinesis Data Streams

Utilizza una pipeline di OpenSearch ingestione con Amazon Kinesis Data Streams per importare i dati dei record di flusso da più flussi in domini e raccolte di Amazon Service. OpenSearch La pipeline OpenSearch Ingestion incorpora l'infrastruttura di inserimento dello streaming per fornire un modo su larga scala e a bassa latenza per importare continuamente i record di streaming da Kinesis.

Amazon Kinesis Data Streams come fonte

Con la procedura seguente, imparerai come configurare una pipeline di OpenSearch ingestione che utilizza Amazon Kinesis Data Streams come origine dati. Questa sezione descrive i prerequisiti necessari, come la creazione di un dominio di OpenSearch servizio o una collezione OpenSearch Serverless, e illustra i passaggi per configurare il ruolo della pipeline e creare la pipeline.

Prerequisiti

Per configurare la pipeline, sono necessari uno o più Kinesis Data Streams attivi. Questi flussi devono ricevere record o essere pronti a ricevere record da altre fonti. Per ulteriori informazioni, vedere Panoramica sull' OpenSearch ingestione.

Per configurare la pipeline
  1. Crea un dominio OpenSearch di servizio o una raccolta OpenSearch Serverless

    Per creare un dominio o una raccolta, consulta Guida introduttiva a OpenSearch Ingestion.

    Per creare un ruolo IAM con le autorizzazioni corrette per accedere ai dati di scrittura nella raccolta o nel dominio, consulta Politiche basate sulle risorse.

  2. Configura il ruolo della pipeline con le autorizzazioni

    Imposta il ruolo della pipeline che desideri utilizzare nella configurazione della pipeline e aggiungi le seguenti autorizzazioni. Sostituire placeholder values con le proprie informazioni.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamConsumer", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name" ] } ] }

    Se la crittografia lato server è abilitata negli stream, la seguente AWS KMS politica consente di decrittografare i record. Sostituire placeholder values con le proprie informazioni.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowDecryptionOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id" } ] }

    Affinché una pipeline possa scrivere dati su un dominio, il dominio deve disporre di una politica di accesso a livello di dominio che consenta al ruolo della pipeline sts_role_arn di accedervi.

    L'esempio seguente è una politica di accesso al dominio che consente al ruolo pipeline creato nel passaggio precedente (), di scrivere dati nel dominio. pipeline-role ingestion-domain Sostituire placeholder values con le proprie informazioni.

    { "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Regione AWS:account-id:domain/domain-name/*" } ] }
  3. Crea la pipeline

    Configura una pipeline OpenSearch di ingestione specificando K come origine. inesis-data-streams È possibile individuare un blueprint già pronto e disponibile nella OpenSearch Ingestion Console per la creazione di tale pipeline. (Facoltativo) Per creare la pipeline utilizzando AWS CLI, è possibile utilizzare un blueprint denominato "». AWS-KinesisDataStreamsPipeline Sostituire placeholder values con le proprie informazioni.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Regione AWS of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Regione AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
    Opzioni di configurazione

    Per le opzioni di configurazione di Kinesis, consulta Opzioni di configurazione nella OpenSearchdocumentazione.

    Attributi dei metadati disponibili
    • stream_name — Nome del Kinesis Data Streams da cui è stato importato il record

    • partition_key — Chiave di partizione del record Kinesis Data Streams che viene importato

    • sequence_number — Numero di sequenza del record Kinesis Data Streams che viene importato

    • sub_sequence_number — Numero di sottosequenza del record Kinesis Data Streams che viene importato

  4. (Facoltativo) Configura le unità di calcolo consigliate (OCUs) per la pipeline Kinesis Data Streams

    Una pipeline di origine OpenSearch Kinesis Data Streams può anche essere configurata per importare record di stream da più di un flusso. Per un'acquisizione più rapida, ti consigliamo di aggiungere un'unità di calcolo aggiuntiva per ogni nuovo flusso aggiunto.

Coerenza dei dati

OpenSearch Ingestion supporta il end-to-end riconoscimento per garantire la durabilità dei dati. Quando la pipeline legge i record dei flussi da Kinesis, distribuisce dinamicamente il lavoro di lettura dei record dei flussi in base agli shard associati agli stream. Pipeline effettuerà automaticamente il checkpoint dei flussi quando riceve una conferma dopo aver acquisito tutti i record del dominio o della raccolta. OpenSearch Ciò eviterà l'elaborazione duplicata dei record di stream.

Per creare l'indice in base al nome dello stream, definisci l'indice nella sezione opensearch sink come «index_$ {getMetaData (\" stream_name\»)}».

Amazon Kinesis Data Streams su più account come fonte

Puoi concedere l'accesso a più account con Amazon Kinesis Data Streams in OpenSearch modo che le pipeline di ingestione possano accedere a Kinesis Data Streams in un altro account come origine. Completa i seguenti passaggi per abilitare l'accesso tra più account:

Configurare l'accesso tra più account
  1. Imposta la politica delle risorse nell'account che ha il flusso Kinesis

    Sostituire placeholder values con le proprie informazioni.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "StreamReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name" }, { "Sid": "StreamEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name" } ] }
  2. (Facoltativo) Imposta la politica relativa alle risorse destinate ai consumatori e ai consumatori

    Questo passaggio è facoltativo e sarà necessario solo se prevedi di utilizzare la strategia Enhanced Fanout Consumer per leggere i record degli stream. Per ulteriori informazioni, consulta Sviluppare utenti fan-out avanzati con throughput dedicato.

    1. Configura il consumatore

      Per riutilizzare un consumatore esistente, puoi saltare questo passaggio. Per ulteriori informazioni, consulta RegisterStreamConsumerAmazon Kinesis Data Streams API Reference.

      Nel seguente comando CLI di esempio, sostituisci il placeholder values con le tue informazioni.

      Esempio : Esempio di comando CLI
      aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:Regione AWS:account-id:stream/stream-name" \ --consumer-name consumer-name
    2. Imposta la politica delle risorse per i consumatori

      Nella seguente dichiarazione, sostituiscili placeholder values con le tue informazioni.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Sid": "ConsumerEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name" } ] }
  3. Configurazione della pipeline

    Per l'ingestione tra più account, aggiungi i seguenti attributi sotto kinesis_data_streams per ogni stream:

    • stream_arn- l'arn dello stream appartenente all'account in cui esiste lo stream

    • consumer_arn- questo è un attributo facoltativo e deve essere specificato se viene scelta la strategia di consumo fanout avanzata predefinita. Specificare l'effettivo valore del consumatore per questo campo. Sostituire placeholder values con le proprie informazioni.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Regione AWS of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Regione AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: network-policy-name # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the Regione AWS of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
  4. Ruolo della pipeline OSI Kinesis Data Streams
    1. Politica IAM

      Aggiungi la seguente politica al ruolo della pipeline. Sostituire placeholder values con le proprie informazioni.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] } ] }
    2. Policy di attendibilità

      Per importare i dati dall'account di streaming, è necessario stabilire una relazione di fiducia tra il ruolo di importazione della pipeline e l'account di streaming. Aggiungi quanto segue al ruolo pipeline. Sostituire placeholder values con le proprie informazioni.

      JSON
      { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:root" }, "Action": "sts:AssumeRole" }] }