Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Utilizzo di un DAG per importare variabili nella CLI
Il seguente codice di esempio importa le variabili utilizzando l'interfaccia a riga di comando su Amazon Managed Workflows per Apache Airflow.
Versione
-
Puoi usare l'esempio di codice in questa pagina conApache Airflow v2 e versioni successivenelPython 3.10
.
Prerequisiti
-
Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.
Autorizzazioni
Il tuoAWSl'account deve accedere alAmazonMWAAAirflowCliAccess
politica. Per ulteriori informazioni, consulta Politica CLI di Apache Airflow: AmazonMWAA AirflowCliAccess.
Dipendenze
-
Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Il codice utilizzaInstallazione base di Apache Airflow v2
sul tuo ambiente.
Esempio di codice
Il seguente codice di esempio richiede tre input: il nome del tuo ambiente Amazon MWAA (inmwaa_env
), ilAWSRegione del tuo ambiente (inaws_region
) e il file locale che contiene le variabili da importare (invar_file
).
import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)
Fasi successive
-
Scopri come caricare il codice DAG in questo esempio su
dags
cartella nel tuo bucket Amazon S3Aggiunta o aggiornamento di DAG.