在不同的 Amazon MWAA 环境中调用 DAG - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在不同的 Amazon MWAA 环境中调用 DAG

以下代码示例创建了一个 Apache Airflow CLI 令牌。然后,该代码使用一个 Amazon MWAA 环境中的有向无环图(DAG)在另一个 Amazon MWAA 环境中调用 DAG。

版本

  • 您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用。

先决条件

要使用本页上的代码示例,您需要以下内容:

  • 两个具有公有网络 Web 服务器访问权限的 Amazon MWAA 环境,包括您当前的环境。

  • 上传到目标环境的 Amazon Simple Storage Service(Amazon S3)桶的示例 DAG。

权限

要使用本页上的代码示例,环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加 AWS 托管策略 AmazonMWAAAirflowCliAccess 来授予此权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

有关更多信息,请参阅 Apache Airflow CLI 策略:A mazonMWAAAirflow CliAccess

依赖项

代码示例

以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。

  1. 在您的终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下示例的内容并本地另存为 invoke_dag.py。用您自己的信息替换以下值。

    • your-new-environment-name— 您要调用 DAG 的另一个环境的名称。

    • your-target-dag-id— 您要调用 DAG 的另一个环境中的 DAG ID。

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果 DAG 成功运行,您将看到类似于 invoke_dag_task 的任务日志中的以下内容的输出。

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    要验证 DAG 是否已成功调用,请导航到新环境的 Apache Airflow UI,然后执行以下操作:

    1. DAG 页面上,在 DAG 列表中找到新的目标 DAG。

    2. 上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 invoke_dag 的最新时间戳非常匹配。

    3. 近期任务下,检查上次运行是否成功。