DAGs in verschiedenen Amazon MWAA-Umgebungen aufrufen - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

DAGs in verschiedenen Amazon MWAA-Umgebungen aufrufen

Im folgenden Codebeispiel wird ein Apache Airflow-CLI-Token erstellt. Der Code verwendet dann einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung, um eine DAG in einer anderen Amazon MWAA-Umgebung aufzurufen.

Version

  • Sie können das Codebeispiel auf dieser Seite verwenden mitApache Airflow v2 und höherinPython 3.10.

Voraussetzungen

Um das Codebeispiel auf dieser Seite zu verwenden, benötigen Sie Folgendes:

  • ZweiAmazon MWAA-Umgebungenmitöffentliches NetzwerkZugriff auf den Webserver, einschließlich Ihrer aktuellen Umgebung.

  • Eine Beispiel-DAG, die in den Amazon Simple Storage Service (Amazon S3) -Bucket Ihrer Zielumgebung hochgeladen wurde.

Berechtigungen

Um das Codebeispiel auf dieser Seite verwenden zu können, muss die Ausführungsrolle Ihrer Umgebung über die Berechtigung verfügen, ein Apache Airflow-CLI-Token zu erstellen. Sie können das anhängenAWSverwaltete RichtlinieAmazonMWAAAirflowCliAccessum diese Erlaubnis zu erteilen.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

Weitere Informationen finden Sie unter Apache Airflow CLI-Richtlinie: AirflowCli AmazonMWAA Access.

Abhängigkeiten

  • Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Der Code verwendet dieApache Airflow v2-Basisinstallationauf deine Umgebung.

Codebeispiel

Im folgenden Codebeispiel wird davon ausgegangen, dass Sie eine DAG in Ihrer aktuellen Umgebung verwenden, um eine DAG in einer anderen Umgebung aufzurufen.

  1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiele:

    cd dags
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unterinvoke_dag.py. Ersetzen Sie die folgenden Werte durch Ihre Informationen.

    • your-new-environment-name— Der Name der anderen Umgebung, in der Sie die DAG aufrufen möchten.

    • your-target-dag-id— Die ID der DAG in der anderen Umgebung, die Sie aufrufen möchten.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. Führen Sie Folgendes ausAWS CLIBefehl, um die DAG in den Bucket Ihrer Umgebung zu kopieren und dann die DAG mithilfe der Apache Airflow-Benutzeroberfläche auszulösen.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Wenn die DAG erfolgreich ausgeführt wird, sehen Sie in den Aufgabenprotokollen für eine Ausgabe ähnlich der folgendeninvoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Um zu überprüfen, ob Ihre DAG erfolgreich aufgerufen wurde, navigieren Sie zur Apache Airflow-Benutzeroberfläche für Ihre neue Umgebung und gehen Sie dann wie folgt vor:

    1. Auf derDAGsSeite, suchen Sie Ihre neue Ziel-DAG in der Liste der DAGs.

    2. UnterLetzter Lauf, überprüfen Sie den Zeitstempel für den letzten DAG-Lauf. Dieser Zeitstempel sollte genau dem neuesten Zeitstempel für entsprecheninvoke_dagin deiner anderen Umgebung.

    3. UnterAktuelle Aufgaben, überprüfen Sie, ob der letzte Lauf erfolgreich war.