Utilisation d'un DAG pour importer des variables dans la CLI - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation d'un DAG pour importer des variables dans la CLI

L'exemple de code suivant importe des variables à l'aide de l'interface de ligne de commande sur Amazon Managed Workflows pour Apache Airflow.

Version

  • Vous pouvez utiliser l'exemple de code de cette page avecApache Airflow v2 et versions ultérieuresdansPython 3.10.

Prérequis

  • Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code de cette page.

Autorisations

VotreAWSle compte doit accéder auAmazonMWAAAirflowCliAccesspolitique. Pour en savoir plus, consultez Politique de la CLI Apache Airflow : Accès Amazon AirflowCli MWAA.

Dépendances

Exemple de code

L'exemple de code suivant nécessite trois entrées : le nom de votre environnement Amazon MWAA (dansmwaa_env), leAWSRégion de votre environnement (enaws_region) et le fichier local qui contient les variables que vous souhaitez importer (dansvar_file).

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

Quelle est la prochaine étape ?