Apache Airflow CLI 명령 참조 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Airflow CLI 명령 참조

이 페이지에서는 Amazon Managed Workflows for Apache Airflow에서 지원되는 Apache Airflow CLI 명령과 지원되지 않는 Apache Airflow CLI 명령을 설명합니다.

필수 조건

다음 섹션에서는 이 페이지의 명령과 스크립트를 사용하는 데 필요한 예비 단계를 설명합니다.

액세스

AWS CLI

AWS Command Line Interface (AWS CLI) 는 명령줄 셸의 명령을 사용하여 AWS 서비스와 상호 작용할 수 있게 해주는 오픈 소스 도구입니다. 이 페이지에서 단계를 완료하려면 다음이 필요합니다.

v2에서 변경된 사항

  • 신규: Airflow CLI 명령 구조. Apache Airflow v2 CLI는 관련 명령이 하위 명령으로 그룹화되도록 구성되어 있습니다. 이는 Apache Airflow v2로 업그레이드하려면 Apache Airflow v1 스크립트를 업데이트해야 한다는 의미입니다. 예를 들어, Apache Airflow v1에서 unpause이 이제 Apache Airflow v2에 있습니다. 자세한 내용을 알아보려면 Apache Airflow 참조 가이드에서 섹션 2의 Airflow CLI 변경 사항을 참조하십시오.

지원되는 CLI 명령

다음 섹션에는 Amazon MWAA에서 사용할 수 있는 Apache Airflow CLI 명령이 나와 있습니다.

지원되는 명령

Apache Airflow v2

DAG를 구문 분석하는 명령 사용

환경에서 Apache Airflow v1.10.12 또는 v2.0.2를 실행하는 경우 DAG가 다음을 통해 설치된 패키지에 의존하는 플러그인을 사용하는 경우 DAG를 구문 분석하는 CLI 명령이 실패합니다. requirements.txt

Apache Airflow v2.0.2
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

DAG가 requirements.txt를 통해 설치된 패키지에 의존하는 플러그인을 사용하지 않으면 이러한 CLI 명령을 사용할 수 있습니다.

샘플 코드

다음 섹션에는 Apache Airflow CLI를 사용하는 여러 방법의 예제가 나와 있습니다.

Apache Airflow v2 변수 설정, 가져오기 또는 삭제

다음 샘플 코드를 사용하여 <script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>의 형식으로 변수를 설정, 가져오기 또는 삭제할 수 있습니다.

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

DAG를 트리거할 때 구성 추가

Apache Airflow v1 및 Apache Airflow v2에서 다음 샘플 코드를 사용하여 DAG를 트리거할 때 airflow trigger_dag 'dag_name' —conf '{"key":"value"}'과 같이 구성을 추가할 수 있습니다.

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

Bastion Host에 대한 SSH 터널에서 CLI 명령 실행

다음 예제는 Linux Bastion Host에 대한 SSH 터널 프록시를 사용하여 Airflow CLI 명령을 실행하는 방법을 보여줍니다.

curl 사용
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND

GitHub 샘플 AWS 및 튜토리얼