Überblick über die Pipeline-Funktionen in Amazon OpenSearch Ingestion - OpenSearch Amazon-Dienst

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Überblick über die Pipeline-Funktionen in Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion stellt Pipelines bereit, die aus einer Quelle, einem Puffer, null oder mehr Prozessoren und einer oder mehreren Senken bestehen. Ingestion-Pipelines werden von Data Prepper als Daten-Engine unterstützt. Einen Überblick über die verschiedenen Komponenten einer Pipeline finden Sie unter. Die wichtigsten Konzepte

Die folgenden Abschnitte bieten einen Überblick über einige der am häufigsten verwendeten Funktionen in Amazon OpenSearch Ingestion.

Anmerkung

Dies ist keine vollständige Liste der Funktionen, die für Pipelines verfügbar sind. Eine umfassende Dokumentation aller verfügbaren Pipeline-Funktionen finden Sie in der Data Prepper-Dokumentation. Beachten Sie, dass OpenSearch Ingestion einige Einschränkungen hinsichtlich der Plugins und Optionen auferlegt, die Sie verwenden können. Weitere Informationen finden Sie unter Unterstützte Plugins und Optionen für Amazon OpenSearch Ingestion-Pipelines.

Dauerhafte Pufferung

Ein persistenter Puffer speichert Ihre Daten in einem festplattenbasierten Puffer über mehrere Availability Zones hinweg, um die Haltbarkeit Ihrer Daten zu erhöhen. Mithilfe der persistenten Pufferung können Sie Daten für alle unterstützten Push-basierten Quellen aufnehmen, ohne einen eigenständigen Puffer einrichten zu müssen. Dazu gehören HTTP und OpenTelemetry Quellen für Protokolle, Traces und Metriken.

Um die persistente Pufferung zu aktivieren, wählen Sie beim Erstellen oder Aktualisieren einer Pipeline die Option Persistenten Puffer aktivieren aus. Weitere Informationen finden Sie unterAmazon OpenSearch Ingestion-Pipelines erstellen. OpenSearch Die Aufnahme bestimmt automatisch die erforderliche Pufferkapazität auf der Grundlage der OpenSearch Ingestion-Recheneinheiten (Ingestion-OCUs), die Sie für die Pipeline angeben.

Standardmäßig verwenden Pipelines eine, um Pufferdaten zu verschlüsseln. AWS-eigener Schlüssel Diese Pipelines benötigen keine zusätzlichen Berechtigungen für die Pipeline-Rolle. Alternativ können Sie einen vom Kunden verwalteten Schlüssel angeben und der Pipeline-Rolle die folgenden IAM-Berechtigungen hinzufügen:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Weitere Informationen finden Sie unter Kundenverwaltete Schlüssel im AWS Key Management Service Entwicklerhandbuch.

Anmerkung

Wenn Sie die persistente Pufferung deaktivieren, wird Ihre Pipeline so aktualisiert, dass sie vollständig mit speicherinterner Pufferung läuft.

Optimierung der maximalen Payload-Größe für Anfragen

Wenn Sie die persistente Pufferung für eine Pipeline aktivieren, beträgt die maximale Größe der Anforderungsnutzlast standardmäßig 1 MB. Der Standardwert bietet die beste Leistung. Sie können diesen Wert jedoch erhöhen, wenn Ihre Kunden Anfragen senden, die 1 MB überschreiten. Um die maximale Nutzlastgröße einzustellen, legen Sie die max_request_length Option in der Quellkonfiguration fest. Genau wie die persistente Pufferung wird diese Option nur für HTTP und OpenTelemetry Quellen für Protokolle, Traces und Metriken unterstützt.

Die einzigen gültigen Werte für max_request_length diese Option sind 1 MB, 1,5 MB, 2 MB, 2,5 MB, 3 MB, 3,5 MB und 4 MB. Wenn Sie einen anderen Wert angeben, erhalten Sie eine Fehlermeldung.

Das folgende Beispiel zeigt, wie die maximale Nutzlastgröße innerhalb einer Pipeline-Konfiguration konfiguriert wird:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

Wenn Sie die persistente Pufferung für eine Pipeline nicht aktivieren, ist der Wert der max_request_length Option standardmäßig für alle Quellen auf 10 MB voreingestellt und kann nicht geändert werden.

Aufteilen

Sie können eine OpenSearch Ingestion-Pipeline so konfigurieren, dass eingehende Ereignisse in eine Unterpipeline aufgeteilt werden, sodass Sie verschiedene Arten der Verarbeitung desselben eingehenden Ereignisses durchführen können.

Die folgende Beispielpipeline teilt eingehende Ereignisse in zwei Unter-Pipelines auf. Jede Unterpipeline verwendet ihren eigenen Prozessor, um die Daten anzureichern und zu bearbeiten, und sendet die Daten dann an verschiedene Indizes. OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Verkettung

Sie können mehrere Sub-Pipelines miteinander verketten, um die Datenverarbeitung und -anreicherung in Abschnitten durchzuführen. Mit anderen Worten, Sie können ein eingehendes Ereignis mit bestimmten Verarbeitungsfunktionen in einer Subpipeline anreichern, es dann zur weiteren Anreicherung mit einem anderen Prozessor an eine andere Subpipeline senden und es schließlich an dessen Senke senden. OpenSearch

Im folgenden Beispiel reichert die log_pipeline Subpipeline ein eingehendes Protokollereignis mit einer Reihe von Prozessoren an und sendet das Ereignis dann an einen Index mit dem Namen. OpenSearch enriched_logs Die Pipeline sendet dasselbe Ereignis an die log_advanced_pipeline Subpipeline, die es verarbeitet und an einen anderen OpenSearch Index mit dem Namen sendet. enriched_advanced_logs

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Warteschlangen für unzustellbare Nachrichten

Dead-Letter-Warteschlangen (DLQs) sind Ziele für Ereignisse, bei denen eine Pipeline nicht in eine Senke schreiben kann. In OpenSearch Ingestion müssen Sie einen Amazon S3 S3-Bucket mit entsprechenden Schreibberechtigungen angeben, der als DLQ verwendet werden soll. Sie können jeder Senke innerhalb einer Pipeline eine DLQ-Konfiguration hinzufügen. Wenn eine Pipeline auf Schreibfehler stößt, erstellt sie DLQ-Objekte im konfigurierten S3-Bucket. DLQ-Objekte existieren in einer JSON-Datei als Array von fehlgeschlagenen Ereignissen.

Eine Pipeline schreibt Ereignisse in den DLQ, wenn eine der folgenden Bedingungen erfüllt ist:

  • Die max_retries für die OpenSearch Spüle benötigte Menge ist aufgebraucht. OpenSearch Für diese Option sind mindestens 16 für die Einnahme erforderlich.

  • Ereignisse werden aufgrund eines Fehlers von der Senke zurückgewiesen.

Konfiguration

Um eine Warteschlange mit unerlaubten Briefen für eine Subpipeline zu konfigurieren, geben Sie die dlq Option in der opensearch Senkenkonfiguration an:

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

Dateien, die in diesen S3-DLQ geschrieben werden, haben das folgende Benennungsmuster:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Weitere Informationen finden Sie unter Dead-Letter-Warteschlangen (DLQ).

Anweisungen zur Konfiguration der Rolle finden Sie untersts_role_arn. Schreiben in eine Warteschleife mit unzustellbaren Briefen

Beispiel

Betrachten Sie die folgende Beispiel-DLQ-Datei:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Hier ist ein Beispiel für Daten, die nicht in die Senke geschrieben werden konnten und die zur weiteren Analyse an den DLQ S3-Bucket gesendet werden:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Indexverwaltung

Amazon OpenSearch Ingestion bietet viele Funktionen zur Indexverwaltung, darunter die folgenden.

Erstellen von Indizes

Sie können einen Indexnamen in einer Pipeline-Senke angeben, und OpenSearch Ingestion erstellt den Index, wenn die Pipeline bereitgestellt wird. Wenn bereits ein Index vorhanden ist, verwendet die Pipeline ihn, um eingehende Ereignisse zu indizieren. Wenn Sie eine Pipeline beenden und neu starten oder wenn Sie ihre YAML-Konfiguration aktualisieren, versucht die Pipeline, neue Indizes zu erstellen, sofern diese noch nicht vorhanden sind. Eine Pipeline kann niemals einen Index löschen.

Die folgenden Beispielsenken erstellen zwei Indizes, wenn die Pipeline bereitgestellt wird:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Generieren von Indexnamen und -mustern

Sie können dynamische Indexnamen generieren, indem Sie Variablen aus den Feldern eingehender Ereignisse verwenden. Verwenden Sie in der Senkenkonfiguration das Format, string${} um die Interpolation von Zeichenketten zu signalisieren, und verwenden Sie einen JSON-Zeiger, um Felder aus Ereignissen zu extrahieren. Die Optionen für index_type sind custom oder. management_disabled Da die index_type Standardeinstellung custom für OpenSearch Domänen und management_disabled für OpenSearch serverlose Sammlungen ist, kann sie nicht konfiguriert werden.

Die folgende Pipeline wählt beispielsweise das metadataType Feld aus eingehenden Ereignissen aus, um Indexnamen zu generieren.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

Die folgende Konfiguration generiert weiterhin jeden Tag oder jede Stunde einen neuen Index.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

Der Indexname kann auch eine einfache Zeichenfolge mit einem Datums-/Uhrzeitmuster als Suffix sein, z. B. my-index-%{yyyy.MM.dd} Wenn die Senke Daten an sendet OpenSearch, ersetzt sie das Datums-/Uhrzeitmuster durch die UTC-Zeit und erstellt für jeden Tag einen neuen Index, z. B. my-index-2022.01.25 Weitere Informationen finden Sie in der DateTimeFormatterKlasse.

Dieser Indexname kann auch eine formatierte Zeichenfolge sein (mit oder ohne ein Datums-/Uhrzeit-Mustersuffix), z. B. my-${index}-name Wenn die Senke Daten an sendet OpenSearch, ersetzt sie den "${index}" Teil durch den Wert in dem Ereignis, das gerade verarbeitet wird. Wenn das Format "${index1/index2/index3}" zutrifft, ersetzt es das Feld index1/index2/index3 durch seinen Wert im Ereignis.

Dokument-IDs werden generiert

Eine Pipeline kann beim Indizieren von Dokumenten eine Dokument-ID generieren. OpenSearch Sie kann diese Dokument-IDs aus den Feldern in eingehenden Ereignissen ableiten.

In diesem Beispiel wird das uuid Feld aus einem eingehenden Ereignis verwendet, um eine Dokument-ID zu generieren.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

Im folgenden Beispiel führt der Prozessor „Einträge hinzufügen“ die Felder uuid other_field aus dem eingehenden Ereignis zusammen, um eine Dokument-ID zu generieren.

Die create Aktion stellt sicher, dass Dokumente mit identischen IDs nicht überschrieben werden. Die Pipeline löscht doppelte Dokumente, ohne dass ein erneuter Versuch oder ein DLQ-Ereignis erforderlich ist. Dies ist für Pipeline-Autoren, die diese Aktion verwenden, durchaus zu erwarten, da das Ziel darin besteht, die Aktualisierung vorhandener Dokumente zu vermeiden.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

Möglicherweise möchten Sie die Dokument-ID eines Ereignisses auf ein Feld aus einem Unterobjekt festlegen. Im folgenden Beispiel verwendet das OpenSearch Sink-Plug-In das Unterobjekt, um eine Dokument-ID info/id zu generieren.

sink: - opensearch: ... document_id_field: info/id

Bei dem folgenden Ereignis generiert die Pipeline ein Dokument, bei dem das _id Feld wie folgt gesetzt json001 ist:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Routing-IDs werden generiert

Sie können die routing_field Option innerhalb des OpenSearch Sink-Plug-ins verwenden, um den Wert einer Dokument-Routing-Eigenschaft (_routing) auf einen Wert aus einem eingehenden Ereignis festzulegen.

Routing unterstützt die JSON-Zeigersyntax, sodass auch verschachtelte Felder verfügbar sind, nicht nur Felder der obersten Ebene.

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

Bei dem folgenden Ereignis generiert das Plugin ein Dokument, bei dem das _routing Feld auf gesetzt ist: abcd

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Anweisungen zum Erstellen von Indexvorlagen, die Pipelines bei der Indexerstellung verwenden können, finden Sie unter Indexvorlagen.

E-Bestätigung nd-to-end

OpenSearch Die Datenaufnahme gewährleistet die Haltbarkeit und Zuverlässigkeit von Daten, indem deren Übertragung von der Quelle bis zu den Senken in zustandslosen Pipelines mithilfe von Quittierung nachverfolgt wird. end-to-end Derzeit unterstützt nur das S3-Quell-Plugin die Bestätigung. end-to-end

Bei der end-to-end Bestätigung erstellt das Pipeline-Quell-Plugin einen Bestätigungssatz zur Überwachung einer Reihe von Ereignissen. Es erhält eine positive Bestätigung, wenn diese Ereignisse erfolgreich an ihre Senken gesendet wurden, oder eine negative Bestätigung, wenn eines der Ereignisse nicht an ihre Senken gesendet werden konnte.

Im Falle eines Fehlers oder Absturzes einer Pipeline-Komponente oder wenn eine Quelle keine Bestätigung erhält, läuft die Quelle ab und ergreift die erforderlichen Maßnahmen, wie z. B. einen erneuten Versuch oder die Protokollierung des Fehlers. Wenn für die Pipeline mehrere Senken oder mehrere Sub-Pipelines konfiguriert sind, werden Bestätigungen auf Ereignisebene erst gesendet, nachdem das Ereignis an alle Senken in allen Unter-Pipelines gesendet wurde. Wenn für eine Senke ein DLQ konfiguriert ist, verfolgt die Bestätigungsfunktion auch Ereignisse, die in den DLQ geschrieben wurden. end-to-end

Um die end-to-end Bestätigung zu aktivieren, fügen Sie die Option in die acknowledgments Quellkonfiguration ein:

s3-pipeline: source: s3: acknowledgments: true ...

Gegendruck an der Quelle

In einer Pipeline kann es zu Gegendruck kommen, wenn sie mit der Verarbeitung von Daten beschäftigt ist oder wenn ihre Senken vorübergehend ausgefallen sind oder nur langsam Daten aufnehmen. OpenSearch Je nachdem, welches Quell-Plugin eine Pipeline verwendet, gibt es bei der Datenaufnahme unterschiedliche Methoden, mit Gegendruck umzugehen.

HTTP-Quelle

Pipelines, die das HTTP-Quell-Plugin verwenden, behandeln Gegendruck unterschiedlich, je nachdem, welche Pipeline-Komponente überlastet ist:

  • Puffer — Wenn die Puffer voll sind, gibt die Pipeline den HTTP-Status REQUEST_TIMEOUT mit dem Fehlercode 408 zurück zum Quellendpunkt. Sobald Puffer freigegeben werden, beginnt die Pipeline erneut mit der Verarbeitung von HTTP-Ereignissen.

  • Quell-Threads — Wenn alle HTTP-Quell-Threads mit der Ausführung von Anfragen beschäftigt sind und die Größe der Warteschlange für unbearbeitete Anfragen die maximal zulässige Anzahl von Anfragen überschritten hat, gibt die Pipeline den HTTP-Status TOO_MANY_REQUESTS mit dem Fehlercode 429 zurück an den Quellendpunkt. Wenn die Anforderungswarteschlange die maximal zulässige Warteschlangengröße unterschreitet, beginnt die Pipeline erneut mit der Verarbeitung der Anfragen.

Die Quelle

Wenn die Puffer für Pipelines, die OpenTelemetry Quellen verwenden (OTel-Logs, oTel-Metriken und oTel-Trace), voll sind, beginnt die Pipeline, den HTTP-Status REQUEST_TIMEOUT mit dem Fehlercode 408 an den Quellendpunkt zurückzugeben. Sobald Puffer freigegeben werden, beginnt die Pipeline erneut mit der Verarbeitung von Ereignissen.

S3-Quelle

Wenn die Puffer für Pipelines mit einer S3-Quelle voll sind, beenden die Pipelines die Verarbeitung von SQS-Benachrichtigungen. Sobald die Puffer freigegeben sind, beginnen die Pipelines wieder mit der Verarbeitung von Benachrichtigungen.

Wenn eine Senke ausgefallen ist oder keine Daten aufnehmen kann und die end-to-end Bestätigung für die Quelle aktiviert ist, stoppt die Pipeline die Verarbeitung von SQS-Benachrichtigungen, bis sie eine erfolgreiche Bestätigung von allen Senken erhält.