Utilice una canalización OpenSearch de ingestión con Amazon Kinesis Data Streams - OpenSearch Servicio Amazon

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Utilice una canalización OpenSearch de ingestión con Amazon Kinesis Data Streams

Utilice una canalización de OpenSearch ingestión con Amazon Kinesis Data Streams para incorporar datos de registros de transmisiones de varias transmisiones a dominios y colecciones de OpenSearch Amazon Service. La canalización de OpenSearch ingestión incorpora la infraestructura de ingesta de streaming para proporcionar una forma a gran escala y baja latencia de ingerir continuamente los registros de streaming de Kinesis.

Amazon Kinesis Data Streams como fuente

Con el siguiente procedimiento, aprenderá a configurar una canalización de OpenSearch ingestión que utilice Amazon Kinesis Data Streams como fuente de datos. En esta sección se describen los requisitos previos necesarios, como la creación de un dominio de OpenSearch servicio o una colección OpenSearch sin servidor, y se explican los pasos para configurar la función de canalización y crear la canalización.

Requisitos previos

Para configurar la canalización, necesita uno o más Kinesis Data Streams activos. Estas transmisiones deben recibir registros o estar listas para recibir registros de otras fuentes. Para obtener más información, consulte Descripción general de la OpenSearch ingestión.

Para configurar tu canalización
  1. Crea un dominio OpenSearch de servicio o una colección OpenSearch sin servidor

    Para crear un dominio o una colección, consulta Cómo empezar a usar OpenSearch Ingestion.

    Para crear un rol de IAM con los permisos correctos para acceder a los datos de escritura en la colección o el dominio, consulta Políticas basadas en recursos.

  2. Configure el rol de canalización con permisos

    Configura el rol de canalización que quieres usar en tu configuración de canalización y añádele los siguientes permisos. Reemplace los placeholder values con su propia información.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamConsumer", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name" ] } ] }

    Si el cifrado del lado del servidor está habilitado en las transmisiones, la siguiente AWS KMS política permite descifrar los registros. Reemplace los placeholder values con su propia información.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowDecryptionOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id" } ] }

    Para que una canalización escriba datos en un dominio, el dominio debe tener una política de acceso a nivel de dominio que permita al rol de canalización sts_role_arn acceder a ellos.

    El siguiente ejemplo es una política de acceso al dominio que permite al rol de canalización creado en el paso anterior (pipeline-role) escribir datos en el dominio. ingestion-domain Reemplace los placeholder values con su propia información.

    { "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Región de AWS:account-id:domain/domain-name/*" } ] }
  3. Creación de la canalización

    Configure una canalización de OpenSearch ingestión especificando K inesis-data-streams como fuente. Puede encontrar un plano listo para usar en la consola de OpenSearch ingestión para crear dicha canalización. (Opcional) Para crear la canalización mediante el AWS CLI, puede utilizar un plano denominado "». AWS-KinesisDataStreamsPipeline Reemplace los placeholder values con su propia información.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Región de AWS of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Región de AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
    Opciones de configuración

    Para ver las opciones de configuración de Kinesis, consulte las opciones de configuración en la OpenSearchdocumentación.

    Atributos de metadatos disponibles
    • stream_name: nombre de las transmisiones de datos de Kinesis desde las que se ingirió el registro

    • partition_key: clave de partición del registro de Kinesis Data Streams que se está ingiriendo

    • sequence_number: número de secuencia del registro de Kinesis Data Streams que se está ingiriendo

    • sub_sequence_number: número de subsecuencia del registro de Kinesis Data Streams que se está ingiriendo

  4. (Opcional) Configure las unidades de cómputo recomendadas (OCUs) para la canalización de Kinesis Data Streams

    También se puede configurar una canalización de origen de OpenSearch Kinesis Data Streams para ingerir registros de transmisión de más de una transmisión. Para una ingesta más rápida, le recomendamos que añada una unidad de cómputo adicional por cada nueva transmisión que se añada.

Coherencia de datos

OpenSearch La ingestión permite el end-to-end reconocimiento para garantizar la durabilidad de los datos. Cuando la canalización lee los registros de transmisión de Kinesis, distribuye dinámicamente el trabajo de lectura de los registros de transmisión en función de los fragmentos asociados a las transmisiones. Pipeline comprobará automáticamente las transmisiones cuando reciba un acuse de recibo tras haber ingerido todos los registros del dominio o la colección. OpenSearch Esto evitará el procesamiento duplicado de los registros de transmisión.

Para crear el índice en función del nombre del flujo, defina el índice en la sección del receptor de opensearch como «index_$ {getMetadata (\" stream_name\»)}».

Cuenta cruzada de Amazon Kinesis Data Streams como fuente

Puede conceder acceso a todas las cuentas con Amazon Kinesis Data Streams para OpenSearch que las canalizaciones de ingestión puedan acceder a Kinesis Data Streams de otra cuenta como fuente. Complete los siguientes pasos para habilitar el acceso entre cuentas:

Configuración del acceso entre cuentas
  1. Establezca la política de recursos en la cuenta que tiene la transmisión de Kinesis

    Reemplace los placeholder values con su propia información.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "StreamReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name" }, { "Sid": "StreamEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name" } ] }
  2. (Opcional) Configure una política para consumidores y recursos para consumidores

    Este paso es opcional y solo será necesario si planeas utilizar la estrategia Enhanced Fanout Consumer para leer los registros de las transmisiones. Para obtener más información, consulta Cómo desarrollar consumidores con una distribución mejorada y un rendimiento específico.

    1. Configure el consumidor

      Para reutilizar un consumidor existente, puedes saltarte este paso. Para obtener más información, consulte la RegisterStreamConsumerreferencia de la API de Amazon Kinesis Data Streams.

      En el siguiente ejemplo de comando CLI, sustituya el placeholder values por su propia información.

      ejemplo Ejemplo de comando de la CLI:
      aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:Región de AWS:account-id:stream/stream-name" \ --consumer-name consumer-name
    2. Configure la política de recursos para el consumidor

      En la siguiente declaración, placeholder values sustitúyala por su propia información.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Sid": "ConsumerEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name" } ] }
  3. Configuración de canalización

    Para la ingesta entre cuentas, añada los siguientes atributos kinesis_data_streams para cada transmisión:

    • stream_arn- el arn de la transmisión que pertenece a la cuenta en la que existe la transmisión

    • consumer_arn- se trata de un atributo opcional y debe especificarse si se opta por la estrategia de consumo de fanout mejorada por defecto. Especifique el nombre real del consumidor para este campo. Reemplace los placeholder values con su propia información.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Región de AWS of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Región de AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: network-policy-name # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the Región de AWS of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
  4. Función de OSI Pipeline: Kinesis Data Streams
    1. Política de IAM

      Añada la siguiente política a la función de canalización. Reemplace los placeholder values con su propia información.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] } ] }
    2. Política de confianza

      Para poder ingerir datos de la cuenta de transmisión, tendrás que establecer una relación de confianza entre la función de ingesta de canalización y la cuenta de transmisión. Agrega lo siguiente a la función de canalización. Reemplace los placeholder values con su propia información.

      JSON
      { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:root" }, "Action": "sts:AssumeRole" }] }