Vorverarbeitung von Daten mithilfe einer Lambda-Funktion - Amazon-Kinesis-Data-Analytics für SQL-Anwendungen

Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis-Data-Analytics-for-SQL-Anwendungen zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Vorverarbeitung von Daten mithilfe einer Lambda-Funktion

Anmerkung

Nach dem 12. September 2023 können Sie keine neuen Anwendungen mit Kinesis Data Firehose als Quelle erstellen, wenn Sie Kinesis Data Analytics für SQL nicht bereits verwenden. Weitere Informationen finden Sie unter Limits.

Wenn die Daten in Ihrem Stream Formatkonvertierung, Transformation, Anreicherung oder Filterung erfordern, können Sie die Daten mithilfe einer - AWS Lambda Funktion vorverarbeiten. Sie können dies vor der Ausführung von SQL-Code in der Anwendung tun, oder bevor die Anwendung ein Schema aus Ihrem Datenstrom erstellt.

Die Verwendung einer Lambda-Funktion für die Vorverarbeitung von Datensätzen ist in den folgenden Fällen nützlich:

  • Umwandeln von Datensätzen aus anderen Formaten (z. B. KPL oder GZIP) in Formate, die Kinesis Data Analytics analysieren kann. Kinesis Data Analytics unterstützt derzeit JSON- oder CSV-Datenformate.

  • Erweitern von Daten in ein Format, das für Operationen wie beispielsweise Aggregation oder Entdeckung von Anomalien leichter zugänglich ist. Wenn z. B. mehrere Datenwerte zusammen in einer Zeichenfolge gespeichert werden, können Sie die Daten in separate Spalten erweitern.

  • Die Datenanreicherung mit anderen Amazon Services, wie z. B. Extrapolation oder Fehlerkorrektur.

  • Anwenden einer komplexen Zeichenfolgentransformation auf Datensatzfelder.

  • Datenfilterung für die Bereinigung der Daten.

Verwenden einer Lambda-Funktion für die Vorverarbeitung von Datensätzen

Wenn Sie Ihre Kinesis Data Analytics-Anwendung erstellen, aktivieren Sie die Lambda-Vorverarbeitung auf der Seite Mit einer Quelle verbinden.

Verwendung einer Lambda-Funktion für die Vorverarbeitung von Datensätzen in einer Kinesis Data Analytics-Anwendung
  1. Melden Sie sich bei der an AWS Management Console und öffnen Sie die Managed Service für Apache Flink-Konsole unter https://console.aws.amazon.com/kinesisanalytics.

  2. Wählen Sie auf der Seite Mit einer Quelle verbinden für Ihre Anwendung die Option Aktiviert im Bereich Datensatzvorverarbeitung mit AWS Lambda.

  3. Um eine bereits von Ihnen erstellte Lambda-Funktion zu verwenden, wählen Sie die Funktion aus der Dropdown-Liste Lambda-Funktion aus.

  4. Um eine neue Lambda-Funktion aus einer der Lambda-Vorverarbeitungsvorlagen zu erstellen, wählen Sie die Vorlage aus der Dropdown-Liste aus. Klicken Sie dann auf View <template name> in Lambda (<Vorlagenname> in Lambda anzeigen), um die Funktion zu bearbeiten.

  5. Um eine neue Lambda-Funktion zu erstellen, wählen Sie Neu erstellen. Informationen zum Erstellen einer Lambda-Funktion finden Sie unter Erstellen einer HelloWorld Lambda-Funktion und Erkunden der Konsole im AWS Lambda -Entwicklerhandbuch.

  6. Wählen Sie die Version der zu verwendenden Lambda-Funktion aus, die genutzt werden soll. Um die neueste Version zu verwenden, wählen Sie $LATEST.

Wenn Sie eine Lambda-Funktion für die Datensatz-Vorverarbeitung auswählen oder erstellen, werden die Datensätze vorverarbeitet, bevor der SQL-Code Ihrer Anwendung ausgeführt wird oder Ihre Anwendung ein Schema aus den Datensätzen erstellt.

Lambda-Vorverarbeitungsberechtigungen

Zur Verwendung der Lambda-Vorverarbeitung benötigt die IAM-Rolle der Anwendung die folgende Berechtigungsrichtlinie:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda-Vorverarbeitungsmetriken

Sie können Amazon verwenden, CloudWatch um die Anzahl der Lambda-Aufrufe, verarbeiteten Bytes, Erfolge und Fehler usw. zu überwachen. Informationen zu CloudWatch Metriken, die von der Lambda-Vorverarbeitung von Kinesis Data Analytics ausgegeben werden, finden Sie unter Amazon Kinesis Analytics-Metriken.

Verwenden von AWS Lambda mit der Kinesis Producer Library

Die Kinesis Producer Library (KPL) aggregiert kleine vom Benutzer formatierte Datensätze in größere Datensätze von bis zu 1 MB, um den Durchsatz von Amazon Kinesis Data Streams zu optimieren. Die Kinesis Client Library (KCL) für Java unterstützt eine Disaggregation dieser Datensätze. Sie müssen jedoch ein spezielles Modul verwenden, um die Datensätze zu deaggregieren, wenn Sie AWS Lambda als Verbraucher Ihrer Streams verwenden.

Den erforderlichen Projektcode und Anweisungen finden Sie in den Kinesis Producer Library Deaggregation Modules for AWS Lambda auf GitHub. Sie können die Komponenten in diesem Projekt verwenden, um serialisierte KPL-Daten in AWS Lambda in Java, Node.js und Python zu verarbeiten. Die Komponenten können auch als Teil einer mehrsprachigen KCL-Anwendung verwendet werden.

Ereignis-Eingabedatenmodell/Datensatz-Antwortmodell der Datenvorverarbeitung

Zur Vorverarbeitung von Datensätzen muss Ihre Lambda-Funktion mit den benötigten Ereignis-Eingabedaten und Datensatz-Antwortmodellen konform sein.

Ereignis-Eingabedatenmodell

Kinesis Data Analytics liest kontinuierlich Daten aus Ihrem Kinesis-Datenstrom oder Firehose-Bereitstellungsdatenstrom. Für jeden abgerufenen Stapel von Datensätzen verwaltet der Service, wie jeder Stapel an Ihre Lambda-Funktion übergeben wird. Die Funktion empfängt eine Liste der Datensätze als Eingabe. Innerhalb Ihrer Funktion durchlaufen Sie die Liste und wenden Ihre Geschäftslogik an, um Ihre Vorverarbeitungsanforderungen (wie z. B. Datenformatkonvertierung oder Anreicherung) zu erfüllen.

Das Eingabemodell für Ihre Vorverarbeitungsfunktion variiert geringfügig, je nachdem, ob die Daten aus einem Kinesis-Datenstrom oder einem Firehose-Bereitstellungs-Stream empfangen wurden.

Wenn die Quelle ein Firehose-Bereitstellungs-Stream ist, lautet das Ereigniseingabedatenmodell wie folgt:

Kinesis Data Firehose-Anforderungsdatenmodell

Feld Beschreibung
invocationId Die Lambda-Aufrufs-ID (zufällige GUID).
applicationArn Der Amazon-Ressourcenname (ARN) der Kinesis Data Analytics-Anwendung
streamArn ARN des Bereitstellungs-Streams
Datensätze
Feld Beschreibung
recordId Datensatz-ID (zufällige GUID)
kinesisFirehoseRecordMetadata
Feld Beschreibung
approximateArrivalTimestamp Ungefähre Ankunftszeit des Datensatzes des Bereitstellungs-Streams
data Base64-kodierte Quell-Datensatz-Nutzlast

Das folgende Beispiel zeigt die Eingabe aus einem Firehose-Bereitstellungs-Stream:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Wenn die Quelle ein Kinesis-Datenstrom ist, ist dies das Ereignis-Eingabemodell:

Kinesis-Streams-Anforderungsdatenmodell

Feld Beschreibung
invocationId Die Lambda-Aufrufs-ID (zufällige GUID).
applicationArn ARN der Kinesis Data Analytics-Anwendung
streamArn ARN des Bereitstellungs-Streams
Datensätze
Feld Beschreibung
recordId Datensatz-ID basierend auf Kinesis-Datensatz-Sequenznummer
kinesisStreamRecordMetadata
Feld Beschreibung
sequenceNumber Sequenznummern aus dem Kinesis-Stream-Datensatz
partitionKey Partitionsschlüssel aus dem Kinesis-Stream-Datensatz
shardId ShardId aus dem Kinesis-Stream-Datensatz
approximateArrivalTimestamp Ungefähre Ankunftszeit des Datensatzes des Bereitstellungs-Streams
data Base64-kodierte Quell-Datensatz-Nutzlast

Das folgende Beispiel zeigt die Eingabe aus einem Kinesis-Daten-Stream:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Datensatz-Antwortmodell

Alle von Ihrer Lambda-Vorverarbeitungsfunktion (mit Datensatz-IDs) zurückgegebenen Datensätze, die an die Lambda-Funktion gesendet werden, müssen zurückgegeben werden. Sie müssen die folgenden Parameter enthalten. Andernfalls weist Kinesis Data Analytics sie zurück und behandelt sie wie Vorverarbeitungsfehler. Die Datennutzlast des Datensatzes kann entsprechend den Vorverarbeitungsanforderungen umgewandelt werden.

Antwortdatenmodell

Datensätze
Feld Beschreibung
recordId Die Datensatz-ID wird während des Aufrufs von Kinesis Data Analytics an Lambda übertragen. Der transformierte Datensatz muss dieselbe Datensatz-ID enthalten. Jede fehlende Übereinstimmung zwischen der ID des ursprünglichen Datensatzes und der ID des transformierten Datensatzes wird als Datenvorverarbeitungsfehler behandelt.
result Der Status der Datentransformation des Datensatzes. Die möglichen Werte sind:
  • Ok: Der Datensatz wurde erfolgreich umgewandelt. Kinesis Data Analytics nimmt den Datensatz für die SQL-Verarbeitung auf.

  • Dropped: Der Datensatz wurde absichtlich von Ihrer Verarbeitungs-Logik herausgenommen. Kinesis Data Analytics nimmt den Datensatz aus der SQL-Verarbeitung. Das Feld der Datennutzlast ist für einen Dropped-Datensatz optional.

  • ProcessingFailed: Der Datensatz konnte nicht umgewandelt werden. Kinesis Data Analytics sieht die Verarbeitung durch Ihre Lambda-Funktion als nicht erfolgreich an und schreibt einen Fehler in den Fehler-Stream. Weitere Informationen zum Fehler-Stream finden Sie unter Fehlerbehandlung. Das Feld der Datennutzlast ist für einen ProcessingFailed-Datensatz optional.

data Die transformierte Datennutzlast nach der base64-Kodierung. Jede Datennutzlast kann mehrere JSON-Dokumente enthalten, wenn die Anwendung das JSON-Upload-Datenformat verwendet. Oder es können mehrere CSV-Zeilen (mit einem Zeilentrennzeichen in jede Zeile) enthalten sein, wenn die Anwendung das CSV-Upload-Datenformat verwendet. Der Kinesis Data Analytics-Service analysiert und verarbeitet Daten, die entweder aus mehreren JSON-Dokumenten bestehen oder mehrere CSV-Zeilen enthalten, innerhalb derselben Datennutzlast erfolgreich.

Das folgende Beispiel zeigt die Ausgabe einer Lambda-Funktion:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Geläufige Datenverarbeitungsfehler

Dies sind die häufigsten Gründe, weshalb eine Vorverarbeitung fehlschlagen kann.

  • Nicht alle Datensätze (mit Datensatz-IDs) in einem Stapel, die zur Lambda-Funktion gesendet wurden, werden an den Kinesis Data Analytics-Service zurückgegeben.

  • In der Antwort fehlt entweder die Datensatz-ID, der Status oder das Feld der Datennutzlast. Das Feld der Datennutzlast ist für einen Dropped- oder ProcessingFailed-Datensatz optional.

  • Die Zeitüberschreitungen der Lambda-Funktion reichen nicht zur Vorverarbeitung der Daten aus.

  • Die Antwort der Lambda-Funktion überschreitet die durch den AWS Lambda -Service auferlegten Antwort-Limits.

Im Falle von Fehlern bei der Datenvorverarbeitung führt Kinesis Data Analytics Lambda-Aufrufe auf derselben Gruppe von Datensätzen erneut durch, bis der Aufruf erfolgreich ist. Sie können die folgenden CloudWatch Metriken überwachen, um Einblicke in Fehler zu erhalten.

  • Kinesis Data Analytics-Anwendung MillisBehindLatest: Gibt an, mit welcher zeitlichen Differenz eine Anwendung aus der Streaming-Quelle liest.

  • InputPreprocessing CloudWatch Anwendungsmetriken von Kinesis Data Analytics: Gibt unter anderem die Anzahl der Erfolge und Fehler an. Weitere Informationen finden Sie unter Amazon Kinesis Analytics-Metriken.

  • AWS Lambda - CloudWatch Funktionsmetriken und -protokolle.