Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Descripción general de las funciones de canalización en Amazon OpenSearch Ingestion
Amazon OpenSearch Ingestion aprovisiona las canalizaciones, que constan de una fuente, un búfer, cero o más procesadores y uno o más receptores. Las canalizaciones de incorporación funcionan con Data Prepper como motor de datos. Para obtener información general sobre los distintos componentes de una canalización, consulte Conceptos clave.
En las siguientes secciones se ofrece información general sobre algunas de las funciones más utilizadas en Amazon OpenSearch Ingestion.
nota
No es una lista exhaustiva de las características disponibles para las canalización. Para obtener una documentación completa de todas las funciones de canalización disponibles, consulte Documentación de Data Prepper
Temas
Almacenamiento en búfer persistente
Un búfer persistente almacena los datos en un búfer basado en disco en varias zonas de disponibilidad para añadir durabilidad a los datos. Puede utilizar el almacenamiento en búfer persistente para ingerir datos de todas las fuentes push compatibles sin necesidad de configurar un búfer independiente. Estas incluyen el HTTP y OpenTelemetry las fuentes de registros, seguimientos y métricas.
Para habilitar el almacenamiento en búfer persistente, selecciona Habilitar el búfer persistente al crear o actualizar una canalización. Para obtener más información, consulte. Creación de canalizaciones OpenSearch de Amazon Ingestion OpenSearch La ingestión determina automáticamente la capacidad de almacenamiento en búfer requerida en función de las unidades de OpenSearch cómputo de ingestión (OCU de ingestión) que especifique para la canalización.
De forma predeterminada, las canalizaciones utilizan una para cifrar los datos del búfer. Clave propiedad de AWS Estas canalizaciones no necesitan ningún permiso adicional para desempeñar la función de canalización. Como alternativa, puedes especificar una clave gestionada por el cliente y añadir los siguientes permisos de IAM a la función de canalización:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:
{region}
:{aws-account-id}
:key/1234abcd-12ab-34cd-56ef-1234567890ab
" } ] }
Para más información, consulte las claves administradas por el cliente en la Guía para desarrolladores de AWS Key Management Service .
nota
Si deshabilita el almacenamiento en búfer persistente, su canalización se actualizará para que se ejecute por completo con el almacenamiento en búfer en memoria.
Ajustar el tamaño máximo de la carga útil solicitada
Si habilitas el almacenamiento en búfer persistente para una canalización, el tamaño máximo de la carga útil de solicitud se establece de forma predeterminada en 1 MB. El valor predeterminado ofrece el mejor rendimiento. Sin embargo, puede aumentar este valor si sus clientes envían solicitudes que superen 1 MB. Para ajustar el tamaño máximo de la carga útil, defina la max_request_length
opción en la configuración de origen. Al igual que el almacenamiento en búfer persistente, esta opción solo es compatible con HTTP y OpenTelemetry las fuentes de registros, seguimientos y métricas.
Los únicos valores válidos para la max_request_length
opción son 1 MB, 1,5 MB, 2 MB, 2,5 MB, 3 MB, 3,5 MB y 4 MB. Si especifica un valor diferente, recibirá un error.
El siguiente ejemplo muestra cómo configurar el tamaño máximo de carga útil dentro de una configuración de canalización:
... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length:
4mb
processor: ...
Si no habilitas el almacenamiento en búfer persistente para una canalización, el valor de la max_request_length
opción se establece de forma predeterminada en 10 MB para todas las fuentes y no se puede modificar.
División
Puedes configurar una canalización de OpenSearch ingestión para dividir los eventos entrantes en una subcanalización, lo que te permitirá realizar diferentes tipos de procesamiento en el mismo evento entrante.
El siguiente ejemplo de canalización divide los eventos entrantes en dos subcanalizaciones. Cada subcanalización utiliza su propio procesador para enriquecer y manipular los datos y, a continuación, los envía a distintos índices. OpenSearch
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
Encadenar
Puede encadenar varias subcanalizaciones para procesar y enriquecer los datos por partes. En otras palabras, puede enriquecer un evento entrante con determinadas capacidades de procesamiento en una subcanalización, luego enviarlo a otra subcanalización para enriquecerla aún más con un procesador diferente y, finalmente, enviarlo a su receptor. OpenSearch
En el siguiente ejemplo, la log_pipeline
subcanalización enriquece un evento de registro entrante con un conjunto de procesadores y, a continuación, envía el evento a un índice denominado. OpenSearch enriched_logs
La canalización envía el mismo evento a la log_advanced_pipeline
subcanalización, que lo procesa y lo envía a un índice diferente OpenSearch denominado. enriched_advanced_logs
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
Colas de mensajes fallidos
Las colas de mensajes fallidos (DLQ) son los destinos de los eventos que una canalización no logra registrar en un receptor. En OpenSearch Ingestion, debe especificar un bucket de Amazon S3 con los permisos de escritura adecuados para usarlo como DLQ. Puede añadir una configuración de DLQ a cada receptor de una canalización. Cuando una canalización detecta errores de escritura, crea objetos de DLQ en el bucket de S3 configurado. Los objetos DLQ existen dentro de un archivo JSON como una matriz de eventos fallidos.
Una canalización escribe eventos en la DLQ cuando se cumple alguna de las siguientes condiciones:
-
Los
max_retries
del OpenSearch fregadero están agotados. OpenSearch La ingestión requiere un mínimo de 16 para esta opción. -
El receptor rechaza los eventos debido a una condición de error.
Configuración
Para configurar una cola de mensajes fallidos para una subcanalización, especifique la opción dlq
en la configuración del receptor opensearch
:
apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
Los archivos que se escriban en este DLQ de S3 tendrán el siguiente patrón de nomenclatura:
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
Para obtener más información, consulte Colas de mensajes fallidos (DLQ)
Para obtener las instrucciones de la configuración del rol sts_role_arn
, consulte Escribir en una cola de mensajes fallidos.
Ejemplo
Considere el siguiente ejemplo de archivo DLQ:
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
Este es un ejemplo de datos que no se pudieron escribir en el receptor y que se envían al bucket DLQ de S3 para su posterior análisis:
Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"
Administración de índices
Amazon OpenSearch Ingestion cuenta con numerosas funciones de administración de índices, entre las que se incluyen las siguientes.
Creación de índices
Puede especificar un nombre de índice en un colector de canalización e OpenSearch Ingestion crea el índice cuando aprovisiona la canalización. Si ya existe un índice, la canalización lo usa para indexar los eventos entrantes. Si detiene y reinicia una canalización, o si actualiza su configuración de YAML, la canalización intentará crear nuevos índices si aún no existen. Una canalización nunca puede eliminar un índice.
En el siguiente ejemplo, los receptores crean dos índices cuando se aprovisiona la canalización:
sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs
Creación de nombres y patrones de índice
Puede generar nombres de índice dinámicos mediante variables de los campos de los eventos entrantes. En la configuración del receptor, utilice el formato string${}
para indicar la interpolación de cadenas y utilice un puntero JSON para extraer los campos de los eventos. Las opciones para index_type
son custom
o management_disabled
. Como el index_type
valor predeterminado es custom
para los OpenSearch dominios y management_disabled
para las colecciones OpenSearch sin servidor, se puede dejar sin configurar.
Por ejemplo, la siguiente canalización selecciona el campo metadataType
entre los eventos entrantes para generar nombres de índice.
pipeline: ... sink: opensearch: index: "metadata-${metadataType}"
La siguiente configuración sigue generando un índice nuevo cada día o cada hora.
pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
El nombre del índice también puede ser una cadena simple con un patrón de fecha y hora como sufijo, por ejemplo my-index-%{yyyy.MM.dd}
. Cuando el receptor envía datos a OpenSearch, reemplaza el patrón de fecha y hora por la hora UTC y crea un índice nuevo para cada día, por ejemplo. my-index-2022.01.25
Para obtener más información, consulta la DateTimeFormatter
Este nombre de índice también puede ser una cadena formateada (con o sin un sufijo de patrón de fecha y hora), como my-${index}-name
. Cuando el receptor envía datos a OpenSearch, reemplaza la "${index}"
parte por el valor en caso de que se esté procesando. Si el formato lo es "${index1/index2/index3}"
, reemplaza el campo index1/index2/index3
por su valor en el evento.
Creación de identificadores de documentos
Una canalización puede generar un identificador de documento mientras indexa los documentos en él. OpenSearch Puede deducir estos identificadores de documentos a partir de los campos incluidos en los eventos entrantes.
En este ejemplo, se utiliza el campo uuid
de un evento entrante para generar un identificador de documento.
pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"
En el ejemplo siguiente, el procesador Add entriesuuid
y other_field
del evento entrante para generar un identificador de documento.
La acción create
garantiza que los documentos con identificadores idénticos no se sobrescriban. La canalización elimina los documentos duplicados sin ningún tipo de reintento ni ningún evento de DLQ. Esta es una expectativa razonable para los autores de canalización que utilizan esta acción, ya que el objetivo es evitar actualizar los documentos existentes.
pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"
Es posible que desee establecer el identificador del documento de un evento en un campo de un subobjeto. En el siguiente ejemplo, el complemento OpenSearch sink usa el subobjeto info/id
para generar un ID de documento.
sink: - opensearch: ... document_id_field: info/id
Si se produce el siguiente evento, la canalización generará un documento con el campo _id
establecido en json001
:
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
Creación de identificadores de enrutamiento
Puedes usar la routing_field
opción del complemento OpenSearch sink para establecer el valor de una propiedad de enrutamiento de documentos (_routing
) en el valor de un evento entrante.
El enrutamiento admite la sintaxis de punteros JSON, por lo que también están disponibles campos anidados, no solo campos de nivel superior.
sink: - opensearch: ... routing_field: metadata/id document_id_field: id
Si se produce el siguiente evento, el complemento genera un documento con el campo _routing
establecido en abcd
:
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
Para obtener instrucciones sobre cómo crear plantillas de índice que las canalizaciones puedan utilizar durante la creación del índice, consulte Plantillas de índice
E: nd-to-end acuse de recibo
OpenSearch La ingestión garantiza la durabilidad y la fiabilidad de los datos al rastrear su entrega desde su origen hasta los sumideros de los oleoductos sin estado mediante el uso de acuse de recibo. end-to-end Actualmente, solo el complemento fuente de S3 admite el reconocimiento
Con el end-to-end acuse de recibo, el complemento Pipeline Source crea un conjunto de acuses de recibo para monitorear un lote de eventos. Recibe un reconocimiento positivo cuando esos eventos se envían correctamente a sus receptores, o un reconocimiento negativo cuando alguno de los eventos no se ha podido enviar a sus receptores.
En caso de fallo o caída de un componente de la canalización, o si un origen no recibe un reconocimiento, se agota el tiempo de espera y toma las medidas necesarias, como volver a intentarlo o registrar el fallo. Si la canalización tiene configurados varios receptores o subcanalizaciones, las confirmaciones a nivel de evento solo se envían después de que el evento se envíe a todos los receptores de todas las subcanalizaciones. Si un receptor tiene un DLQ configurado, los acuses de end-to-end recibo también rastrean los eventos escritos en el DLQ.
Para habilitar el end-to-end reconocimiento, incluya la acknowledgments
opción en la configuración de origen:
s3-pipeline: source: s3: acknowledgments: true ...
Contrapresión de la fuente
Una canalización puede sufrir una contrapresión cuando está ocupada procesando datos o si sus sumideros están temporalmente inactivos o tardan en ingerir datos. OpenSearch La ingestión tiene diferentes formas de gestionar la contrapresión en función del complemento fuente que utilice la canalización.
Origen de HTTP
Las canalizaciones que utilizan el complemento origen de HTTP
-
Búferes: cuando los búferes están llenos, la canalización comienza a devolver el estado HTTP
REQUEST_TIMEOUT
con el código de error 408 al punto de conexión de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos HTTP. -
Subprocesos de origen: cuando todos los subprocesos de origen HTTP están ocupados ejecutando solicitudes y el tamaño de la cola de solicitudes sin procesar ha superado el número máximo de solicitudes permitido, la canalización comienza a devolver el estado HTTP
TOO_MANY_REQUESTS
con el código de error 429 al punto de conexión de origen. Cuando la cola de solicitudes cae por debajo del tamaño máximo permitido, la canalización vuelve a procesar las solicitudes.
Origen de OTel
Cuando los búferes están llenos para las canalizaciones que utilizan OpenTelemetry fuentes (registros de OtelREQUEST_TIMEOUT
con el código de error 408 al punto final de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos.
Fuente de S3
Cuando los búferes están llenos para las canalizaciones con un origen de S3
Si un sumidero no funciona o no puede ingerir datos y el acuse de end-to-end recibo está activado para la fuente, la canalización deja de procesar las notificaciones de SQS hasta que reciba un acuse de recibo correcto de todos los sumideros.