Amazon MWAA 環境でのAurora PostgreSQL データベースのクリーンアップ - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MWAA 環境でのAurora PostgreSQL データベースのクリーンアップ

Amazon Managed Workflows for Apache Airflow は、Apache Airflow メタデータデータベースとして Aurora PostgreSQL データベースを使用します。このデータベースでは、DAG が実行され、タスクインスタンスが保存されます。次のサンプルコードは、Amazon MWAA 環境の専用の Aurora PostgreSQL データベースから定期的にエントリを消去します。

バージョン

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

依存関係

  • このコード例を Apache Airflow v2 で使用する場合、追加の依存関係は必要ありません。このコードでは、ご使用の環境にある「Apache Airflow v2 のベースインストール」を使用します。

コードサンプル

次の DAG は、 で指定されたテーブルのメタデータデータベースをクリーンアップしますTABLES_TO_CLEAN。この例では、指定したテーブルから過去 7 日間のデータを削除します。エントリの削除範囲を調整するには、 MAX_AGE_IN_DAYSを別の値に設定します。

Apache Airflow v2
from airflow import settings from airflow.utils.dates import days_ago from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom from airflow.decorators import dag, task from airflow.utils.dates import days_ago from time import sleep from airflow.version import version major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1]) if major_version >= 2 and minor_version >= 6: from airflow.jobs.job import Job else: # The BaseJob class was renamed as of Apache Airflow v2.6 from airflow.jobs.base_job import BaseJob as Job # Delete entries for the past seven days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database. MAX_AGE_IN_DAYS = 7 MIN_AGE_IN_DAYS = 0 DECREMENT = -7 # This is a list of (table, time) tuples. # table = the table to clean in the metadata database # time = the column in the table associated to the timestamp of an entry # or None if not applicable. TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat], [TaskInstance, TaskInstance.execution_date], [TaskReschedule, TaskReschedule.execution_date], [DagTag, None], [DagModel, DagModel.last_parsed_time], [DagRun, DagRun.execution_date], [ImportError, ImportError.timestamp], [Log, Log.dttm], [SlaMiss, SlaMiss.execution_date], [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], [XCom, XCom.execution_date], ] @task() def cleanup_db_fn(x): session = settings.Session() if x[1]: for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT): earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS) print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...") earliest_date = days_ago(earliest_days_ago) oldest_date = days_ago(oldest_days_ago) query = session.query(x[0]).filter(x[1] >= oldest_date).filter(x[1] <= earliest_date) query.delete(synchronize_session= False) session.commit() sleep(5) else: # No time column specified for the table. Delete all entries print("deleting", str(x[0]), "...") query = session.query(x[0]) query.delete(synchronize_session= False) session.commit() session.close() @dag( dag_id="cleanup_db", schedule_interval="@weekly", start_date=days_ago(7), catchup=False, is_paused_upon_creation=False ) def clean_db_dag_fn(): t_last=None for x in TABLES_TO_CLEAN: t=cleanup_db_fn(x) if t_last: t_last >> t t_last = t clean_db_dag = clean_db_dag_fn()