Como invocar DAGs em diferentes ambientes do Amazon MWAA - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como invocar DAGs em diferentes ambientes do Amazon MWAA

O exemplo de código a seguir cria um token da CLI do Apache Airflow. O código, então, usa um gráfico acíclico direcionado (directed acyclic graph, DAG) em um ambiente do Amazon MWAA para invocar um DAG em um ambiente diferente do Amazon MWAA.

Versão

  • Você pode usar o exemplo de código nesta página com o Apache Airflow v2 e superior no Python 3.10.

Pré-requisitos

Para usar o código de exemplo nesta página, você precisará de:

  • Dois ambientes Amazon MWAA com acesso ao servidor web de rede pública, incluindo seu ambiente atual.

  • Um exemplo de DAG que foi carregado para o bucket do Amazon Simple Storage Service (Amazon S3) do seu ambiente de destino.

Permissões

Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente deve ter permissão para criar um token da CLI do Apache Airflow. Você pode anexar a política AmazonMWAAAirflowCliAccess gerenciada pela AWS para conceder essa permissão.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

Para obter mais informações, consulte Política de CLI do Apache Airflow: Acesso ao AmazonMWAA AirflowCli.

Dependências

Exemplo de código

O exemplo de código a seguir pressupõe que você esteja usando um DAG em seu ambiente atual para invocar um DAG em outro ambiente.

  1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

    cd dags
  2. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como invoke_dag.py. Substitua os seguintes valores por suas informações.

    • your-new-environment-name: o nome do outro ambiente onde deseja invocar o DAG.

    • your-target-dag-id: o ID do DAG no outro ambiente que você deseja invocar.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Se o DAG for executado com êxito, você verá uma saída semelhante à seguinte nos logs de tarefas de invoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Para verificar se seu DAG foi invocado com sucesso, navegue até a IU do Apache Airflow para seu novo ambiente e faça o seguinte:

    1. Na página DAGs, localize seu novo DAG de destino na lista de DAGs.

    2. Em Última execução, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para invoke_dag em seu outro ambiente.

    3. Em Tarefas recentes, verifique se a última execução foi bem-sucedida.