本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在不同的 Amazon MWAA 环境中调用 DAG
以下代码示例创建了一个 Apache Airflow CLI 令牌。然后,该代码使用一个 Amazon MWAA 环境中的有向无环图(DAG)在另一个 Amazon MWAA 环境中调用 DAG。
版本
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
要使用本页上的代码示例,您需要以下内容:
-
两个具有公有网络 Web 服务器访问权限的 Amazon MWAA 环境,包括您当前的环境。
-
上传到目标环境的 Amazon Simple Storage Service(Amazon S3)桶的示例 DAG。
权限
要使用本页上的代码示例,环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加 AWS 托管策略 AmazonMWAAAirflowCliAccess
来授予此权限。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]
有关更多信息,请参阅 Apache Airflow CLI 策略:A mazonMWAAAirflow CliAccess。
依赖项
-
要在 Apache Airflow v2 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v2 基础版安装
。
代码示例
以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。
-
在您的终端,导航到存储 DAG 代码的目录。例如:
cd dags
-
复制以下示例的内容并本地另存为
invoke_dag.py
。用您自己的信息替换以下值。-
your-new-environment-name
— 您要调用 DAG 的另一个环境的名称。 -
your-target-dag-id
— 您要调用 DAG 的另一个环境中的 DAG ID。
from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
-
-
运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果 DAG 成功运行,您将看到类似于
invoke_dag_task
的任务日志中的以下内容的输出。[2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
要验证 DAG 是否已成功调用,请导航到新环境的 Apache Airflow UI,然后执行以下操作:
-
在 DAG 页面上,在 DAG 列表中找到新的目标 DAG。
-
在上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中
invoke_dag
的最新时间戳非常匹配。 -
在近期任务下,检查上次运行是否成功。
-