As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams
Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams para ingerir dados de registros de stream de vários streams em domínios e coleções do Amazon Service. OpenSearch O pipeline OpenSearch de ingestão incorpora a infraestrutura de ingestão de streaming para fornecer uma forma de alta escala e baixa latência de ingerir continuamente registros de streaming do Kinesis.
Tópicos
Amazon Kinesis Data Streams como fonte
Com o procedimento a seguir, você aprenderá a configurar um pipeline de OpenSearch ingestão que usa o Amazon Kinesis Data Streams como fonte de dados. Esta seção aborda os pré-requisitos necessários, como criar um domínio de OpenSearch serviço ou uma coleção OpenSearch sem servidor e percorrer as etapas para configurar a função do pipeline e criar o pipeline.
Pré-requisitos
Para configurar seu pipeline, você precisa de um ou mais Kinesis Data Streams ativos. Esses fluxos devem estar recebendo registros ou prontos para receber registros de outras fontes. Para obter mais informações, consulte Visão geral da OpenSearch ingestão.
Para configurar seu funil
-
Crie um domínio OpenSearch de serviço ou uma coleção OpenSearch sem servidor
Para criar um domínio ou uma coleção, consulte Introdução à OpenSearch ingestão.
Para criar uma função do IAM com as permissões corretas para acessar dados de gravação na coleção ou no domínio, consulte Políticas baseadas em recursos.
-
Configure a função do pipeline com permissões
Configure a função do pipeline que você deseja usar na configuração do pipeline e adicione as seguintes permissões a ela. Substitua os
placeholder values
por suas próprias informações.Se a criptografia do lado do servidor estiver ativada nos fluxos, a AWS KMS política a seguir permitirá descriptografar os registros. Substitua os
placeholder values
por suas próprias informações.Para que um pipeline grave dados em um domínio, o domínio deve ter uma política de acesso em nível de domínio que permita que a função de pipeline sts_role_arn o acesse.
O exemplo a seguir é uma política de acesso ao domínio que permite que a função de pipeline criada na etapa anterior (
pipeline-role
) grave dados noingestion-domain
domínio. Substitua osplaceholder values
por suas próprias informações.{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
your-account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Região da AWS
:account-id
:domain/domain-name
/*" } ] } -
Criar o pipeline
Configure um pipeline OpenSearch de ingestão especificando K inesis-data-streams como a origem. Você pode localizar um blueprint pronto disponível no Console OpenSearch de ingestão para criar esse pipeline. (Opcional) Para criar o pipeline usando o AWS CLI, você pode usar um blueprint chamado "
AWS-KinesisDataStreamsPipeline
”. Substitua osplaceholder values
por suas próprias informações.version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "
stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Região da AWS of the Data Stream. region: "us-east-1
" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Região da AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
"Opções de configuração
Para ver as opções de configuração do Kinesis, consulte Opções de configuração
na OpenSearchdocumentação. Atributos de metadados disponíveis
-
stream_name — Nome do Kinesis Data Streams de onde o registro foi ingerido
-
partition_key — Chave de partição do registro do Kinesis Data Streams que está sendo ingerido
-
sequence_number — Número de sequência do registro do Kinesis Data Streams que está sendo ingerido
-
sub_sequence_number — Número da subsequência do registro do Kinesis Data Streams que está sendo ingerido
-
-
(Opcional) Configure as unidades de computação recomendadas (OCUs) para o pipeline do Kinesis Data Streams
Um pipeline de origem do OpenSearch Kinesis Data Streams também pode ser configurado para ingerir registros de stream de mais de um stream. Para uma ingestão mais rápida, recomendamos que você adicione uma unidade computacional adicional a cada novo stream adicionado.
Consistência de dados
OpenSearch A ingestão suporta o end-to-end reconhecimento para garantir a durabilidade dos dados. Quando o pipeline lê registros de stream do Kinesis, ele distribui dinamicamente o trabalho de leitura de registros de stream com base nos fragmentos associados aos streams. O Pipeline verificará automaticamente os fluxos quando receber uma confirmação após ingerir todos os registros no domínio ou na coleção. OpenSearch Isso evitará o processamento duplicado dos registros do stream.
Para criar o índice com base no nome do stream, defina-o na seção opensearch sink como “index_$ {getMetadata (\" stream_name\”)}”.
Conta cruzada do Amazon Kinesis Data Streams como fonte
Você pode conceder acesso a várias contas com o Amazon Kinesis Data Streams OpenSearch para que os pipelines de ingestão possam acessar o Kinesis Data Streams em outra conta como fonte. Conclua as etapas a seguir para ativar o acesso entre contas:
Configure o acesso entre contas
-
Defina a política de recursos na conta que tem o stream do Kinesis
Substitua os
placeholder values
por suas próprias informações. -
(Opcional) Configurar a Política de Recursos do Consumidor e do Consumidor
Essa é uma etapa opcional e só será necessária se você planeja usar a estratégia Enhanced Fanout Consumer para ler registros de stream. Para obter mais informações, consulte Desenvolver consumidores avançados com taxa de transferência dedicada.
-
Configurar consumidor
Para reutilizar um consumidor existente, você pode pular essa etapa. Para obter mais informações, consulte a RegisterStreamConsumerReferência da API do Amazon Kinesis Data Streams.
No exemplo de comando CLI a seguir, substitua o por suas
placeholder values
próprias informações.exemplo Exemplo de comando da CLI:
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:
Região da AWS
:account-id
:stream/stream-name
" \ --consumer-nameconsumer-name
-
Configurar a política de recursos do consumidor
Na declaração a seguir,
placeholder values
substitua o por suas próprias informações.
-
-
Configuração do pipeline
Para ingestão entre contas, adicione os seguintes atributos abaixo
kinesis_data_streams
para cada stream:-
stream_arn
- o arn do fluxo pertencente à conta em que o fluxo existe -
consumer_arn
- esse é um atributo opcional e deve ser especificado se a estratégia padrão do consumidor de fanout aprimorada for escolhida. Especifique o valor real do consumidor para esse campo. Substitua osplaceholder values
por suas próprias informações.
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:
region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333
:role/Example-Role
" # Provide the Região da AWS of the domain. region: "us-east-1
" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Região da AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name
# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq
" # Provide the Região da AWS of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" -
-
Função do pipeline OSI | Kinesis Data Streams
-
Política do IAM
Adicione a política a seguir à função do pipeline. Substitua os
placeholder values
por suas próprias informações. -
Política de confiança
Para ingerir dados da conta do stream, você precisará estabelecer uma relação de confiança entre a função de ingestão do pipeline e a conta do stream. Adicione o seguinte à função do pipeline. Substitua os
placeholder values
por suas próprias informações.
-