Cambiar la zona horaria DAG de un usuario en Amazon MWAA - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cambiar la zona horaria DAG de un usuario en Amazon MWAA

Apache Airflow programa tu grafo acíclico dirigido (DAG) en UTC +0 de forma predeterminada. Los siguientes pasos muestran cómo puedes cambiar la zona horaria en la que Amazon MWAA ejecuta tu cuenta DAGs con Pendulum. Opcionalmente, en este tema se muestra cómo puede crear un complemento personalizado para cambiar la zona horaria de los registros de Apache Airflow de su entorno.

Versión

  • Puede usar el ejemplo de código de esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para usar el código de muestra de esta página, necesitará lo siguiente:

Permisos

  • No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

Creación de un complemento para cambiar la zona horaria en los registros de Airflow

Apache Airflow ejecutará los archivos de Python en el directorio de plugins al inicio. Con el siguiente complemento, puede anular la zona horaria del ejecutor, lo que modifica la zona horaria en la que Apache Airflow escribe los registros.

  1. Cree un directorio llamado plugins para su complemento personalizado y vaya al directorio. Por ejemplo:

    $ mkdir plugins $ cd plugins
  2. Copie el contenido de la siguiente muestra de código y guárdelo localmente como dag-timezone-plugin.py en la carpeta plugins.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. En el directorio plugins, cree un archivo de Python vacío llamado __init__.py. Su directorio plugins debería ser similar a lo siguiente:

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Creación de un plugins.zip

Los siguientes pasos muestran cómo crear plugins.zip. El contenido de esta muestra se puede combinar con otros complementos y binarios en un solo archivo plugins.zip.

  1. En el símbolo del sistema, vaya hasta el directorio plugins del paso anterior. Por ejemplo:

    cd plugins
  2. Comprima el contenido en su directorio plugins.

    zip -r ../plugins.zip ./
  3. Carga de plugins.zip en el bucket S3

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

Código de ejemplo

Para cambiar la zona horaria predeterminada (UTC+0) en la que DAG se ejecuta, utilizaremos una biblioteca llamada Pendulum, una biblioteca de Python para trabajar con datetime que reconoce la zona horaria.

  1. En la línea de comandos, navega hasta el directorio en el que estás almacenado. DAGs Por ejemplo:

    $ cd dags
  2. Copie el contenido de la siguiente muestra y guárdelo como tz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Ejecute el siguiente AWS CLI comando para copiarlo en el bucket de su entorno y, DAG a continuación, actívalo DAG mediante la interfaz de usuario de Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si se ejecuta correctamente, obtendrá un resultado similar al siguiente en los registros de tareas de tz_testDAG: tz_aware_task

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

Siguientes pasos