Aufrufen von DAGs mit einer Lambda-Funktion - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Aufrufen von DAGs mit einer Lambda-Funktion

Das folgende Codebeispiel verwendet eine AWS LambdaFunktion, um ein Apache Airflow-CLI-Token abzurufen und einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung aufzurufen.

Version

  • Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 und höher in Python 3.10 verwenden.

Voraussetzungen

Um dieses Codebeispiel zu verwenden, müssen Sie:

Anmerkung

Wenn sich die Lambda-Funktion und Ihre Amazon MWAA-Umgebung in derselben VPC befinden, können Sie diesen Code in einem privaten Netzwerk verwenden. Für diese Konfiguration benötigt die Ausführungsrolle der Lambda-Funktion die Erlaubnis, den CreateNetworkInterface API-Vorgang Amazon Elastic Compute Cloud (Amazon EC2) aufzurufen. Sie können diese Berechtigung mithilfe der AWSLambdaVPCAccessExecutionRole AWS verwalteten Richtlinie erteilen.

Berechtigungen

Um das Codebeispiel auf dieser Seite verwenden zu können, benötigt die Ausführungsrolle Ihrer Amazon MWAA-Umgebung Zugriff, um die airflow:CreateCliToken Aktion auszuführen. Sie können diese Berechtigung mithilfe der AmazonMWAAAirflowCliAccess AWS verwalteten Richtlinie erteilen:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Weitere Informationen finden Sie unter Apache Airflow CLI-Richtlinie: AirflowCli AmazonMWAA Access.

Abhängigkeiten

  • Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Der Code verwendet die Apache Airflow v2-Basisinstallation in Ihrer Umgebung.

Codebeispiel

  1. Öffnen Sie die AWS Lambda Konsole unter https://console.aws.amazon.com/lambda/.

  2. Wählen Sie Ihre Lambda-Funktion aus der Funktionsliste aus.

  3. Kopieren Sie auf der Funktionsseite den folgenden Code und ersetzen Sie ihn durch die Namen Ihrer Ressourcen:

    • YOUR_ENVIRONMENT_NAME— Der Name Ihrer Amazon MWAA-Umgebung.

    • YOUR_DAG_NAME— Der Name der DAG, die Sie aufrufen möchten.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Wählen Sie Bereitstellen.

  5. Wählen Sie Test, um Ihre Funktion mit der Lambda-Konsole aufzurufen.

  6. Um zu überprüfen, ob Ihr Lambda Ihre DAG erfolgreich aufgerufen hat, verwenden Sie die Amazon MWAA-Konsole, um zur Apache Airflow-Benutzeroberfläche Ihrer Umgebung zu navigieren, und gehen Sie dann wie folgt vor:

    1. Suchen Sie auf der DAG-Seite Ihre neue Ziel-DAG in der Liste der DAGs.

    2. Überprüfen Sie unter Letzte Ausführung den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere invoke_dag Umgebung übereinstimmen.

    3. Überprüfen Sie unter Letzte Aufgaben, ob die letzte Ausführung erfolgreich war.