Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation d'un DAG pour importer des variables dans la CLI
L'exemple de code suivant importe des variables à l'aide de l'interface de ligne de commande sur Amazon Managed Workflows pour Apache Airflow.
Version
-
Vous pouvez utiliser l'exemple de code de cette page avecApache Airflow v2 et versions ultérieuresdansPython 3.10
.
Prérequis
-
Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code de cette page.
Autorisations
VotreAWSle compte doit accéder auAmazonMWAAAirflowCliAccess
politique. Pour en savoir plus, consultez Politique de la CLI Apache Airflow : Accès Amazon AirflowCli MWAA.
Dépendances
-
Pour utiliser cet exemple de code avec Apache Airflow v2, aucune dépendance supplémentaire n'est requise. Le code utiliseInstallation de base d'Apache Airflow v2
sur votre environnement.
Exemple de code
L'exemple de code suivant nécessite trois entrées : le nom de votre environnement Amazon MWAA (dansmwaa_env
), leAWSRégion de votre environnement (enaws_region
) et le fichier local qui contient les variables que vous souhaitez importer (dansvar_file
).
import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)
Quelle est la prochaine étape ?
-
Découvrez comment charger le code DAG de cet exemple vers le
dags
dossier dans votre compartiment Amazon S3 dansAjout ou mise à jour des DAG.