Esportazione di metadati ambientali in file CSV su Amazon S3 - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esportazione di metadati ambientali in file CSV su Amazon S3

Il seguente esempio di codice mostra come creare un grafo aciclico diretto (DAG) che interroga il database per una serie di informazioni sull'esecuzione del DAG e scrive i dati in file archiviati .csv su Amazon S3.

Potresti voler esportare informazioni dal database Aurora PostgreSQL del tuo ambiente per ispezionare i dati localmente, archiviarli nello storage di oggetti o combinarli con strumenti come l'operatore Amazon S3 su Amazon Redshift e la pulizia del database, per spostare i metadati Amazon MWAA fuori dall'ambiente, ma conservarli per analisi future.

Puoi interrogare il database per qualsiasi oggetto elencato nei modelli Apache Airflow. Questo esempio di codice utilizza tre modelli,DagRun, e TaskFailTaskInstance, che forniscono informazioni pertinenti alle esecuzioni DAG.

Versione

Prerequisiti

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:

Autorizzazioni

Amazon MWAA necessita dell'autorizzazione per l's3:PutObjectazione di Amazon S3 per scrivere le informazioni sui metadati richieste su Amazon S3. Aggiungi la seguente dichiarazione politica al ruolo di esecuzione del tuo ambiente.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

Questa politica limita solo l'accesso in scrittura a your-new-export-bucket.

Requisiti

  • Per utilizzare questo esempio di codice con Apache Airflow v2, non sono richieste dipendenze aggiuntive. Il codice utilizza l'installazione di base di Apache Airflow v2 nell'ambiente in uso.

Esempio di codice

I passaggi seguenti descrivono come creare un DAG che interroga Aurora PostgreSQL e scrive il risultato nel nuovo bucket Amazon S3.

  1. Nel tuo terminale, accedi alla directory in cui è memorizzato il codice DAG. Ad esempio:

    cd dags
  2. Copia il contenuto del seguente esempio di codice e salvalo localmente comemetadata_to_csv.py. È possibile modificare il valore assegnato per MAX_AGE_IN_DAYS controllare l'età dei record più vecchi interrogati dal DAG dal database dei metadati.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. In caso di successo, nei log delle attività dell'operazione verrà generato un risultato simile al seguente: export_db

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Ora puoi accedere e scaricare i .csv file esportati nel tuo nuovo bucket Amazon S3. /files/export/