As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Invocação DAGs em diferentes ambientes do Amazon MWAA
O exemplo de código a seguir cria um token da CLI do Apache Airflow. O código, usa um gráfico acíclico direcionado (directed acyclic graph, DAG) em um ambiente do Amazon MWAA para invocar um DAG em um ambiente diferente do Amazon MWAA.
Versão
-
É possível usar o exemplo de código nesta página com o Apache Airflow v2 no Python 3.10
.
Pré-requisitos
Para usar o código de exemplo nesta página, você precisará de:
-
Dois ambientes Amazon MWAA com acesso ao servidor web de rede pública, incluindo seu ambiente atual.
-
Um exemplo de DAG que foi carregado para o bucket do Amazon Simple Storage Service (Amazon S3) do seu ambiente de destino.
Permissões
Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente deve ter permissão para criar um token da CLI do Apache Airflow. Você pode anexar a política AWS gerenciada AmazonMWAAAirflowCliAccess
para conceder essa permissão.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]
Para obter mais informações, consulte Política de CLI do Apache Airflow: Amazon MWAAAirflow CliAccess.
Dependências
-
Para usar esse exemplo de código com o Apache Airflow v2, nenhuma dependência adicional é necessária. O código usa a instalação básica do Apache Airflow v2
em seu ambiente.
Exemplo de código
O exemplo de código a seguir pressupõe que você esteja usando um DAG em seu ambiente atual para invocar um DAG em outro ambiente.
-
Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:
cd dags
-
Copie o conteúdo do exemplo de código a seguir e salve-o localmente como
invoke_dag.py
. Substitua os seguintes valores por suas informações.-
your-new-environment-name
: o nome do outro ambiente onde deseja invocar o DAG. -
your-target-dag-id
: o ID do DAG no outro ambiente que você deseja invocar.
from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
-
-
Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do usuário do Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Se o DAG for executado com êxito, você verá uma saída semelhante à seguinte nos logs de tarefas de
invoke_dag_task
.[2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
Para verificar se seu DAG foi invocado com sucesso, navegue até a IU do Apache Airflow para seu novo ambiente e faça o seguinte:
-
Na DAGspágina, localize seu novo DAG de destino na lista de DAGs.
-
Em Última execução, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para
invoke_dag
em seu outro ambiente. -
Em Tarefas recentes, verifique se a última execução foi bem-sucedida.
-