Für neue Projekte empfehlen wir, den neuen Managed Service für Apache Flink Studio anstelle von Kinesis-Data-Analytics-for-SQL-Anwendungen zu verwenden. Der Managed Service für Apache Flink Studio kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie in wenigen Minuten anspruchsvolle Anwendungen zur Stream-Verarbeitung erstellen können.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Vorverarbeitung von Daten mithilfe einer Lambda-Funktion
Anmerkung
Nach dem 12. September 2023 können Sie keine neuen Anwendungen mit Kinesis Data Firehose als Quelle erstellen, wenn Sie Kinesis Data Analytics für SQL nicht bereits verwenden. Weitere Informationen finden Sie unter Limits.
Wenn die Daten in Ihrem Stream Formatkonvertierung, Transformation, Anreicherung oder Filterung erfordern, können Sie die Daten mithilfe einer - AWS Lambda Funktion vorverarbeiten. Sie können dies vor der Ausführung von SQL-Code in der Anwendung tun, oder bevor die Anwendung ein Schema aus Ihrem Datenstrom erstellt.
Die Verwendung einer Lambda-Funktion für die Vorverarbeitung von Datensätzen ist in den folgenden Fällen nützlich:
-
Umwandeln von Datensätzen aus anderen Formaten (z. B. KPL oder GZIP) in Formate, die Kinesis Data Analytics analysieren kann. Kinesis Data Analytics unterstützt derzeit JSON- oder CSV-Datenformate.
-
Erweitern von Daten in ein Format, das für Operationen wie beispielsweise Aggregation oder Entdeckung von Anomalien leichter zugänglich ist. Wenn z. B. mehrere Datenwerte zusammen in einer Zeichenfolge gespeichert werden, können Sie die Daten in separate Spalten erweitern.
-
Die Datenanreicherung mit anderen Amazon Services, wie z. B. Extrapolation oder Fehlerkorrektur.
-
Anwenden einer komplexen Zeichenfolgentransformation auf Datensatzfelder.
-
Datenfilterung für die Bereinigung der Daten.
Verwenden einer Lambda-Funktion für die Vorverarbeitung von Datensätzen
Wenn Sie Ihre Kinesis Data Analytics-Anwendung erstellen, aktivieren Sie die Lambda-Vorverarbeitung auf der Seite Mit einer Quelle verbinden.
Verwendung einer Lambda-Funktion für die Vorverarbeitung von Datensätzen in einer Kinesis Data Analytics-Anwendung
Melden Sie sich bei der an AWS Management Console und öffnen Sie die Managed Service für Apache Flink-Konsole unter https://console.aws.amazon.com/kinesisanalytics
. -
Wählen Sie auf der Seite Mit einer Quelle verbinden für Ihre Anwendung die Option Aktiviert im Bereich Datensatzvorverarbeitung mit AWS Lambda.
-
Um eine bereits von Ihnen erstellte Lambda-Funktion zu verwenden, wählen Sie die Funktion aus der Dropdown-Liste Lambda-Funktion aus.
-
Um eine neue Lambda-Funktion aus einer der Lambda-Vorverarbeitungsvorlagen zu erstellen, wählen Sie die Vorlage aus der Dropdown-Liste aus. Klicken Sie dann auf View <template name> in Lambda (<Vorlagenname> in Lambda anzeigen), um die Funktion zu bearbeiten.
-
Um eine neue Lambda-Funktion zu erstellen, wählen Sie Neu erstellen. Informationen zum Erstellen einer Lambda-Funktion finden Sie unter Erstellen einer HelloWorld Lambda-Funktion und Erkunden der Konsole im AWS Lambda -Entwicklerhandbuch.
-
Wählen Sie die Version der zu verwendenden Lambda-Funktion aus, die genutzt werden soll. Um die neueste Version zu verwenden, wählen Sie $LATEST.
Wenn Sie eine Lambda-Funktion für die Datensatz-Vorverarbeitung auswählen oder erstellen, werden die Datensätze vorverarbeitet, bevor der SQL-Code Ihrer Anwendung ausgeführt wird oder Ihre Anwendung ein Schema aus den Datensätzen erstellt.
Lambda-Vorverarbeitungsberechtigungen
Zur Verwendung der Lambda-Vorverarbeitung benötigt die IAM-Rolle der Anwendung die folgende Berechtigungsrichtlinie:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Lambda-Vorverarbeitungsmetriken
Sie können Amazon verwenden, CloudWatch um die Anzahl der Lambda-Aufrufe, verarbeiteten Bytes, Erfolge und Fehler usw. zu überwachen. Informationen zu CloudWatch Metriken, die von der Lambda-Vorverarbeitung von Kinesis Data Analytics ausgegeben werden, finden Sie unter Amazon Kinesis Analytics-Metriken.
Verwenden von AWS Lambda mit der Kinesis Producer Library
Die Kinesis Producer Library (KPL) aggregiert kleine vom Benutzer formatierte Datensätze in größere Datensätze von bis zu 1 MB, um den Durchsatz von Amazon Kinesis Data Streams zu optimieren. Die Kinesis Client Library (KCL) für Java unterstützt eine Disaggregation dieser Datensätze. Sie müssen jedoch ein spezielles Modul verwenden, um die Datensätze zu deaggregieren, wenn Sie AWS Lambda als Verbraucher Ihrer Streams verwenden.
Den erforderlichen Projektcode und Anweisungen finden Sie in den Kinesis Producer Library Deaggregation Modules for AWS Lambda
Ereignis-Eingabedatenmodell/Datensatz-Antwortmodell der Datenvorverarbeitung
Zur Vorverarbeitung von Datensätzen muss Ihre Lambda-Funktion mit den benötigten Ereignis-Eingabedaten und Datensatz-Antwortmodellen konform sein.
Ereignis-Eingabedatenmodell
Kinesis Data Analytics liest kontinuierlich Daten aus Ihrem Kinesis-Datenstrom oder Firehose-Bereitstellungsdatenstrom. Für jeden abgerufenen Stapel von Datensätzen verwaltet der Service, wie jeder Stapel an Ihre Lambda-Funktion übergeben wird. Die Funktion empfängt eine Liste der Datensätze als Eingabe. Innerhalb Ihrer Funktion durchlaufen Sie die Liste und wenden Ihre Geschäftslogik an, um Ihre Vorverarbeitungsanforderungen (wie z. B. Datenformatkonvertierung oder Anreicherung) zu erfüllen.
Das Eingabemodell für Ihre Vorverarbeitungsfunktion variiert geringfügig, je nachdem, ob die Daten aus einem Kinesis-Datenstrom oder einem Firehose-Bereitstellungs-Stream empfangen wurden.
Wenn die Quelle ein Firehose-Bereitstellungs-Stream ist, lautet das Ereigniseingabedatenmodell wie folgt:
Kinesis Data Firehose-Anforderungsdatenmodell
Feld | Beschreibung | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Die Lambda-Aufrufs-ID (zufällige GUID). | ||||||||||||
applicationArn |
Der Amazon-Ressourcenname (ARN) der Kinesis Data Analytics-Anwendung | ||||||||||||
streamArn |
ARN des Bereitstellungs-Streams | ||||||||||||
Datensätze
|
Das folgende Beispiel zeigt die Eingabe aus einem Firehose-Bereitstellungs-Stream:
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
Wenn die Quelle ein Kinesis-Datenstrom ist, ist dies das Ereignis-Eingabemodell:
Kinesis-Streams-Anforderungsdatenmodell
Feld | Beschreibung | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Die Lambda-Aufrufs-ID (zufällige GUID). | ||||||||||||||||||
applicationArn |
ARN der Kinesis Data Analytics-Anwendung | ||||||||||||||||||
streamArn |
ARN des Bereitstellungs-Streams | ||||||||||||||||||
Datensätze
|
Das folgende Beispiel zeigt die Eingabe aus einem Kinesis-Daten-Stream:
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
Datensatz-Antwortmodell
Alle von Ihrer Lambda-Vorverarbeitungsfunktion (mit Datensatz-IDs) zurückgegebenen Datensätze, die an die Lambda-Funktion gesendet werden, müssen zurückgegeben werden. Sie müssen die folgenden Parameter enthalten. Andernfalls weist Kinesis Data Analytics sie zurück und behandelt sie wie Vorverarbeitungsfehler. Die Datennutzlast des Datensatzes kann entsprechend den Vorverarbeitungsanforderungen umgewandelt werden.
Antwortdatenmodell
Datensätze
|
Das folgende Beispiel zeigt die Ausgabe einer Lambda-Funktion:
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
Geläufige Datenverarbeitungsfehler
Dies sind die häufigsten Gründe, weshalb eine Vorverarbeitung fehlschlagen kann.
-
Nicht alle Datensätze (mit Datensatz-IDs) in einem Stapel, die zur Lambda-Funktion gesendet wurden, werden an den Kinesis Data Analytics-Service zurückgegeben.
-
In der Antwort fehlt entweder die Datensatz-ID, der Status oder das Feld der Datennutzlast. Das Feld der Datennutzlast ist für einen
Dropped
- oderProcessingFailed
-Datensatz optional. -
Die Zeitüberschreitungen der Lambda-Funktion reichen nicht zur Vorverarbeitung der Daten aus.
-
Die Antwort der Lambda-Funktion überschreitet die durch den AWS Lambda -Service auferlegten Antwort-Limits.
Im Falle von Fehlern bei der Datenvorverarbeitung führt Kinesis Data Analytics Lambda-Aufrufe auf derselben Gruppe von Datensätzen erneut durch, bis der Aufruf erfolgreich ist. Sie können die folgenden CloudWatch Metriken überwachen, um Einblicke in Fehler zu erhalten.
-
Kinesis Data Analytics-Anwendung
MillisBehindLatest
: Gibt an, mit welcher zeitlichen Differenz eine Anwendung aus der Streaming-Quelle liest. -
InputPreprocessing
CloudWatch Anwendungsmetriken von Kinesis Data Analytics: Gibt unter anderem die Anzahl der Erfolge und Fehler an. Weitere Informationen finden Sie unter Amazon Kinesis Analytics-Metriken. -
AWS Lambda - CloudWatch Funktionsmetriken und -protokolle.