Amazon S3의 CSV 파일로 환경 메타데이터 내보내기 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon S3의 CSV 파일로 환경 메타데이터 내보내기

다음 코드 예제는 다양한 DAG 실행 정보에 대해 데이터베이스를 쿼리하고 Amazon S3에 저장된 .csv 파일에 데이터를 쓰는 DAG(방향성 비순환 그래프)를 생성하는 방법을 보여줍니다.

데이터를 로컬에서 검사하거나, 이를 객체 스토리지에 보관하거나, Amazon S3에서 Amazon Redshift 운영자데이터베이스 정리와 같은 도구와 결합하기 위해, Amazon MWAA 메타데이터를 환경 외부로 이동하고 향후 분석을 위해 보존하기 위해 사용자 환경의 Aurora PostgreSQL 데이터베이스에서 정보를 내보내고자 할 수 있습니다.

Apache Airflow 모델에 나열된 모든 객체에 대해 데이터베이스를 쿼리할 수 있습니다. 이 코드 샘플은 DAG 실행과 관련된 정보를 제공하는 세 가지 모델인 DagRun, TaskFailTaskInstance을 사용합니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

필수 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

Amazon MWAA에는 쿼리된 메타데이터 정보를 Amazon S3에 쓰기 위한 Amazon S3 작업s3:PutObject에 대해 권한이 필요합니다. 다음 정책 문을 사용자 환경의 실행 역할에 추가합니다.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

이 정책은 your-new-export-bucket에 대해서만 쓰기 액세스를 제한합니다.

요구 사항

  • 이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치를 사용합니다.

코드 샘플

다음 단계는 Aurora PostgreSQL을 쿼리하고 새 Amazon S3 버킷에 결과를 쓰는 DAG를 생성하는 방법을 설명합니다.

  1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 예제의 콘텐츠를 복사하고 로컬에서 metadata_to_csv.py로 저장합니다. 사용자 DAG가 메타데이터 데이터베이스에서 쿼리하는 가장 오래된 기록의 보존 기간을 제어하기 위해 MAX_AGE_IN_DAYS에 할당된 값을 변경할 수 있습니다.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공적으로 실행되면 export_db 작업의 작업 로그에 다음과 유사한 출력이 표시됩니다.

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    이제 새 Amazon S3 버킷에서 내보낸 .csv 파일에 액세스하여 /files/export/에 다운로드할 수 있습니다.