Apache 气流命令参考 CLI - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Apache 气流命令参考 CLI

本主题介绍适用CLI于 Apache Airflow 的亚马逊托管工作流程中支持和不支持的 Apache Airflow 命令。

先决条件

下一节介绍了使用本页上的命令和脚本所需的初步步骤。

访问

AWS CLI

AWS Command Line Interface (AWS CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

v2 中发生了什么变化

  • 新增:气流CLI命令结构。Apache Airflow v2 的组织方式CLI是将相关命令分组为子命令,这意味着如果你想升级到 Apache Airflow v2,则需要更新 Apache Airflow v1 脚本。例如,Apache Airflow v1 中的 unpause 已更新为 Apache Airflow v2 中的 dags unpause。要了解更多信息,请参阅 A pache Airflow 参考指南中 2 中的气流CLI变化

支持的 CLI 命令

以下部分列出了亚马逊上可用的 Apache Airflow CLI 命令。MWAA

支持的 命令

Apache Airflow v2

使用解析命令 DAGs

如果您的环境运行的是 Apache Airflow v1.10.12 或 v2.0.2,则如果DAG使用依赖于通过以下方式安装的软件包的插件,则解析CLI命令DAGs将失败:requirements.txt

Apache Airflow v2.0.2
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

如果您DAGs不使用依赖于通过安装的软件包的插件,则可以使用这些CLI命令requirements.txt

代码示例

以下部分包含使用 Apache Air CLI flow 的不同方法的示例。

设置、获取或删除 Apache Airflow v2 变量

您可以使用以下示例代码设置、获取或删除 <script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable> 格式的变量。

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

触发时添加配置 DAG

您可以在触发 Apache Airflow v1 和 Apache Airflow v2 时使用以下示例代码来添加配置,例如。DAG airflow trigger_dag 'dag_name' —conf '{"key":"value"}'

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

在通往堡垒主机的SSH隧道上运行CLI命令

以下示例说明如何使用SSH隧道代理运行 Airflow CLI 命令到 Linux 堡垒主机。

使用 curl
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND

中的示例 GitHub 和 AWS 教程