Visão geral dos recursos do pipeline no Amazon OpenSearch Ingestion - OpenSearch Serviço Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Visão geral dos recursos do pipeline no Amazon OpenSearch Ingestion

O Amazon OpenSearch Ingestion provisiona pipelines, que consistem em uma fonte, um buffer, zero ou mais processadores e um ou mais coletores. Os pipelines de ingestão são alimentados pelo Data Prepper como mecanismo de dados. Para obter uma visão geral de vários componentes de um pipeline, consulte Principais conceitos.

As seções a seguir fornecem uma visão geral de alguns dos recursos mais usados no Amazon OpenSearch Ingestion.

nota

Esta não é uma lista completa de atributos disponíveis para pipelines. Para obter uma documentação abrangente de todas as funcionalidades disponíveis do pipeline, consulte a documentação do Data Prepper. Observe que o OpenSearch Inestion impõe algumas restrições aos plug-ins e às opções que você pode usar. Para ter mais informações, consulte Plugins e opções compatíveis para pipelines OpenSearch de ingestão da Amazon.

Armazenamento em buffer persistente

Um buffer persistente armazena seus dados em um buffer baseado em disco em várias zonas de disponibilidade para adicionar durabilidade aos seus dados. Você pode usar o buffer persistente para ingerir dados de todas as fontes baseadas em push suportadas sem a necessidade de configurar um buffer independente. Isso inclui HTTP e OpenTelemetry fontes para registros, rastreamentos e métricas.

Para ativar o buffer persistente, escolha Ativar buffer persistente ao criar ou atualizar um pipeline. Para obter mais informações, consulteCriação de pipelines OpenSearch de ingestão da Amazon. OpenSearch A ingestão determina automaticamente a capacidade de armazenamento em buffer necessária com base nas unidades de OpenSearch processamento de ingestão (OCUs de ingestão) que você especifica para o pipeline.

Por padrão, os pipelines usam an Chave pertencente à AWS para criptografar dados do buffer. Esses pipelines não precisam de nenhuma permissão adicional para a função do pipeline. Como alternativa, você pode especificar uma chave gerenciada pelo cliente e adicionar as seguintes permissões do IAM à função do pipeline:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Para obter mais informações, consulte Chaves mestras do cliente (CMKs) no AWS Key Management Service Guia do desenvolvedor.

nota

Se desabilitar o armazenamento em buffer persistente, seu pipeline será atualizado para ser executado inteiramente no armazenamento em buffer na memória.

Ajustando o tamanho máximo da carga útil da solicitação

Se você ativar o buffer persistente para um pipeline, o tamanho máximo da carga útil da solicitação será padronizado para 1 MB. O valor padrão oferece o melhor desempenho. No entanto, você pode aumentar esse valor se seus clientes enviarem solicitações que excedam 1 MB. Para ajustar o tamanho máximo da carga útil, defina a max_request_length opção na configuração de origem. Assim como o buffer persistente, essa opção só é compatível com HTTP e OpenTelemetry fontes para registros, rastreamentos e métricas.

Os únicos valores válidos para a max_request_length opção são 1 MB, 1,5 MB, 2 MB, 2,5 MB, 3 MB, 3,5 MB e 4 MB. Se você especificar um valor diferente, receberá um erro.

O exemplo a seguir demonstra como configurar o tamanho máximo da carga útil em uma configuração de pipeline:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

Se você não habilitar o buffer persistente para um pipeline, o valor padrão da max_request_length opção será 10 MB para todas as fontes e não poderá ser modificado.

Dividindo

Você pode configurar um pipeline de OpenSearch ingestão para dividir os eventos recebidos em um subpipeline, permitindo que você execute diferentes tipos de processamento no mesmo evento de entrada.

O exemplo de pipeline a seguir divide os eventos recebidos em dois subpipelines. Cada subpipeline usa seu próprio processador para enriquecer e manipular os dados e, em seguida, envia os dados para índices diferentes. OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Encadeamento

Você pode encadear vários subpipelines para realizar o processamento e o enriquecimento de dados em partes. Em outras palavras, você pode enriquecer um evento de entrada com determinados recursos de processamento em um subpipeline, enviá-lo para outro subpipeline para enriquecimento adicional com um processador diferente e, finalmente, enviá-lo para o coletor. OpenSearch

No exemplo a seguir, o log_pipeline subpipeline enriquece um evento de log de entrada com um conjunto de processadores e, em seguida, envia o evento para um índice chamado. OpenSearch enriched_logs O pipeline envia o mesmo evento para o log_advanced_pipeline subpipeline, que o processa e o envia para um OpenSearch índice diferente chamadoenriched_advanced_logs.

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Filas de mensagens mortas

As filas de mensagens não entregues (DLQs) são destinos para eventos que um pipeline não consegue gravar em um coletor. Em OpenSearch Ingestão, você deve especificar um bucket do Amazon S3 com permissões de gravação apropriadas para ser usado como DLQ. Você pode adicionar uma configuração de DLQ a cada coletor em um pipeline. Quando um pipeline encontra erros de gravação, ele cria objetos DLQ no bucket S3 configurado. Os objetos DLQ existem em um arquivo JSON como uma matriz de eventos com falha.

Um pipeline grava eventos na DLQ quando uma das condições a seguir é atendida:

  • Os max_retries quatro da OpenSearch pia estão esgotados. OpenSearch A ingestão requer um mínimo de 16 para essa opção.

  • Os eventos são rejeitados pelo coletor devido a uma condição de erro.

Configuração

Para configurar uma fila de mensagens não entregues para um subpipeline, especifique a opção dlq na configuração do coletor: opensearch

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

Os arquivos gravados nessa DLQ do S3 terão o seguinte padrão de nomenclatura:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Para obter mais informações, consulte Filas de mensagens não entregues (DLQ).

Para obter instruções sobre como configurar a função sts_role_arn, consulte Gravar em uma fila de mensagens não entregues.

Exemplo

Considere o seguinte exemplo de arquivo DLQ:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Aqui está um exemplo de dados que não foram gravados no coletor e foram enviados ao bucket DLQ S3 para análise posterior:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Gerenciamento de índices

O Amazon OpenSearch Ingestion tem muitos recursos de gerenciamento de índices, incluindo os seguintes.

Criar índices

Você pode especificar um nome de índice em um coletor de pipeline e o OpenSearch Ingestion cria o índice ao provisionar o pipeline. Se um índice já existir, o pipeline o usará para indexar eventos recebidos. Se você parar e reiniciar um pipeline ou atualizar sua configuração YAML, o pipeline tentará criar novos índices, caso eles ainda não existam. Um pipeline nunca pode excluir um índice.

Os coletores de exemplo a seguir criam dois índices quando o pipeline é provisionado:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Geração de nomes e padrões de índice

Você pode gerar nomes de índices dinâmicos usando variáveis dos campos de eventos recebidos. Na configuração do coletor, use o formato string${} para sinalizar a interpolação de strings e use um ponteiro JSON para extrair campos de eventos. As opções para index_type são custom oumanagement_disabled. Como o index_type padrão é custom para OpenSearch domínios e management_disabled coleções OpenSearch sem servidor, ele pode ser deixado sem definição.

Por exemplo, o pipeline a seguir seleciona o campo metadataType dos eventos recebidos para gerar nomes de índice.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

A configuração a seguir continua gerando um novo índice a cada dia ou a cada hora.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

O nome do índice também pode ser uma string simples com um padrão de data e hora como sufixo, como my-index-%{yyyy.MM.dd}. Quando o coletor envia dados para OpenSearch, ele substitui o padrão de data e hora pela hora UTC e cria um novo índice para cada dia, como. my-index-2022.01.25 Para obter mais informações, consulte a DateTimeFormatteraula.

Esse nome de índice também pode ser uma string formatada (com ou sem um sufixo de padrão de data e hora), como my-${index}-name. Quando o coletor envia dados para OpenSearch, ele substitui a "${index}" parte pelo valor no evento que está sendo processado. Se o formato for "${index1/index2/index3}", ele substituirá o campo index1/index2/index3 por seu valor no evento.

Gerar IDs de documentos

Um pipeline pode gerar uma ID de documento ao indexar OpenSearch documentos em. Ele pode inferir esses IDs de documentos a partir dos campos nos eventos recebidos.

Este exemplo usa o campo uuid de um evento recebido para gerar um ID do documento.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

No exemplo a seguir, o processador Adicionar entradas mescla os campos uuid e other_field do evento recebido para gerar um ID do documento.

A ação create garante que documentos com IDs idênticos não sejam substituídos. O pipeline elimina documentos duplicados sem nenhuma nova tentativa ou evento de DLQ. Essa é uma expectativa razoável para autores de pipelines que usam essa ação, pois o objetivo é evitar a atualização de documentos existentes.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

Talvez você queira definir o ID do documento de um evento como um campo de um subobjeto. No exemplo a seguir, o plug-in OpenSearch sink usa o subobjeto info/id para gerar uma ID de documento.

sink: - opensearch: ... document_id_field: info/id

Dado o evento a seguir, o pipeline gerará um documento com o campo _id definido como json001:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Gerar IDs de roteamento

Você pode usar a routing_field opção no plug-in de OpenSearch coletor para definir o valor de uma propriedade de roteamento de documentos (_routing) como um valor de um evento de entrada.

O roteamento é compatível com a sintaxe de ponteiro do JSON, portanto, campos aninhados também estão disponíveis, e não apenas campos de nível superior.

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

Dado o evento a seguir, o plug-in gerará um documento com o campo _routing definido como abcd:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Para obter instruções sobre como criar modelos de índice que os pipelines podem usar durante a criação do índice, consulte Modelos de índice.

E: nd-to-end reconhecimento

OpenSearch A ingestão garante a durabilidade e a confiabilidade dos dados, rastreando sua entrega da origem aos sumidouros em pipelines sem estado usando reconhecimento. end-to-end Atualmente, somente o plug-in de origem do S3 oferece suporte à end-to-end confirmação.

Com a end-to-end confirmação, o plug-in de origem do pipeline cria um conjunto de confirmações para monitorar um lote de eventos. Ele recebe uma confirmação positiva quando esses eventos são enviados com sucesso para seus coletores ou uma confirmação negativa quando nenhum dos eventos pôde ser enviado para seus coletores.

No caso de um evento negativo ou falha de um componente do pipeline, ou se uma fonte não receber uma confirmação, a fonte atinge o tempo limite e toma as medidas necessárias, como tentar novamente ou registrar a falha. Se o pipeline tiver vários coletores ou vários subpipelines configurados, as confirmações em nível de evento serão enviadas somente após o evento ser enviado para todos os coletores em todos os subpipelines. Se um coletor tiver uma DLQ configurada, as end-to-end confirmações também rastrearão eventos gravados na DLQ.

Para ativar a end-to-end confirmação, inclua a acknowledgments opção na configuração de origem:

s3-pipeline: source: s3: acknowledgments: true ...

Contrapressão da fonte

Um pipeline pode sofrer contrapressão quando está ocupado processando dados ou se seus sumidouros estão temporariamente inativos ou lentos para ingerir dados. OpenSearch A ingestão tem maneiras diferentes de lidar com a contrapressão, dependendo do plug-in de origem que um pipeline está usando.

Origem HTTP

Os pipelines que usam o plug-in de origem HTTP lidam com a pressão oposta de maneira diferente, dependendo de qual componente do pipeline está congestionado:

  • Buffers: quando os buffers estão cheios, o pipeline começa a retornar o status HTTP REQUEST_TIMEOUT com o código de erro 408 de volta ao endpoint de origem. À medida que os buffers são liberados, o pipeline começa a processar eventos HTTP novamente.

  • Threads de origem: quando todos os threads de origem HTTP estão ocupados executando solicitações e o tamanho da fila de solicitações não processadas excede o número máximo permitido de solicitações, o pipeline começa a retornar o status HTTP TOO_MANY_REQUESTS com o código de erro 429 de volta ao endpoint de origem. Quando a fila de solicitações fica abaixo do tamanho máximo permitido, o pipeline começa a processar as solicitações novamente.

Origem OTel

Quando os buffers estão cheios para pipelines que usam OpenTelemetry fontes (registros OTel, métricas OTel e rastreamento OTel), o pipeline começa a retornar o status HTTP REQUEST_TIMEOUT com o código de erro 408 para o endpoint de origem. À medida que os buffers são liberados, o pipeline começa a processar eventos novamente.

Origem do S3

Quando os buffers estão cheios para pipelines com uma origem do S3, os pipelines param de processar notificações SQS. À medida que os buffers são liberados, os pipelines começam a processar as notificações novamente.

Se um coletor estiver inativo ou não conseguir ingerir dados e a end-to-end confirmação for ativada para a origem, o pipeline interromperá o processamento das notificações do SQS até receber uma confirmação bem-sucedida de todos os coletores.