

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 여러 Amazon MWAA 환경에서 DAG 호출
<a name="samples-invoke-dag"></a>

다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.

**Topics**
+ [버전](#samples-invoke-dag-version)
+ [사전 조건](#samples-invoke-dag-prereqs)
+ [권한](#samples-invoke-dag-permissions)
+ [종속성](#samples-invoke-dag-dependencies)
+ [코드 예제](#samples-invoke-dag-code)

## 버전
<a name="samples-invoke-dag-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-invoke-dag-prereqs"></a>

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.
+ **퍼블릭 네트워크** 웹 서버에 액세스할 수 있는 두 개의 [Amazon MWAA 환경](get-started.md)(현재 환경 포함).
+ 대상 환경의 Amazon Simple Storage Service(S3) 버킷에 업로드된 샘플 DAG.

## 권한
<a name="samples-invoke-dag-permissions"></a>

이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS관리형 정책을 연결하여이 권한을 부여`AmazonMWAAAirflowCliAccess`할 수 있습니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

자세한 정보는 [Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) 섹션을 참조하세요.

## 종속성
<a name="samples-invoke-dag-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 예제
<a name="samples-invoke-dag-code"></a>

다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.

1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예제:

   ```
   cd dags
   ```

1. 다음 코드 예제의 내용을 복사하고 로컬에서 `invoke_dag.py`로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.
   + `your-new-environment-name`— DAG를 호출하려는 다른 환경의 이름.
   + `your-target-dag-id`— 호출하려는 다른 환경에서 DAG의 ID.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp {{your-dag}}.py s3://{{your-environment-bucket}}/dags/
   ```

1. DAG가 성공적으로 실행되면 `invoke_dag_task`의 작업 로그에 다음과 유사한 출력을 얻게 됩니다.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.

   1. **DAG** 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

   1. **마지막 실행**에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 `invoke_dag`에 대한 최신 타임스탬프와 거의 일치해야 합니다.

   1. **최근 작업**에서 마지막 실행이 성공했는지 확인합니다.