Mengekspor metadata lingkungan ke file CSV di Amazon S3 - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengekspor metadata lingkungan ke file CSV di Amazon S3

Contoh kode berikut menunjukkan bagaimana Anda dapat membuat grafik asiklik terarah (DAG) yang menanyakan database untuk berbagai informasi yang dijalankan DAG, dan menulis data ke .csv file yang disimpan di Amazon S3.

Anda mungkin ingin mengekspor informasi dari database Aurora PostgreSQL lingkungan Anda untuk memeriksa data secara lokal, mengarsipkannya dalam penyimpanan objek, atau menggabungkannya dengan alat seperti Amazon S3 ke operator Amazon Redshift dan pembersihan basis data, untuk memindahkan metadata Amazon MWAA keluar dari lingkungan, tetapi melestarikannya untuk analisis masa depan.

Anda dapat menanyakan database untuk salah satu objek yang tercantum dalam model Apache Airflow. Contoh kode ini menggunakan tiga model,DagRun,TaskFail, danTaskInstance, yang memberikan informasi yang relevan dengan DAG run.

Versi

Prasyarat

Untuk menggunakan kode sampel di halaman ini, Anda memerlukan yang berikut:

Izin

Amazon MWAA memerlukan izin untuk s3:PutObject tindakan Amazon S3 untuk menulis informasi metadata yang ditanyakan ke Amazon S3. Tambahkan pernyataan kebijakan berikut ke peran eksekusi lingkungan Anda.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

Kebijakan ini membatasi akses tulis saja your-new-export-bucket.

Persyaratan

Contoh kode

Langkah-langkah berikut menjelaskan bagaimana Anda dapat membuat DAG yang menanyakan Aurora PostgreSQL dan menulis hasilnya ke bucket Amazon S3 baru Anda.

  1. Di terminal Anda, arahkan ke direktori tempat kode DAG Anda disimpan. Misalnya:

    cd dags
  2. Salin isi contoh kode berikut dan simpan secara lokal sebagaimetadata_to_csv.py. Anda dapat mengubah nilai yang ditetapkan MAX_AGE_IN_DAYS untuk mengontrol usia rekaman terlama kueri DAG Anda dari database metadata.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Jalankan AWS CLI perintah berikut untuk menyalin DAG ke bucket lingkungan Anda, lalu picu DAG menggunakan Apache Airflow UI.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Jika berhasil, Anda akan menampilkan yang serupa dengan yang berikut di log tugas untuk export_db tugas:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Anda sekarang dapat mengakses dan mengunduh .csv file yang diekspor di bucket Amazon S3 baru Anda. /files/export/