Apache Airflow CLI 命令參考 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Apache Airflow CLI 命令參考

本主題說明 Amazon Managed Workflows for Apache Airflow 上支援和不支援的 Apache Airflow CLI 命令。

提示

REST API 比 CLI 更現代化,旨在與外部系統進行程式設計整合。REST 是與 Apache Airflow 互動的偏好方式。

先決條件

下一節說明使用此頁面上的命令和指令碼所需的初步步驟。

存取

AWS CLI

AWS Command Line Interface (AWS CLI) 是一種開放原始碼工具,可讓您使用命令列 shell 中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:

v2 中的變更

  • 新增:Airflow CLI 命令結構。Apache Airflow v2 CLI 經過組織,因此相關命令會分組為子命令,這表示如果您想要升級至 Apache Airflow v2,則需要更新 Apache Airflow v1 指令碼。例如,unpauseApache Airflow v1 現在位於 dags unpause Apache Airflow v2。若要進一步了解,請參閱《Apache Airflow 參考指南》中的 2 中的 Airflow CLI 變更

支援的 CLI 命令

下一節列出 Amazon MWAA 上可用的 Apache Airflow CLI 命令。

支援的命令

Apache Airflow v2

使用剖析 DAGs命令

如果您的環境執行的是 Apache Airflow v1.10.12 或 v2.0.2,如果 DAG 使用依賴透過 安裝的套件的外掛程式,則剖析 DAGs CLI 命令將會失敗requirements.txt

Apache Airflow 2.0.2 版
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

如果您的 DAGs 不使用依賴透過 安裝的套件的外掛程式,您可以使用這些 CLI 命令requirements.txt

範本程式碼

下一節包含使用 Apache Airflow CLI 的不同方式範例。

設定、取得或刪除 Apache Airflow v2 變數

您可以使用下列範例程式碼來設定、取得或刪除格式為 的變數<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

觸發 DAG 時新增組態

您可以使用下列範例程式碼搭配 Apache Airflow v1 和 Apache Airflow v2,在觸發 DAG 時新增組態,例如 airflow trigger_dag 'dag_name' —conf '{"key":"value"}'

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

在對堡壘主機的 SSH 通道上執行 CLI 命令

下列範例示範如何使用 SSH 通道代理對 Linux 堡壘主機執行 Airflow CLI 命令。

使用 curl
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND

GitHub 中的範例和 AWS 教學課程