在 Amazon MWAA 上更改 DAG 的时区 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon MWAA 上更改 DAG 的时区

默认情况下,Apache Airflow 将有向无环图(DAG)安排在 UTC+0 中。以下步骤展示了如何使用 Pendulum 更改 Amazon MWAA 运行 DAG 的时区。或者,本主题演示如何创建自定义插件来更改环境的 Apache Airflow 日志的时区。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

  • 无需其他权限即可使用本页上的代码示例。

创建插件以更改 Airflow 日志中的时区

Apache Airflow 将在启动时运行 plugins 目录中的 Python 文件。使用以下插件,您可以覆盖执行程序的时区,该时区会修改 Apache Airflow 写入日志的时区。

  1. 创建名为 plugins 的目录并导航到其中。例如:

    $ mkdir plugins $ cd plugins
  2. 复制以下代码示例的内容,并将其本地另存为 dag-timezone-plugin.py,保存在 plugins 文件夹中。

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. plugins 目录中创建名为 __init__.py 的空 Python 文件。plugins 目录应类似于以下内容。

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

创建 plugins.zip

以下步骤显示如何创建 plugins.zip。此示例的内容可以与其他插件和二进制文件组合成单个 plugins.zip 文件。

  1. 在命令提示符下,导航到上一步中的 plugins 目录。例如:

    cd plugins
  2. 将内容压缩到 plugins 目录中。

    zip -r ../plugins.zip ./
  3. plugins.zip 上传到 S3 存储桶。

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

代码示例

要更改 DAG 运行的默认时区(UTC+0),我们将使用一个名为 Pendulum 的库,这是一个用于处理时区感知日期时间的 Python 库。

  1. 在命令提示符下,导航到存储 DAG 的目录。例如:

    $ cd dags
  2. 复制以下示例的内容并另存为 tz-aware-dag.py

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. 运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您将输出类似于在 tz_test DAG 中的 tz_aware_task 的任务日志中的以下内容:

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

接下来做什么?

  • 要了解如何将本示例中的 plugins.zip 文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件