将环境元数据导出到 Amazon S3 上的 CSV 文件 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将环境元数据导出到 Amazon S3 上的 CSV 文件

以下代码示例说明如何创建有向无环图(DAG),该图在数据库中查询一系列 DAG 运行信息,并将数据写入存储在 Amazon S3 上的 .csv 文件。

您可能需要从环境的 Aurora PostgreSQL 数据库中导出信息,以便在本地检查数据,将其存档到对象存储中,或者将它们与诸如 Amazon S3 到 Amazon Redshift 运算符数据库清理之类的工具结合使用,以便将 Amazon MWAA 元数据移出环境,但保留它们以备将来分析。

您可以在数据库中查询 Apache Airflow 模型中列出的任何对象。此代码示例使用三个模型:DagRunTaskFailTaskInstance,它们提供与 DAG 运行相关的信息。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

Amazon MWAA 需要获得 Amazon S3 操作 s3:PutObject 的权限,才能将查询的元数据信息写入 Amazon S3。将以下策略声明添加到环境的执行角色中。

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

此策略将写入权限仅限于 your-new-export-bucket

要求

代码示例

以下步骤描述了如何创建 DAG,以查询 Aurora PostgreSQL 并将结果写入新的 Amazon S3 存储桶。

  1. 在您的终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容并本地另存为 metadata_to_csv.py。您可以更改分配给 MAX_AGE_IN_DAYS 的值,以控制 DAG 从元数据数据库中查询的最早记录的龄期。

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,输出将类似于在 export_db 任务的任务日志中的以下内容:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    现在,您可以在 /files/export/ 中的新 Amazon S3 存储桶中访问和下载导出的 .csv 文件。