여러 Amazon MWAA 환경에서 DAG 호출 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

여러 Amazon MWAA 환경에서 DAG 호출

다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.

  • 퍼블릭 네트워크 웹 서버에 액세스할 수 있는 두 개의 Amazon MWAA 환경(현재 환경 포함).

  • 대상 환경의 Amazon Simple Storage Service(S3) 버킷에 업로드된 샘플 DAG.

권한

이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS관리형 정책을 연결하여 이 권한을 AmazonMWAAAirflowCliAccess 부여할 수 있습니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccessWord 섹션을 참조하십시오.

의존성

  • 이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치를 사용합니다.

코드 예제

다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.

  1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 예제의 내용을 복사하고 로컬에서 invoke_dag.py로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.

    • your-new-environment-name— DAG를 호출하려는 다른 환경의 이름.

    • your-target-dag-id— 호출하려는 다른 환경에서 DAG의 ID.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. DAG가 성공적으로 실행되면 invoke_dag_task의 작업 로그에 다음과 유사한 출력이 표시됩니다.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.

    1. DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

    2. 마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 invoke_dag에 대한 최신 타임스탬프와 거의 일치해야 합니다.

    3. 최근 작업에서 마지막 실행이 성공했는지 확인합니다.