Invocar DAG en diferentes entornos de Amazon MWAA - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Invocar DAG en diferentes entornos de Amazon MWAA

El siguiente código de ejemplo crea un token CLI de Apache Airflow. A continuación, el código utiliza un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA para invocar un DAG en un entorno distinto de Amazon MWAA.

Versión

  • Puede usar el código de ejemplo de esta página con Apache Airflow v2 y versiones posteriores en Python 3.10.

Requisitos previos

Para usar el exemplo de código en esta página, necesitará lo siguiente:

  • Dos entornos de Amazon MWAA con acceso a un servidor web de red pública, incluido su entorno actual.

  • Un DAG de muestra cargado en el bucket de Amazon Simple Storage Service (Amazon S3) de su entorno de destino.

Permisos

Para usar el código de ejemplo de esta página, el rol de ejecución de su entorno debe tener permiso para crear un token CLI de Apache Airflow. Puede adjuntar la política administrada AWS AmazonMWAAAirflowCliAccess para conceder este permiso.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

Para obtener más información, consulte Política de CLI de Apache Airflow: AmazonMWAA AirflowCliAccess.

Dependencias

Ejemplo de código

En el siguiente código de ejemplo se supone que está utilizando un DAG en su entorno actual para invocar un DAG en otro entorno.

  1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

    cd dags
  2. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como invoke_dag.py. Reemplace los valores siguientes por sus propios valores.

    • your-new-environment-name: nombre del otro entorno en el que desea invocar el DAG.

    • your-target-dag-id: ID del DAG del otro entorno que desea invocar.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. Ejecute el siguiente comando de AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si el DAG se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas de invoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Para comprobar que el DAG se haya invocado correctamente, vaya a la interfaz de usuario de Apache Airflow del nuevo entorno y, a continuación, haga lo siguiente:

    1. En la página DAG, busque su nuevo DAG de destino en la lista de DAG.

    2. En Última ejecución, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para invoke_dag en su otro entorno.

    3. En Tareas recientes, compruebe que la última ejecución se haya realizado correctamente.