Modifier le fuseau horaire DAG d'un sur Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Modifier le fuseau horaire DAG d'un sur Amazon MWAA

Apache Airflow planifie votre graphe acyclique dirigé (DAG) en UTC +0 par défaut. Les étapes suivantes montrent comment modifier le fuseau horaire dans lequel Amazon MWAA utilise votre appareil DAGs avec Pendulum. Cette rubrique explique éventuellement comment créer un plugin personnalisé pour modifier le fuseau horaire des journaux Apache Airflow de votre environnement.

Version

  • Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 en Python 3.10.

Prérequis

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :

Autorisations

  • Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

Créez un plugin pour modifier le fuseau horaire dans les journaux Airflow

Apache Airflow exécutera les fichiers Python présents dans le plugins répertoire au démarrage. Avec le plugin suivant, vous pouvez remplacer le fuseau horaire de l'exécuteur, qui modifie le fuseau horaire dans lequel Apache Airflow écrit les journaux.

  1. Créez un répertoire portant le nom plugins de votre plugin personnalisé, puis naviguez jusqu'au répertoire. Par exemple :

    $ mkdir plugins $ cd plugins
  2. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement dag-timezone-plugin.py dans le plugins dossier.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. Dans le plugins répertoire, créez un fichier Python vide nommé__init__.py. Votre plugins répertoire doit être similaire au suivant :

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Créer une plugins.zip

Les étapes suivantes indiquent comment créerplugins.zip. Le contenu de cet exemple peut être combiné avec d'autres plugins et binaires dans un seul plugins.zip fichier.

  1. Dans votre invite de commande, accédez au plugins répertoire de l'étape précédente. Par exemple :

    cd plugins
  2. Compressez le contenu dans votre plugins répertoire.

    zip -r ../plugins.zip ./
  3. Téléversez plugins.zip dans votre compartiment S3

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

Exemple de code

Pour modifier le fuseau horaire par défaut (UTC+0) dans lequel il DAG s'exécute, nous utiliserons une bibliothèque appelée Pendulum, une bibliothèque Python permettant de travailler avec des date/heure tenant compte du fuseau horaire.

  1. Dans votre invite de commande, accédez au répertoire dans lequel vous DAGs êtes enregistré. Par exemple :

    $ cd dags
  2. Copiez le contenu de l'exemple suivant et enregistrez-le soustz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Exécutez la AWS CLI commande suivante pour le copier dans le compartiment DAG de votre environnement, puis déclenchez-le à l'DAGaide de l'interface utilisateur d'Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches pour le tz_aware_task tz_test DAG :

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

Quelle est la prochaine étape ?