Amazon S3 の CSV ファイルへの環境メタデータのエクスポート - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon S3 の CSV ファイルへの環境メタデータのエクスポート

次のコード例は、データベースに対して一連の DAG 実行情報を照会し、Amazon S3 に保存されている.csvファイルにデータを書き込む有向非循環グラフ (DAG) を作成する方法を示しています。

お使いの環境の Aurora PostgreSQL データベースから情報をエクスポートして、データをローカルで検査したり、オブジェクトストレージにアーカイブしたり、Amazon S3 to Amazon Redshift オペレータデータベースクリーンアップなどのツールと組み合わせたりして Amazon MWAA メタデータを環境外に移動し、future 分析のために保存しておきたい場合があります。

Apache Airflow のモデルにリストされているオブジェクトのいずれかに対してデータベースをクエリできます。このコードサンプルでは、3つのモデル DagRunTaskFail、および TaskInstance を使用しており、DAG実行に関連する情報を提供します。

バージョン

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

許可

Amazon MWAA は、クエリされたメタデータ情報を Amazon S3に書き込むためのアクションs3:PutObject の許可が必要です。次のポリシーステートメントを、環境の実行ロールに追加します。

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

このポリシーは、書き込みアクセスを新規エクスポートバケットのみに制限します。

要件

コードサンプル

次のステップでは、Aurora PostgreSQL にクエリを実行し、その結果を新しい Amazon S3 バケットに書き込む DAG を作成する方法について説明します。

  1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 次のコード例の内容をコピーし、metadata_to_csv.py としてローカルに保存します。DAG がメタデータデータベースからクエリする最古のレコードの年齢を制御するために、MAX_AGE_IN_DAYS に割り当てられた値を変更することができます。

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 成功した場合は、export_dbタスクのタスクログに以下のような出力が表示されます。

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    今、/files/export/ の新しい Amazon S3 バケット内の.csv ファイルにアクセスしてダウンロードできます。