Exportación de metadatos del entorno a archivos CSV en Amazon S3 - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Exportación de metadatos del entorno a archivos CSV en Amazon S3

El siguiente código de muestra indica cómo puede crear un gráfico acíclico dirigido (DAG) que consulte en la base de datos un rango de información de ejecución del DAG y escriba los datos en los archivos .csv almacenados en Amazon S3.

Quizás desee exportar información de la base de datos Aurora PostgreSQL de su entorno para inspeccionar los datos localmente, archivarlos en un almacenamiento de objetos o combinarlos con herramientas como el operador Amazon S3 a Amazon Redshift y la limpieza de la base de datos, a fin de sacar los metadatos de Amazon MWAA del entorno y conservarlos para futuros análisis.

Puede consultar en la base de datos cualquiera de los objetos enumerados en los modelos de Apache Airflow. En este código de muestra se utilizan tres modelos, DagRun, TaskFail y TaskInstance, que proporcionan información relevante para las ejecuciones del DAG.

Versión

  • Puede usar el código de ejemplo de esta página con Apache Airflow v2 y versiones posteriores en Python 3.10.

Requisitos previos

Para usar el código de muestra de esta página, necesitará lo siguiente:

Permisos

Amazon MWAA necesita permiso para que la acción s3:PutObject de Amazon S3 escriba la información de metadatos consultada en Amazon S3. Añada la siguiente instrucción de política a la función de ejecución de su entorno.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

Esta política limita el acceso de escritura únicamente a su nuevo bucket de exportación.

Requisitos

Código de ejemplo

Los siguientes pasos describen cómo puede crear un DAG que consulte Aurora PostgreSQL y escriba el resultado en su nuevo bucket de Amazon S3.

  1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

    cd dags
  2. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como metadata_to_csv.py. Puede cambiar el valor asignado MAX_AGE_IN_DAYS para controlar la antigüedad de los registros más antiguos que el DAG consulta en la base de datos de metadatos.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Ejecute el siguiente comando de AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas para la tarea export_db:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Ahora puede acceder a los archivos exportados .csv y descargarlos en su nuevo bucket de Amazon S3 en /files/export/.