Amazon MWAA에서 DAG 시간대 변경 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon MWAA에서 DAG 시간대 변경

Apache Airflow는 기본적으로 방향성 비순환 그래프(DAG)를 UTC+0으로 스케줄링합니다. 다음 단계는 Amazon MWAA가 Pendulum을 사용하여 DAG를 실행하는 시간대를 변경하는 방법을 보여줍니다. 선택적으로 이 주제에서는 사용자 지정 플러그인을 생성하여 사용자 환경의 Apache Airflow 로그의 시간대를 변경하는 방법을 보여줍니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

필수 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

  • 이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

Airflow 로그의 시간대를 변경하는 플러그인을 생성합니다.

Apache Airflow는 시작시 plugins 디렉터리의 Python 파일을 실행합니다. 다음 플러그인을 사용하면 실행자의 시간대를 재정의하여 Apache Airflow가 로그를 기록하는 시간대를 수정할 수 있습니다.

  1. 사용자 지정 플러그인에 대해 지정된 이름 plugins 디렉터리를 생성하고 디렉터리로 이동합니다. 예:

    $ mkdir plugins $ cd plugins
  2. 다음 코드 샘플의 콘텐츠를 복사하고 로컬의 plugins 폴더에 dag-timezone-plugin.py로 저장합니다.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. plugins 디렉터리에서 __init__.py라는 빈 Python 파일을 만듭니다. plugins 디렉터리는 다음과 비슷해야 합니다.

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

plugins.zip 생성

다음 단계에서는 plugins.zip을 생성하는 방법을 보여줍니다. 이 예제의 콘텐츠를 다른 플러그인 및 바이너리와 결합하여 단일 plugins.zip 파일로 만들 수 있습니다.

  1. 명령 프롬프트에서 이전 단계의 plugins 디렉터리로 이동합니다. 예:

    cd plugins
  2. plugins 디렉터리 내의 콘텐츠를 압축합니다.

    zip -r ../plugins.zip ./
  3. S3 버킷에 plugins.zip를 업로드합니다.

    $ aws s3 cp plugins.zip s3://your-mwaa-bucket/

코드 샘플

DAG가 실행되는 기본 시간대(UTC+0)를 변경하려면 시간대를 인식하는 날짜/시간을 처리하는 Python 라이브러리인 Pendulum이라는 라이브러리를 사용합니다.

  1. 명령 프롬프트에서 DAG가 저장된 디렉터리로 이동합니다. 예:

    $ cd dags
  2. 다음 예제의 콘텐츠를 복사하고 tz-aware-dag.py로 저장합니다.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공하면 tz_test DAG의 tz_aware_task에 대한 작업 로그에 다음과 비슷한 결과가 출력됩니다.

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
                    

다음 단계