Invocation de DAG dans différents environnements Amazon MAAA - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Invocation de DAG dans différents environnements Amazon MAAA

L'exemple de code suivant crée un jeton CLI Apache Airflow. Le code utilise ensuite un graphe acyclique dirigé (DAG) dans un environnement Amazon MWAA pour appeler un DAG dans un autre environnement Amazon MWAA.

Version

  • Vous pouvez utiliser l'exemple de code de cette page avecApache Airflow v2 et versions ultérieuresdansPython 3.10.

Prérequis

Pour utiliser l'exemple de code de cette page, vous devez disposer des éléments suivants :

  • DeuxEnvironnements Amazon MWAavecréseau publicaccès au serveur Web, y compris à votre environnement actuel.

  • Un exemple de DAG chargé dans le compartiment Amazon Simple Storage Service (Amazon S3) de votre environnement cible.

Autorisations

Pour utiliser l'exemple de code de cette page, le rôle d'exécution de votre environnement doit être autorisé à créer un jeton CLI Apache Airflow. Vous pouvez joindre leAWSpolitique géréeAmazonMWAAAirflowCliAccesspour accorder cette autorisation.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

Pour plus d'informations, veuillez consulter Politique de la CLI Apache Airflow : Accès Amazon AirflowCli MWAA.

Dépendances

Exemple de code

L'exemple de code suivant suppose que vous utilisez un DAG dans votre environnement actuel pour appeler un DAG dans un autre environnement.

  1. Dans votre terminal, accédez au répertoire dans lequel votre code DAG est enregistré. Par exemple :

    cd dags
  2. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sousinvoke_dag.py. Remplacez les valeurs suivantes par vos informations.

    • your-new-environment-name— Le nom de l'autre environnement dans lequel vous souhaitez appeler le DAG.

    • your-target-dag-id— L'ID du DAG dans l'autre environnement que vous souhaitez appeler.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. Exécutez la commande suivanteAWS CLIcommande permettant de copier le DAG dans le bucket de votre environnement, puis de déclencher le DAG à l'aide de l'interface utilisateur d'Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si le DAG fonctionne correctement, vous verrez un résultat similaire à ce qui suit dans les journaux des tâches pourinvoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Pour vérifier que votre DAG a bien été appelé, accédez à l'interface utilisateur d'Apache Airflow correspondant à votre nouvel environnement, puis procédez comme suit :

    1. Sur leDAGpage, localisez votre nouveau DAG cible dans la liste des DAG.

    2. SousDernière course, vérifiez l'horodatage de la dernière exécution du DAG. Cet horodatage doit correspondre étroitement au dernier horodatage pourinvoke_dagdans votre autre environnement.

    3. SousTâches récentes, vérifiez que la dernière exécution s'est bien déroulée.