Modifica del fuso orario di un DAG su Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Modifica del fuso orario di un DAG su Amazon MWAA

Per impostazione predefinita, Apache Airflow pianifica il grafo aciclico diretto (DAG) in UTC+0. I passaggi seguenti mostrano come modificare il fuso orario con cui Amazon MWAA esegue i tuoi DAGPendolo. Facoltativamente, questo argomento dimostra come creare un plug-in personalizzato per modificare il fuso orario dei log di Apache Airflow del tuo ambiente.

Versione

  • Puoi usare l'esempio di codice in questa pagina conApache Airflow v2 e versioni successivenelPython 3.10.

Prerequisiti

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:

Autorizzazioni

  • Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

Crea un plugin per modificare il fuso orario nei log di Airflow

Apache Airflow eseguirà i file Python nelpluginselenco all'avvio. Con il seguente plugin, puoi sovrascrivere il fuso orario dell'esecutore, che modifica il fuso orario in cui Apache Airflow scrive i log.

  1. Crea una cartella denominatapluginsper il tuo plugin personalizzato e vai alla directory. Ad esempio:

    $ mkdir plugins $ cd plugins
  2. Copia il contenuto del seguente esempio di codice e salvalo localmente comedag-timezone-plugin.pynelpluginscartella.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. Nelpluginsdirectory, crea un file Python vuoto denominato__init__.py. Il tuopluginsla cartella dovrebbe essere simile alla seguente:

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Creazione di una destinazione plugins.zip

I passaggi seguenti mostrano come creareplugins.zip. Il contenuto di questo esempio può essere combinato con altri plugin e binari in un unicoplugins.zipfascicolo.

  1. Nel prompt dei comandi, accedi apluginscartella del passaggio precedente. Ad esempio:

    cd plugins
  2. Comprimi i contenuti all'interno del tuopluginsrubrica.

    zip -r ../plugins.zip ./
  3. Caricareplugins.zipal tuo bucket S3

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

Esempio di codice

Per modificare il fuso orario predefinito (UTC+0) in cui viene eseguito il DAG, useremo una libreria chiamataPendolo, una libreria Python per lavorare con datetime compatibili con il fuso orario.

  1. Nel prompt dei comandi, accedi alla directory in cui sono archiviati i tuoi DAG. Ad esempio:

    $ cd dags
  2. Copia il contenuto dell'esempio seguente e salvalo con nometz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Esegui quanto segueAWS CLIcomando per copiare il DAG nel bucket dell'ambiente, quindi attivare il DAG utilizzando l'interfaccia utente di Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. In caso di successo, nei registri delle attività per iltz_aware_taskneltz_testGIORNO:

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

Fasi successive