Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams - OpenSearch Serviço Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams

Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams para ingerir dados de registros de stream de vários streams em domínios e coleções do Amazon Service. OpenSearch O pipeline OpenSearch de ingestão incorpora a infraestrutura de ingestão de streaming para fornecer uma forma de alta escala e baixa latência de ingerir continuamente registros de streaming do Kinesis.

Amazon Kinesis Data Streams como fonte

Com o procedimento a seguir, você aprenderá a configurar um pipeline de OpenSearch ingestão que usa o Amazon Kinesis Data Streams como fonte de dados. Esta seção aborda os pré-requisitos necessários, como criar um domínio de OpenSearch serviço ou uma coleção OpenSearch sem servidor e percorrer as etapas para configurar a função do pipeline e criar o pipeline.

Pré-requisitos

Para configurar seu pipeline, você precisa de um ou mais Kinesis Data Streams ativos. Esses fluxos devem estar recebendo registros ou prontos para receber registros de outras fontes. Para obter mais informações, consulte Visão geral da OpenSearch ingestão.

Para configurar seu funil
  1. Crie um domínio OpenSearch de serviço ou uma coleção OpenSearch sem servidor

    Para criar um domínio ou uma coleção, consulte Introdução à OpenSearch ingestão.

    Para criar uma função do IAM com as permissões corretas para acessar dados de gravação na coleção ou no domínio, consulte Políticas baseadas em recursos.

  2. Configure a função do pipeline com permissões

    Configure a função do pipeline que você deseja usar na configuração do pipeline e adicione as seguintes permissões a ela. Substitua os placeholder values por suas próprias informações.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamConsumer", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name" ] } ] }

    Se a criptografia do lado do servidor estiver ativada nos fluxos, a AWS KMS política a seguir permitirá descriptografar os registros. Substitua os placeholder values por suas próprias informações.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "allowDecryptionOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id" } ] }

    Para que um pipeline grave dados em um domínio, o domínio deve ter uma política de acesso em nível de domínio que permita que a função de pipeline sts_role_arn o acesse.

    O exemplo a seguir é uma política de acesso ao domínio que permite que a função de pipeline criada na etapa anterior (pipeline-role) grave dados no ingestion-domain domínio. Substitua os placeholder values por suas próprias informações.

    { "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Região da AWS:account-id:domain/domain-name/*" } ] }
  3. Criar o pipeline

    Configure um pipeline OpenSearch de ingestão especificando K inesis-data-streams como a origem. Você pode localizar um blueprint pronto disponível no Console OpenSearch de ingestão para criar esse pipeline. (Opcional) Para criar o pipeline usando o AWS CLI, você pode usar um blueprint chamado "AWS-KinesisDataStreamsPipeline”. Substitua os placeholder values por suas próprias informações.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
    Opções de configuração

    Para ver as opções de configuração do Kinesis, consulte Opções de configuração na OpenSearchdocumentação.

    Atributos de metadados disponíveis
    • stream_name — Nome do Kinesis Data Streams de onde o registro foi ingerido

    • partition_key — Chave de partição do registro do Kinesis Data Streams que está sendo ingerido

    • sequence_number — Número de sequência do registro do Kinesis Data Streams que está sendo ingerido

    • sub_sequence_number — Número da subsequência do registro do Kinesis Data Streams que está sendo ingerido

  4. (Opcional) Configure as unidades de computação recomendadas (OCUs) para o pipeline do Kinesis Data Streams

    Um pipeline de origem do OpenSearch Kinesis Data Streams também pode ser configurado para ingerir registros de stream de mais de um stream. Para uma ingestão mais rápida, recomendamos que você adicione uma unidade computacional adicional a cada novo stream adicionado.

Consistência de dados

OpenSearch A ingestão suporta o end-to-end reconhecimento para garantir a durabilidade dos dados. Quando o pipeline lê registros de stream do Kinesis, ele distribui dinamicamente o trabalho de leitura de registros de stream com base nos fragmentos associados aos streams. O Pipeline verificará automaticamente os fluxos quando receber uma confirmação após ingerir todos os registros no domínio ou na coleção. OpenSearch Isso evitará o processamento duplicado dos registros do stream.

Para criar o índice com base no nome do stream, defina-o na seção opensearch sink como “index_$ {getMetadata (\" stream_name\”)}”.

Conta cruzada do Amazon Kinesis Data Streams como fonte

Você pode conceder acesso a várias contas com o Amazon Kinesis Data Streams OpenSearch para que os pipelines de ingestão possam acessar o Kinesis Data Streams em outra conta como fonte. Conclua as etapas a seguir para ativar o acesso entre contas:

Configure o acesso entre contas
  1. Defina a política de recursos na conta que tem o stream do Kinesis

    Substitua os placeholder values por suas próprias informações.

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "StreamReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name" }, { "Sid": "StreamEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name" } ] }
  2. (Opcional) Configurar a Política de Recursos do Consumidor e do Consumidor

    Essa é uma etapa opcional e só será necessária se você planeja usar a estratégia Enhanced Fanout Consumer para ler registros de stream. Para obter mais informações, consulte Desenvolver consumidores avançados com taxa de transferência dedicada.

    1. Configurar consumidor

      Para reutilizar um consumidor existente, você pode pular essa etapa. Para obter mais informações, consulte a RegisterStreamConsumerReferência da API do Amazon Kinesis Data Streams.

      No exemplo de comando CLI a seguir, substitua o por suas placeholder values próprias informações.

      exemplo Exemplo de comando da CLI:
      aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:Região da AWS:account-id:stream/stream-name" \ --consumer-name consumer-name
    2. Configurar a política de recursos do consumidor

      Na declaração a seguir, placeholder values substitua o por suas próprias informações.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Sid": "ConsumerEFOReadStatementID", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role" }, "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name" } ] }
  3. Configuração do pipeline

    Para ingestão entre contas, adicione os seguintes atributos abaixo kinesis_data_streams para cada stream:

    • stream_arn- o arn do fluxo pertencente à conta em que o fluxo existe

    • consumer_arn- esse é um atributo opcional e deve ser especificado se a estratégia padrão do consumidor de fanout aprimorada for escolhida. Especifique o valor real do consumidor para esse campo. Substitua os placeholder values por suas próprias informações.

    version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: network-policy-name # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the Região da AWS of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
  4. Função do pipeline OSI | Kinesis Data Streams
    1. Política do IAM

      Adicione a política a seguir à função do pipeline. Substitua os placeholder values por suas próprias informações.

      JSON
      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:DescribeStreamSummary", "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards", "kinesis:ListStreams", "kinesis:ListStreamConsumers", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream" ] } ] }
    2. Política de confiança

      Para ingerir dados da conta do stream, você precisará estabelecer uma relação de confiança entre a função de ingestão do pipeline e a conta do stream. Adicione o seguinte à função do pipeline. Substitua os placeholder values por suas próprias informações.

      JSON
      { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:root" }, "Action": "sts:AssumeRole" }] }