阿帕奇氣流 CLI 命令參考 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

阿帕奇氣流 CLI 命令參考

本頁說明在 Amazon 管理的 Apache 氣流程中支援和不支援的 Apache 氣流 CLI 命令。

必要條件

以下段落說明使用此頁面上的命令和命令檔所需的初步步驟。

存取

AWS CLI

AWS Command Line Interface (AWS CLI) 是開放原始碼工具,可讓您使用命令列殼層中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:

v2 中有什麼變化

  • 新增:氣流 CLI 命令結構。Apache 氣流 v2 CLI 的組織,以便相關的命令被組合在一起作為子命令,這意味著你需要更新 Apache 氣流 v1 腳本,如果你想升級到 Apache 氣流 V2。例如,unpause在阿帕奇氣流 V1 現在是dags unpause在阿帕奇氣流 V2。若要深入了解,請參閱 Apache 氣流參考指南中的 2 中的氣流 CLI 變更。

支援的 CLI 指令

下一節列出 Amazon MWAA 上可用的 Apache 氣流 CLI 命令。

支援的命令

Apache Airflow v2

使用剖析 DAG 的命令

如果您的環境正在執行 Apache 氣流 v1.10.12 或 v2.0.2,如果 DAG 使用的外掛程式依賴透過下列方式安裝的套件,剖析 DAG 的 CLI 命令將會失敗:requirements.txt

阿帕奇氣流 v2.0.2
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

如果您的 DAG 不使用相依於透過requirements.txt.

範本程式碼

下一節包含使用 Apache 氣流 CLI 的不同方式的範例。

設置,獲取或刪除阿帕奇氣流 v2 變量

您可以使用下列範例程式碼來設定、取得或刪除格式的變數<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

觸發 DAG 時新增組態

您可以使用下列範例程式碼搭配 Apache 氣流 v1 和 Apache 氣流 v2,在觸發 DAG 時新增組態,例如airflow trigger_dag 'dag_name' —conf '{"key":"value"}'.

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

在 SSH 通道上執行 CLI 命令到防禦主機

下列範例顯示如何使用安全殼層通道代理伺服器對 Linux 防禦主機執行氣流 CLI 命令。

使用捲曲
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND

範例 GitHub 和教 AWS 學課程