Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Aufrufen von DAGs mit einer Lambda-Funktion
Das folgende Codebeispiel verwendet eine AWS LambdaFunktion, um ein Apache Airflow-CLI-Token abzurufen und einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung aufzurufen.
Version
-
Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 und höher in Python 3.10
verwenden.
Voraussetzungen
Um dieses Codebeispiel zu verwenden, müssen Sie:
-
Verwenden Sie den öffentlichen Netzwerkzugriffsmodus für Ihre Amazon MWAA-Umgebung.
-
Verwenden Sie eine Lambda-Funktion, die die neueste Python-Laufzeit verwendet.
Anmerkung
Wenn sich die Lambda-Funktion und Ihre Amazon MWAA-Umgebung in derselben VPC befinden, können Sie diesen Code in einem privaten Netzwerk verwenden. Für diese Konfiguration benötigt die Ausführungsrolle der Lambda-Funktion die Erlaubnis, den CreateNetworkInterface API-Vorgang Amazon Elastic Compute Cloud (Amazon EC2) aufzurufen. Sie können diese Berechtigung mithilfe der AWSLambdaVPCAccessExecutionRole
Berechtigungen
Um das Codebeispiel auf dieser Seite verwenden zu können, benötigt die Ausführungsrolle Ihrer Amazon MWAA-Umgebung Zugriff, um die airflow:CreateCliToken
Aktion auszuführen. Sie können diese Berechtigung mithilfe der AmazonMWAAAirflowCliAccess
AWS verwalteten Richtlinie erteilen:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
Weitere Informationen finden Sie unter Apache Airflow CLI-Richtlinie: AirflowCli AmazonMWAA Access.
Abhängigkeiten
-
Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Der Code verwendet die Apache Airflow v2-Basisinstallation
in Ihrer Umgebung.
Codebeispiel
-
Öffnen Sie die AWS Lambda Konsole unter https://console.aws.amazon.com/lambda/
. -
Wählen Sie Ihre Lambda-Funktion aus der Funktionsliste aus.
-
Kopieren Sie auf der Funktionsseite den folgenden Code und ersetzen Sie ihn durch die Namen Ihrer Ressourcen:
-
YOUR_ENVIRONMENT_NAME
— Der Name Ihrer Amazon MWAA-Umgebung. -
YOUR_DAG_NAME
— Der Name der DAG, die Sie aufrufen möchten.
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
Wählen Sie Bereitstellen.
-
Wählen Sie Test, um Ihre Funktion mit der Lambda-Konsole aufzurufen.
-
Um zu überprüfen, ob Ihr Lambda Ihre DAG erfolgreich aufgerufen hat, verwenden Sie die Amazon MWAA-Konsole, um zur Apache Airflow-Benutzeroberfläche Ihrer Umgebung zu navigieren, und gehen Sie dann wie folgt vor:
-
Suchen Sie auf der DAG-Seite Ihre neue Ziel-DAG in der Liste der DAGs.
-
Überprüfen Sie unter Letzte Ausführung den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere
invoke_dag
Umgebung übereinstimmen. -
Überprüfen Sie unter Letzte Aufgaben, ob die letzte Ausführung erfolgreich war.
-