本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache 气流命令参考 CLI
本主题介绍适用CLI于 Apache Airflow 的亚马逊托管工作流程中支持和不支持的 Apache Airflow 命令。
目录
先决条件
下一节介绍了使用本页上的命令和脚本所需的初步步骤。
访问
-
AWS AWS Identity and Access Management (IAM) 中的账户访问权限,可访问中的 Amazon MWAA 权限策略Apache Airflow 用户界面访问策略:A mazonMWAAWeb ServerAccess。
-
AWS AWS Identity and Access Management (IAM) 中的账户访问亚马逊MWAA权限策略完全访问API和控制台访问策略:A mazonMWAAFull ApiAccess。
AWS CLI
AWS Command Line Interface (AWS CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:
v2 中发生了什么变化
-
新增:气流CLI命令结构。Apache Airflow v2 的组织方式CLI是将相关命令分组为子命令,这意味着如果你想升级到 Apache Airflow v2,则需要更新 Apache Airflow v1 脚本。例如,Apache Airflow v1 中的
unpause
已更新为 Apache Airflow v2 中的dags unpause
。要了解更多信息,请参阅 A pache Airflow 参考指南中 2 中的气流CLI变化。
支持的 CLI 命令
以下部分列出了亚马逊上可用的 Apache Airflow CLI 命令。MWAA
支持的 命令
使用解析命令 DAGs
如果您的环境运行的是 Apache Airflow v1.10.12 或 v2.0.2,则如果DAG使用依赖于通过以下方式安装的软件包的插件,则解析CLI命令DAGs将失败:requirements.txt
Apache Airflow v2.0.2
-
dags backfill
-
dags list
-
dags list-runs
-
dags next-execution
如果您DAGs不使用依赖于通过安装的软件包的插件,则可以使用这些CLI命令requirements.txt
。
代码示例
以下部分包含使用 Apache Air CLI flow 的不同方法的示例。
设置、获取或删除 Apache Airflow v2 变量
您可以使用以下示例代码设置、获取或删除 <script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>
格式的变量。
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
触发时添加配置 DAG
您可以在触发 Apache Airflow v1 和 Apache Airflow v2 时使用以下示例代码来添加配置,例如。DAG airflow trigger_dag 'dag_name' —conf '{"key":"value"}'
import boto3 import json import requests import base64 mwaa_env_name = '
YOUR_ENVIRONMENT_NAME
' dag_name = 'YOUR_DAG_NAME
' key = "YOUR_KEY
" value = "YOUR_VALUE
" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)
在通往堡垒主机的SSH隧道上运行CLI命令
以下示例说明如何使用SSH隧道代理运行 Airflow CLI 命令到 Linux 堡垒主机。
使用 curl
-
ssh -D 8080 -f -C -q -N
YOUR_USER
@YOUR_BASTION_HOST
-
curl -x socks5h://0:8080 --request POST https://
YOUR_HOST_NAME
/aws_mwaa/cli --headerYOUR_HEADERS
--data-rawYOUR_CLI_COMMAND