기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
여러 Amazon MWAA 환경에서 DAG 호출
다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.
버전
-
이 페이지의 코드 예제는 Python 3.10
의 Apache Airflow v2에서 사용할 수 있습니다.
사전 조건
이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.
-
퍼블릭 네트워크 웹 서버에 액세스할 수 있는 두 개의 Amazon MWAA 환경(현재 환경 포함).
-
대상 환경의 Amazon Simple Storage Service(S3) 버킷에 업로드된 샘플 DAG.
권한
이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS관리형 정책을 연결하여 이 권한을 AmazonMWAAAirflowCliAccess
부여할 수 있습니다.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]
자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccessWord 섹션을 참조하십시오.
의존성
-
이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치
를 사용합니다.
코드 예제
다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.
-
터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:
cd dags
-
다음 코드 예제의 내용을 복사하고 로컬에서
invoke_dag.py
로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.-
your-new-environment-name
— DAG를 호출하려는 다른 환경의 이름. -
your-target-dag-id
— 호출하려는 다른 환경에서 DAG의 ID.
from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
-
-
다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
DAG가 성공적으로 실행되면
invoke_dag_task
의 작업 로그에 다음과 유사한 출력이 표시됩니다.[2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.
-
DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.
-
마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서
invoke_dag
에 대한 최신 타임스탬프와 거의 일치해야 합니다. -
최근 작업에서 마지막 실행이 성공했는지 확인합니다.
-