Apache Airflow CLI 命令參考 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Apache Airflow CLI 命令參考

本主題說明 Amazon Managed Workflows for Apache Airflow 上支援和不支援的 Apache Airflow CLI 命令。

必要條件

下一節說明使用此頁面上的命令和指令碼所需的初步步驟。

存取

AWS CLI

AWS Command Line Interface (AWS CLI) 是一種開放原始碼工具,可讓您使用命令列 Shell 中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:

v2 中的變更

  • 新增:Airflow CLI 命令結構。Apache Airflow v2 CLI 經過組織,因此相關命令會分組為子命令,這表示如果您想要升級至 Apache Airflow v2,您需要更新 Apache Airflow v1 指令碼。例如,unpause在 Apache Airflow v1 中,現在位於 Apache Airflow v2 dags unpause中。若要進一步了解,請參閱 Apache Airflow 參考指南中的 2 中的 Airflow CLI 變更

支援的 CLI 命令

下一節列出 Amazon CLI 上可用的 Apache Airflow MWAA 命令。

支援的命令

Apache Airflow v2

使用剖析 DAGs 的命令

如果您的環境正在執行 Apache Airflow v1.10.12 或 v2.0.2,如果 CLI 使用依賴透過 安裝的套件的外掛程式,則剖析 DAGs 的 DAG 命令將會失敗requirements.txt

Apache Airflow 2.0.2 版
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

如果您的 CLI 不使用依賴透過 安裝的套件的外掛程式,您可以使用這些 DAGs 命令requirements.txt

範本程式碼

下一節包含使用 Apache Airflow CLI 的不同方法範例。

設定、取得或刪除 Apache Airflow v2 變數

您可以使用下列範例程式碼來設定、取得或刪除格式為 的變數<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

觸發 DAG 時新增組態

您可以在觸發 DAG 時,搭配 Apache Airflow v1 和 Apache Airflow v2 使用下列範例程式碼來新增組態,例如 airflow trigger_dag 'dag_name' —conf '{"key":"value"}'

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

在 CLI 通道上執行 SSH 命令至基礎結構主機

下列範例示範如何使用 CLI 通道代理執行 Airflow SSH 命令至 Linux Bastion 主機。

使用 curl
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND

in GitHub 和 AWS 教學課程範例