Descripción general de las funciones de canalización en Amazon OpenSearch Ingestion - OpenSearch Servicio Amazon

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Descripción general de las funciones de canalización en Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion aprovisiona las canalizaciones, que constan de una fuente, un búfer, cero o más procesadores y uno o más receptores. Las canalizaciones de incorporación funcionan con Data Prepper como motor de datos. Para obtener información general sobre los distintos componentes de una canalización, consulte Conceptos clave.

En las siguientes secciones se ofrece información general sobre algunas de las funciones más utilizadas en Amazon OpenSearch Ingestion.

nota

No es una lista exhaustiva de las características disponibles para las canalización. Para obtener una documentación completa de todas las funciones de canalización disponibles, consulte Documentación de Data Prepper. Tenga en cuenta que OpenSearch Ingestion impone algunas restricciones a los complementos y las opciones que puede utilizar. Para obtener más información, consulte Plugins y opciones compatibles para las canalizaciones de Amazon OpenSearch Ingestion.

Almacenamiento en búfer persistente

Un búfer persistente almacena los datos en un búfer basado en disco en varias zonas de disponibilidad para añadir durabilidad a los datos. Puede utilizar el almacenamiento en búfer persistente para ingerir datos de todas las fuentes push compatibles sin necesidad de configurar un búfer independiente. Estas incluyen el HTTP y OpenTelemetry las fuentes de registros, seguimientos y métricas.

Para habilitar el almacenamiento en búfer persistente, selecciona Habilitar el búfer persistente al crear o actualizar una canalización. Para obtener más información, consulte. Creación de canalizaciones OpenSearch de Amazon Ingestion OpenSearch La ingestión determina automáticamente la capacidad de almacenamiento en búfer requerida en función de las unidades de OpenSearch cómputo de ingestión (OCU de ingestión) que especifique para la canalización.

De forma predeterminada, las canalizaciones utilizan una para cifrar los datos del búfer. Clave propiedad de AWS Estas canalizaciones no necesitan ningún permiso adicional para desempeñar la función de canalización. Como alternativa, puedes especificar una clave gestionada por el cliente y añadir los siguientes permisos de IAM a la función de canalización:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Para más información, consulte las claves administradas por el cliente en la Guía para desarrolladores de AWS Key Management Service .

nota

Si deshabilita el almacenamiento en búfer persistente, su canalización se actualizará para que se ejecute por completo con el almacenamiento en búfer en memoria.

Ajustar el tamaño máximo de la carga útil solicitada

Si habilitas el almacenamiento en búfer persistente para una canalización, el tamaño máximo de la carga útil de solicitud se establece de forma predeterminada en 1 MB. El valor predeterminado ofrece el mejor rendimiento. Sin embargo, puede aumentar este valor si sus clientes envían solicitudes que superen 1 MB. Para ajustar el tamaño máximo de la carga útil, defina la max_request_length opción en la configuración de origen. Al igual que el almacenamiento en búfer persistente, esta opción solo es compatible con HTTP y OpenTelemetry las fuentes de registros, seguimientos y métricas.

Los únicos valores válidos para la max_request_length opción son 1 MB, 1,5 MB, 2 MB, 2,5 MB, 3 MB, 3,5 MB y 4 MB. Si especifica un valor diferente, recibirá un error.

El siguiente ejemplo muestra cómo configurar el tamaño máximo de carga útil dentro de una configuración de canalización:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

Si no habilitas el almacenamiento en búfer persistente para una canalización, el valor de la max_request_length opción se establece de forma predeterminada en 10 MB para todas las fuentes y no se puede modificar.

División

Puedes configurar una canalización de OpenSearch ingestión para dividir los eventos entrantes en una subcanalización, lo que te permitirá realizar diferentes tipos de procesamiento en el mismo evento entrante.

El siguiente ejemplo de canalización divide los eventos entrantes en dos subcanalizaciones. Cada subcanalización utiliza su propio procesador para enriquecer y manipular los datos y, a continuación, los envía a distintos índices. OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Encadenar

Puede encadenar varias subcanalizaciones para procesar y enriquecer los datos por partes. En otras palabras, puede enriquecer un evento entrante con determinadas capacidades de procesamiento en una subcanalización, luego enviarlo a otra subcanalización para enriquecerla aún más con un procesador diferente y, finalmente, enviarlo a su receptor. OpenSearch

En el siguiente ejemplo, la log_pipeline subcanalización enriquece un evento de registro entrante con un conjunto de procesadores y, a continuación, envía el evento a un índice denominado. OpenSearch enriched_logs La canalización envía el mismo evento a la log_advanced_pipeline subcanalización, que lo procesa y lo envía a un índice diferente OpenSearch denominado. enriched_advanced_logs

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Colas de mensajes fallidos

Las colas de mensajes fallidos (DLQ) son los destinos de los eventos que una canalización no logra registrar en un receptor. En OpenSearch Ingestion, debe especificar un bucket de Amazon S3 con los permisos de escritura adecuados para usarlo como DLQ. Puede añadir una configuración de DLQ a cada receptor de una canalización. Cuando una canalización detecta errores de escritura, crea objetos de DLQ en el bucket de S3 configurado. Los objetos DLQ existen dentro de un archivo JSON como una matriz de eventos fallidos.

Una canalización escribe eventos en la DLQ cuando se cumple alguna de las siguientes condiciones:

  • Los max_retries del OpenSearch fregadero están agotados. OpenSearch La ingestión requiere un mínimo de 16 para esta opción.

  • El receptor rechaza los eventos debido a una condición de error.

Configuración

Para configurar una cola de mensajes fallidos para una subcanalización, especifique la opción dlq en la configuración del receptor opensearch:

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

Los archivos que se escriban en este DLQ de S3 tendrán el siguiente patrón de nomenclatura:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Para obtener más información, consulte Colas de mensajes fallidos (DLQ).

Para obtener las instrucciones de la configuración del rol sts_role_arn, consulte Escribir en una cola de mensajes fallidos.

Ejemplo

Considere el siguiente ejemplo de archivo DLQ:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Este es un ejemplo de datos que no se pudieron escribir en el receptor y que se envían al bucket DLQ de S3 para su posterior análisis:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Administración de índices

Amazon OpenSearch Ingestion cuenta con numerosas funciones de administración de índices, entre las que se incluyen las siguientes.

Creación de índices

Puede especificar un nombre de índice en un colector de canalización e OpenSearch Ingestion crea el índice cuando aprovisiona la canalización. Si ya existe un índice, la canalización lo usa para indexar los eventos entrantes. Si detiene y reinicia una canalización, o si actualiza su configuración de YAML, la canalización intentará crear nuevos índices si aún no existen. Una canalización nunca puede eliminar un índice.

En el siguiente ejemplo, los receptores crean dos índices cuando se aprovisiona la canalización:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Creación de nombres y patrones de índice

Puede generar nombres de índice dinámicos mediante variables de los campos de los eventos entrantes. En la configuración del receptor, utilice el formato string${} para indicar la interpolación de cadenas y utilice un puntero JSON para extraer los campos de los eventos. Las opciones para index_type son custom o management_disabled. Como el index_type valor predeterminado es custom para los OpenSearch dominios y management_disabled para las colecciones OpenSearch sin servidor, se puede dejar sin configurar.

Por ejemplo, la siguiente canalización selecciona el campo metadataType entre los eventos entrantes para generar nombres de índice.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

La siguiente configuración sigue generando un índice nuevo cada día o cada hora.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

El nombre del índice también puede ser una cadena simple con un patrón de fecha y hora como sufijo, por ejemplo my-index-%{yyyy.MM.dd}. Cuando el receptor envía datos a OpenSearch, reemplaza el patrón de fecha y hora por la hora UTC y crea un índice nuevo para cada día, por ejemplo. my-index-2022.01.25 Para obtener más información, consulta la DateTimeFormatterclase.

Este nombre de índice también puede ser una cadena formateada (con o sin un sufijo de patrón de fecha y hora), como my-${index}-name. Cuando el receptor envía datos a OpenSearch, reemplaza la "${index}" parte por el valor en caso de que se esté procesando. Si el formato lo es "${index1/index2/index3}", reemplaza el campo index1/index2/index3 por su valor en el evento.

Creación de identificadores de documentos

Una canalización puede generar un identificador de documento mientras indexa los documentos en él. OpenSearch Puede deducir estos identificadores de documentos a partir de los campos incluidos en los eventos entrantes.

En este ejemplo, se utiliza el campo uuid de un evento entrante para generar un identificador de documento.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

En el ejemplo siguiente, el procesador Add entries combina los campos uuid y other_field del evento entrante para generar un identificador de documento.

La acción create garantiza que los documentos con identificadores idénticos no se sobrescriban. La canalización elimina los documentos duplicados sin ningún tipo de reintento ni ningún evento de DLQ. Esta es una expectativa razonable para los autores de canalización que utilizan esta acción, ya que el objetivo es evitar actualizar los documentos existentes.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

Es posible que desee establecer el identificador del documento de un evento en un campo de un subobjeto. En el siguiente ejemplo, el complemento OpenSearch sink usa el subobjeto info/id para generar un ID de documento.

sink: - opensearch: ... document_id_field: info/id

Si se produce el siguiente evento, la canalización generará un documento con el campo _id establecido en json001:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Creación de identificadores de enrutamiento

Puedes usar la routing_field opción del complemento OpenSearch sink para establecer el valor de una propiedad de enrutamiento de documentos (_routing) en el valor de un evento entrante.

El enrutamiento admite la sintaxis de punteros JSON, por lo que también están disponibles campos anidados, no solo campos de nivel superior.

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

Si se produce el siguiente evento, el complemento genera un documento con el campo _routing establecido en abcd:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Para obtener instrucciones sobre cómo crear plantillas de índice que las canalizaciones puedan utilizar durante la creación del índice, consulte Plantillas de índice.

E: nd-to-end acuse de recibo

OpenSearch La ingestión garantiza la durabilidad y la fiabilidad de los datos al rastrear su entrega desde su origen hasta los sumideros de los oleoductos sin estado mediante el uso de acuse de recibo. end-to-end Actualmente, solo el complemento fuente de S3 admite el reconocimiento. end-to-end

Con el end-to-end acuse de recibo, el complemento Pipeline Source crea un conjunto de acuses de recibo para monitorear un lote de eventos. Recibe un reconocimiento positivo cuando esos eventos se envían correctamente a sus receptores, o un reconocimiento negativo cuando alguno de los eventos no se ha podido enviar a sus receptores.

En caso de fallo o caída de un componente de la canalización, o si un origen no recibe un reconocimiento, se agota el tiempo de espera y toma las medidas necesarias, como volver a intentarlo o registrar el fallo. Si la canalización tiene configurados varios receptores o subcanalizaciones, las confirmaciones a nivel de evento solo se envían después de que el evento se envíe a todos los receptores de todas las subcanalizaciones. Si un receptor tiene un DLQ configurado, los acuses de end-to-end recibo también rastrean los eventos escritos en el DLQ.

Para habilitar el end-to-end reconocimiento, incluya la acknowledgments opción en la configuración de origen:

s3-pipeline: source: s3: acknowledgments: true ...

Contrapresión de la fuente

Una canalización puede sufrir una contrapresión cuando está ocupada procesando datos o si sus sumideros están temporalmente inactivos o tardan en ingerir datos. OpenSearch La ingestión tiene diferentes formas de gestionar la contrapresión en función del complemento fuente que utilice la canalización.

Origen de HTTP

Las canalizaciones que utilizan el complemento origen de HTTP gestionan la contrapresión de forma diferente en función del componente de la canalización que esté congestionado:

  • Búferes: cuando los búferes están llenos, la canalización comienza a devolver el estado HTTP REQUEST_TIMEOUT con el código de error 408 al punto de conexión de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos HTTP.

  • Subprocesos de origen: cuando todos los subprocesos de origen HTTP están ocupados ejecutando solicitudes y el tamaño de la cola de solicitudes sin procesar ha superado el número máximo de solicitudes permitido, la canalización comienza a devolver el estado HTTP TOO_MANY_REQUESTS con el código de error 429 al punto de conexión de origen. Cuando la cola de solicitudes cae por debajo del tamaño máximo permitido, la canalización vuelve a procesar las solicitudes.

Origen de OTel

Cuando los búferes están llenos para las canalizaciones que utilizan OpenTelemetry fuentes (registros de Otel, métricas de Otel y rastreo de Otel), la canalización comienza a devolver el estado HTTP REQUEST_TIMEOUT con el código de error 408 al punto final de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos.

Fuente de S3

Cuando los búferes están llenos para las canalizaciones con un origen de S3, las canalizaciones dejan de procesar las notificaciones de SQS. A medida que se van liberando los búferes, la canalización vuelve a procesar las notificaciones.

Si un sumidero no funciona o no puede ingerir datos y el acuse de end-to-end recibo está activado para la fuente, la canalización deja de procesar las notificaciones de SQS hasta que reciba un acuse de recibo correcto de todos los sumideros.