Menggunakan DAG untuk mengimpor variabel di CLI - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan DAG untuk mengimpor variabel di CLI

Kode contoh berikut mengimpor variabel menggunakan CLI di Amazon Managed Workflow untuk Apache Airflow.

Versi

  • Anda dapat menggunakan contoh kode di halaman ini denganApache Airflow v2 dan di atasnyadiPython 3.10.

Prasyarat

  • Tidak ada izin tambahan yang diperlukan untuk menggunakan contoh kode di halaman ini.

Izin

AndaAWSmembutuhkan akses keAmazonMWAAAirflowCliAccesskebijakan. Untuk mempelajari selengkapnya, lihat Kebijakan CLI Aliran Udara Apache: AmazonMwaa AirflowCliAccess.

Dependensi

  • Untuk menggunakan contoh kode ini dengan Apache Airflow v2, tidak diperlukan dependensi tambahan. Kode menggunakanInstalasi dasar Apache Airflow v2pada lingkungan Anda.

Sampel kode

Kode contoh berikut mengambil tiga input: nama lingkungan Amazon MWAA Anda (dimwaa_env),AWSWilayah lingkungan Anda (diaws_region), dan file lokal yang berisi variabel yang ingin Anda impor (divar_file).

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

Apa selanjutnya?