Présentation des fonctionnalités du pipeline dans Amazon OpenSearch Ingestion - Amazon OpenSearch Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Présentation des fonctionnalités du pipeline dans Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion approvisionne des pipelines, qui se composent d'une source, d'une mémoire tampon, de zéro ou plusieurs processeurs et d'un ou plusieurs récepteurs. Les pipelines d'ingestion sont alimentés par Data Prepper en tant que moteur de données. Pour un aperçu des différents composants d'un pipeline, voirConcepts clés.

Les sections suivantes fournissent un aperçu de certaines des fonctionnalités les plus couramment utilisées dans Amazon OpenSearch Ingestion.

Note

Il ne s'agit pas d'une liste exhaustive des fonctionnalités disponibles pour les pipelines. Pour une documentation complète de toutes les fonctionnalités de pipeline disponibles, consultez la documentation Data Prepper. Notez que OpenSearch l'ingestion impose certaines contraintes sur les plugins et les options que vous pouvez utiliser. Pour plus d’informations, consultez Plug-ins et options pris en charge pour les pipelines OpenSearch Amazon Ingestion.

Mise en mémoire tampon persistante

Une mémoire tampon persistante stocke vos données dans une mémoire tampon sur disque dans plusieurs zones de disponibilité afin de renforcer la durabilité de vos données. Vous pouvez utiliser la mise en mémoire tampon persistante pour ingérer les données de toutes les sources push prises en charge sans avoir à configurer une mémoire tampon autonome. Il s'agit notamment du protocole HTTP et des OpenTelemetry sources pour les journaux, les traces et les métriques.

Pour activer la mise en mémoire tampon persistante, choisissez Activer la mémoire tampon persistante lors de la création ou de la mise à jour d'un pipeline. Pour plus d'informations, consultezCréation de pipelines OpenSearch Amazon Ingestion. OpenSearch L'ingestion détermine automatiquement la capacité de mise en mémoire tampon requise en fonction des unités de OpenSearch calcul d'ingestion (OCU d'ingestion) que vous spécifiez pour le pipeline.

Par défaut, les pipelines utilisent un Clé détenue par AWS pour chiffrer les données de la mémoire tampon. Ces pipelines ne nécessitent aucune autorisation supplémentaire pour le rôle de pipeline. Vous pouvez également spécifier une clé gérée par le client et ajouter les autorisations IAM suivantes au rôle de pipeline :

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Pour plus d'informations, consultez Clés gérées par le client dans le Guide du développeur AWS Key Management Service (langue française non garantie).

Note

Si vous désactivez la mise en mémoire tampon persistante, votre pipeline sera mis à jour pour fonctionner entièrement sur la mise en mémoire tampon en mémoire.

Réglage de la taille maximale de la charge utile des demandes

Si vous activez la mise en mémoire tampon persistante pour un pipeline, la taille maximale de la charge utile des demandes est par défaut de 1 Mo. La valeur par défaut offre les meilleures performances. Toutefois, vous pouvez augmenter cette valeur si vos clients envoient des demandes supérieures à 1 Mo. Pour régler la taille maximale de la charge utile, définissez l'max_request_lengthoption dans la configuration source. Tout comme la mise en mémoire tampon persistante, cette option n'est prise en charge que pour le HTTP et les OpenTelemetry sources pour les journaux, les traces et les métriques.

Les seules valeurs valides pour l'max_request_lengthoption sont 1 Mo, 1,5 Mo, 2 Mo, 2,5 Mo, 3 Mo, 3,5 Mo et 4 Mo. Si vous spécifiez une valeur différente, un message d'erreur s'affiche.

L'exemple suivant montre comment configurer la taille de charge utile maximale dans une configuration de pipeline :

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

Si vous n'activez pas la mise en mémoire tampon persistante pour un pipeline, la valeur de l'max_request_lengthoption est par défaut de 10 Mo pour toutes les sources et ne peut pas être modifiée.

Fractionnement

Vous pouvez configurer un pipeline d' OpenSearch ingestion pour diviser les événements entrants en un sous-pipeline, ce qui vous permet d'effectuer différents types de traitement sur le même événement entrant.

L'exemple de pipeline suivant divise les événements entrants en deux sous-pipelines. Chaque sous-pipeline utilise son propre processeur pour enrichir et manipuler les données, puis envoie les données vers différents OpenSearch index.

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Création de chaînes

Vous pouvez enchaîner plusieurs sous-pipelines afin d'effectuer le traitement et l'enrichissement des données par morceaux. En d'autres termes, vous pouvez enrichir un événement entrant avec certaines capacités de traitement dans un sous-pipeline, puis l'envoyer vers un autre sous-pipeline pour un enrichissement supplémentaire avec un processeur différent, et enfin l'envoyer vers son OpenSearch récepteur.

Dans l'exemple suivant, le log_pipeline sous-pipeline enrichit un événement de journal entrant avec un ensemble de processeurs, puis envoie l'événement à un OpenSearch index nommé. enriched_logs Le pipeline envoie le même événement au log_advanced_pipeline sous-pipeline, qui le traite et l'envoie à un OpenSearch index différent nomméenriched_advanced_logs.

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Files d’attente de lettres mortes

Les files d'attente de lettres mortes (DLQ) sont des destinations pour les événements qu'un pipeline ne parvient pas à écrire dans un récepteur. Dans OpenSearch Ingestion, vous devez spécifier un compartiment Amazon S3 doté des autorisations d'écriture appropriées à utiliser comme DLQ. Vous pouvez ajouter une configuration DLQ à chaque récepteur d'un pipeline. Lorsqu'un pipeline rencontre des erreurs d'écriture, il crée des objets DLQ dans le compartiment S3 configuré. Les objets DLQ existent dans un fichier JSON sous la forme d'un tableau d'événements ayant échoué.

Un pipeline écrit des événements dans le DLQ lorsque l'une des conditions suivantes est remplie :

  • Les max_retries parois du OpenSearch lavabo sont épuisées. OpenSearch L'ingestion nécessite un minimum de 16 pour cette option.

  • Les événements sont rejetés par le récepteur en raison d'une erreur.

Configuration

Pour configurer une file d'attente de lettres mortes pour un sous-pipeline, spécifiez l'dlqoption dans la configuration du opensearch récepteur :

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

Les fichiers écrits dans ce DLQ S3 auront le modèle de dénomination suivant :

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Pour plus d'informations, consultez Dead-Letter Queues (DLQ).

Pour obtenir des instructions sur la configuration du sts_role_arn rôle, consultezÉcrire dans une file d'attente de lettres mortes.

Exemple

Prenons l'exemple de fichier DLQ suivant :

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Voici un exemple de données qui n'ont pas pu être écrites dans le récepteur et qui sont envoyées au compartiment DLQ S3 pour une analyse plus approfondie :

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Gestion des indices

Amazon OpenSearch Ingestion possède de nombreuses fonctionnalités de gestion d'index, notamment les suivantes.

Création d'index

Vous pouvez spécifier un nom d'index dans un récepteur de pipeline et OpenSearch Ingestion crée l'index lorsqu'elle approvisionne le pipeline. Si un index existe déjà, le pipeline l'utilise pour indexer les événements entrants. Si vous arrêtez et redémarrez un pipeline, ou si vous mettez à jour sa configuration YAML, le pipeline tente de créer de nouveaux index s'ils n'existent pas déjà. Un pipeline ne peut jamais supprimer un index.

Les exemples de cuvettes suivants créent deux index lorsque le pipeline est approvisionné :

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Génération de noms et de modèles d'index

Vous pouvez générer des noms d'index dynamiques en utilisant des variables issues des champs des événements entrants. Dans la configuration du récepteur, utilisez le format string${} pour signaler l'interpolation des chaînes et utilisez un pointeur JSON pour extraire les champs des événements. Les options pour index_type sont custom oumanagement_disabled. Comme la index_type valeur par défaut est custom pour OpenSearch les domaines et management_disabled pour les collections OpenSearch sans serveur, elle peut être désactivée.

Par exemple, le pipeline suivant sélectionne le metadataType champ parmi les événements entrants pour générer des noms d'index.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

La configuration suivante continue de générer un nouvel index tous les jours ou toutes les heures.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

Le nom de l'index peut également être une chaîne simple avec un modèle date-heure comme suffixe, tel que. my-index-%{yyyy.MM.dd} Lorsque le récepteur envoie des données à OpenSearch, il remplace le modèle date-heure par l'heure UTC et crée un nouvel index pour chaque jour, tel que. my-index-2022.01.25 Pour plus d'informations, consultez le DateTimeFormattercours.

Ce nom d'index peut également être une chaîne formatée (avec ou sans suffixe de modèle date-heure), telle que. my-${index}-name Lorsque le récepteur envoie des données à OpenSearch, il remplace la "${index}" partie par la valeur de l'événement en cours de traitement. Si le format est le cas"${index1/index2/index3}", il remplace le champ index1/index2/index3 par sa valeur dans l'événement.

Génération d'identifiants de documents

Un pipeline peut générer un identifiant de document lors de l'indexation de documents vers OpenSearch. Il peut déduire ces identifiants de documents à partir des champs des événements entrants.

Cet exemple utilise le uuid champ d'un événement entrant pour générer un identifiant de document.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

Dans l'exemple suivant, le processeur d'ajout d'entrées fusionne les champs uuid et ceux other_field de l'événement entrant pour générer un identifiant de document.

Cette create action garantit que les documents portant des identifiants identiques ne sont pas remplacés. Le pipeline supprime les documents dupliqués sans aucune nouvelle tentative ni événement DLQ. C'est une attente raisonnable pour les auteurs de pipeline qui utilisent cette action, car l'objectif est d'éviter de mettre à jour les documents existants.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

Vous souhaiterez peut-être définir l'ID du document d'un événement sur un champ d'un sous-objet. Dans l'exemple suivant, le plugin OpenSearch sink utilise le sous-objet info/id pour générer un identifiant de document.

sink: - opensearch: ... document_id_field: info/id

Compte tenu de l'événement suivant, le pipeline générera un document dont le _id champ est défini sur json001 :

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Génération d'identifiants de routage

Vous pouvez utiliser l'routing_fieldoption du plugin OpenSearch sink pour définir la valeur d'une propriété de routage de documents (_routing) sur une valeur provenant d'un événement entrant.

Le routage prend en charge la syntaxe des pointeurs JSON, de sorte que les champs imbriqués sont également disponibles, et pas seulement les champs de niveau supérieur.

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

Compte tenu de l'événement suivant, le plugin génère un document dont le _routing champ est défini sur abcd :

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Pour obtenir des instructions sur la création de modèles d'index que les pipelines peuvent utiliser lors de la création d'index, voir Modèles d'index.

nd-to-end Reconnaissance électronique

OpenSearch L'ingestion garantit la durabilité et la fiabilité des données en suivant leur transmission de la source aux récepteurs dans des pipelines apatrides à l'aide d'un end-to-endaccusé de réception. Actuellement, seul le plugin source S3 prend en charge l' end-to-end accusé de réception.

Avec un end-to-end accusé de réception, le plugin source du pipeline crée un ensemble d'accusés de réception pour surveiller un lot d'événements. Il reçoit un accusé de réception positif lorsque ces événements sont envoyés avec succès à leurs récepteurs, ou un accusé de réception négatif lorsqu'aucun des événements n'a pu être envoyé à leurs récepteurs.

En cas de panne ou de crash d'un composant du pipeline, ou si une source ne reçoit pas d'accusé de réception, la source expire et prend les mesures nécessaires, telles qu'une nouvelle tentative ou l'enregistrement de la panne. Si le pipeline possède plusieurs récepteurs ou plusieurs sous-pipelines configurés, les accusés de réception au niveau de l'événement ne sont envoyés qu'une fois que l'événement a été envoyé à tous les récepteurs de tous les sous-pipelines. Si une DLQ est configurée sur un récepteur, les accusés de end-to-end réception suivent également les événements écrits sur la DLQ.

Pour activer l' end-to-end accusé de réception, incluez l'acknowledgmentsoption dans la configuration source :

s3-pipeline: source: s3: acknowledgments: true ...

Contre-pression à la source

Un pipeline peut subir une contre-pression lorsqu'il est occupé à traiter des données, ou si ses récepteurs sont temporairement inactifs ou lents à ingérer les données. OpenSearch L'ingestion permet de gérer la contre-pression de différentes manières en fonction du plugin source utilisé par un pipeline.

Source HTTP

Les pipelines qui utilisent le plug-in source HTTP gèrent la contre-pression différemment selon le composant du pipeline congestionné :

  • Tampons — Lorsque les tampons sont pleins, le pipeline commence à renvoyer l'état HTTP REQUEST_TIMEOUT avec le code d'erreur 408 au point de terminaison source. Lorsque les tampons sont libérés, le pipeline recommence à traiter les événements HTTP.

  • Threads source — Lorsque tous les threads source HTTP sont occupés à exécuter des requêtes et que la taille de la file d'attente de demandes non traitées dépasse le nombre maximum autorisé de demandes, le pipeline commence à renvoyer l'état HTTP TOO_MANY_REQUESTS avec le code d'erreur 429 au point de terminaison source. Lorsque la file d'attente de demandes tombe en dessous de la taille de file d'attente maximale autorisée, le pipeline recommence à traiter les demandes.

Source de l'hôtel

Lorsque les tampons sont pleins pour les pipelines qui utilisent des OpenTelemetry sources (journaux OTel, métriques OTel et trace OTel), le pipeline commence à renvoyer l'état HTTP REQUEST_TIMEOUT avec le code d'erreur 408 au point de terminaison source. Lorsque les tampons sont libérés, le pipeline recommence à traiter les événements.

Source S3

Lorsque les tampons sont pleins pour les pipelines avec une source S3, les pipelines arrêtent de traiter les notifications SQS. Au fur et à mesure que les tampons sont libérés, les pipelines recommencent à traiter les notifications.

Si un récepteur est en panne ou ne parvient pas à ingérer les données et qu'un end-to-end accusé de réception est activé pour la source, le pipeline arrête de traiter les notifications SQS jusqu'à ce qu'il reçoive un accusé de réception de la part de tous les récepteurs.