Memanggil DAG di lingkungan Amazon MWAA yang berbeda - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memanggil DAG di lingkungan Amazon MWAA yang berbeda

Contoh kode berikut membuat token Apache Airflow CLI. Kode tersebut kemudian menggunakan grafik asiklik terarah (DAG) dalam satu lingkungan Amazon MWAA untuk memanggil DAG di lingkungan Amazon MWAA yang berbeda.

Versi

  • Anda dapat menggunakan contoh kode di halaman ini denganApache Airflow v2 dan di atasnyadiPython 3.10.

Prasyarat

Untuk menggunakan contoh kode pada halaman ini, Anda memerlukan yang berikut:

  • DuaLingkungan Amazon MWAAbersamajaringan publikakses server web, termasuk lingkungan Anda saat ini.

  • Sampel DAG yang diunggah ke bucket Amazon Simple Storage Service (Amazon S3) lingkungan target Anda.

Izin

Untuk menggunakan contoh kode di halaman ini, peran eksekusi lingkungan Anda harus memiliki izin untuk membuat token CLI Apache Airflow. Anda dapat melampirkanAWSkebijakan terkelolaAmazonMWAAAirflowCliAccessuntuk memberikan izin ini.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

Untuk informasi selengkapnya, lihat Kebijakan CLI Aliran Udara Apache: Akses AmazonMWAA AirflowCli.

Dependensi

  • Untuk menggunakan contoh kode ini dengan Apache Airflow v2, tidak diperlukan dependensi tambahan. Kode menggunakanInstalasi dasar Apache Airflow v2pada lingkungan Anda.

Contoh kode

Contoh kode berikut mengasumsikan bahwa Anda menggunakan DAG di lingkungan Anda saat ini untuk memanggil DAG di lingkungan lain.

  1. Di terminal Anda, navigasikan ke direktori tempat kode DAG Anda disimpan. Misalnya:

    cd dags
  2. Salin isi contoh kode berikut dan simpan secara lokal sebagaiinvoke_dag.py. Ganti nilai berikut dengan informasi Anda.

    • your-new-environment-name- Nama lingkungan lain di mana Anda ingin memanggil DAG.

    • your-target-dag-id- ID DAG di lingkungan lain yang ingin Anda panggil.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. Jalankan yang berikutAWS CLIperintah untuk menyalin DAG ke bucket lingkungan Anda, lalu memicu DAG menggunakan UI Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Jika DAG berjalan dengan sukses, Anda akan melihat output yang mirip dengan yang berikut di log tugas untukinvoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Untuk memverifikasi bahwa DAG Anda berhasil dipanggil, buka UI Apache Airflow untuk lingkungan baru Anda, lalu lakukan hal berikut:

    1. PadaDAGhalaman, cari DAG target baru Anda dalam daftar DAG.

    2. Di bawahRun terakhir, periksa stempel waktu untuk menjalankan DAG terbaru. Stempel waktu ini harus sesuai dengan stempel waktu terbaru untukinvoke_dagdi lingkungan Anda yang lain.

    3. Di bawahTugas Terbaru, periksa apakah lari terakhir berhasil.