Aurora database cleanup on an Amazon MWAA environment - Amazon Managed Workflows for Apache Airflow



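Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, where DAG runs and task instances are stored. The following sample code periodically clears entries from the dedicated Aurora PostgreSQL database for your Amazon MWAA environment.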

Version

  • You can use the code example on this page with Apache Airflow v2 and later in Python 3.10.
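The code sample on this page chooses where to import the job class from based on the installed Airflow version. The sketch below uses the hypothetical helpers `parse_major_minor`, `uses_new_job_module`, and `naive_check` to show why comparing the `(major, minor)` pair as a tuple is safer than testing each number separately: a separate check on the minor number treats a release such as 3.0 as older than 2.6. It runs in plain Python and only assumes version strings shaped like `airflow.version.version` (for example `2.7.2`).

```python
def parse_major_minor(version: str) -> tuple[int, int]:
    """Return (major, minor) from a version string such as '2.7.2'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def uses_new_job_module(version: str) -> bool:
    """True when Job should be imported from airflow.jobs.job (v2.6 and later)."""
    # Tuple comparison checks major first, then minor, so 3.0 sorts after 2.6.
    return parse_major_minor(version) >= (2, 6)


def naive_check(version: str) -> bool:
    """Checks each number on its own; wrongly rejects 3.0, 3.1, and so on."""
    major, minor = parse_major_minor(version)
    return major >= 2 and minor >= 6


for v in ["2.4.3", "2.6.3", "2.10.1", "3.0.0"]:
    print(v, uses_new_job_module(v), naive_check(v))
```

The two checks agree for every v2 release but disagree for 3.0.0, where only the tuple comparison returns `True`.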

Prerequisites

To use the sample code on this page, you'll need the following:

  • An Amazon MWAA environment.

Dependencies

  • To use this code example with Apache Airflow v2, no additional dependencies are required. The code uses the Apache Airflow v2 base install on your environment.

Code sample

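The following DAG cleans the metadata database tables listed in TABLES_TO_CLEAN. For each table that has a timestamp column, it deletes the entries from the past seven days, that is, entries whose timestamp falls between MAX_AGE_IN_DAYS days ago and midnight (UTC) today. It deletes them in windows of DECREMENT days and commits after each window. Tables with no timestamp column (DagTag in this example) are emptied entirely. To change how far back entries are deleted, set MAX_AGE_IN_DAYS to a different value.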
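To check which date ranges a given setting covers, the window loop from `cleanup_db_fn` can be run on its own. This is a minimal sketch in plain Python; `deletion_windows` is a hypothetical helper, and each pair it returns is `(oldest_days_ago, earliest_days_ago)`, the two values the DAG passes to `days_ago` for one delete.

```python
def deletion_windows(max_age: int, min_age: int, decrement: int) -> list[tuple[int, int]]:
    """Reproduce the (oldest_days_ago, earliest_days_ago) pairs used by cleanup_db_fn."""
    windows = []
    for oldest_days_ago in range(max_age, min_age, decrement):
        # The newest edge of a window never goes past MIN_AGE_IN_DAYS.
        earliest_days_ago = max(oldest_days_ago + decrement, min_age)
        windows.append((oldest_days_ago, earliest_days_ago))
    return windows


print(deletion_windows(7, 0, -7))   # [(7, 0)]
print(deletion_windows(30, 0, -7))  # [(30, 23), (23, 16), (16, 9), (9, 2), (2, 0)]
```

With the default values there is a single window that spans all seven days. Raising MAX_AGE_IN_DAYS to 30 splits the work into windows of at most seven days each, which keeps every individual delete and commit small.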

Apache Airflow v2
from time import sleep

from airflow import settings
from airflow.decorators import dag, task
from airflow.models import (
    DagModel,
    DagRun,
    DagTag,
    ImportError,
    Log,
    RenderedTaskInstanceFields,
    SlaMiss,
    TaskInstance,
    TaskReschedule,
    XCom,
)
from airflow.utils.dates import days_ago
from airflow.version import version

major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
# Compare (major, minor) as a tuple so that every release from v2.6 onward takes the first branch.
if (major_version, minor_version) >= (2, 6):
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past seven days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 7
MIN_AGE_IN_DAYS = 0
# Width of each deletion window in days (negative, because the loop counts down).
DECREMENT = -7

# This is a list of [table, time] pairs.
# table = the table to clean in the metadata database
# time = the column in the table associated to the timestamp of an entry
#        or None if not applicable.
TABLES_TO_CLEAN = [
    [Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None],
    [DagModel, DagModel.last_parsed_time],
    [DagRun, DagRun.execution_date],
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm],
    [SlaMiss, SlaMiss.execution_date],
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date],
    [XCom, XCom.execution_date],
]


@task()
def cleanup_db_fn(x):
    table, time_column = x
    session = settings.Session()
    try:
        if time_column is not None:
            # Delete one window at a time, committing after each window.
            for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
                earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
                print(f"deleting {table} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
                earliest_date = days_ago(earliest_days_ago)
                oldest_date = days_ago(oldest_days_ago)
                query = (
                    session.query(table)
                    .filter(time_column >= oldest_date)
                    .filter(time_column <= earliest_date)
                )
                query.delete(synchronize_session=False)
                session.commit()
                # Pause between windows to spread the load on the database.
                sleep(5)
        else:
            # No time column specified for the table. Delete all entries
            print("deleting", str(table), "...")
            query = session.query(table)
            query.delete(synchronize_session=False)
            session.commit()
    finally:
        # Always release the session, even if a delete fails.
        session.close()


@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False,
)
def clean_db_dag_fn():
    # Chain the tasks so that the tables are cleaned one after another.
    t_last = None
    for x in TABLES_TO_CLEAN:
        t = cleanup_db_fn(x)
        if t_last is not None:
            t_last >> t
        t_last = t


clean_db_dag = clean_db_dag_fn()
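Before pointing the DAG at a real metadata database, you can try out the windowed-delete pattern locally. The following sketch reproduces the same loop with Python's built-in `sqlite3` module instead of SQLAlchemy and Aurora PostgreSQL. The `log` table, its `dttm` column, and the `delete_in_windows` helper are hypothetical stand-ins, loosely modeled on the `Log` model and its `Log.dttm` column. Timestamps are stored as ISO-format text so that string comparison orders them correctly. Unlike `days_ago`, the reference time `now` is passed in explicitly so that the result is reproducible.

```python
import sqlite3
from datetime import datetime, timedelta


def delete_in_windows(conn, table, time_column, now, max_age, min_age, decrement):
    """Delete rows whose timestamp lies between max_age and min_age days before now,
    committing after each window (hypothetical stand-in for cleanup_db_fn)."""
    deleted = 0
    for oldest_days_ago in range(max_age, min_age, decrement):
        earliest_days_ago = max(oldest_days_ago + decrement, min_age)
        oldest = (now - timedelta(days=oldest_days_ago)).isoformat(" ")
        earliest = (now - timedelta(days=earliest_days_ago)).isoformat(" ")
        # Table and column names are interpolated, so pass only trusted identifiers.
        cur = conn.execute(
            f"DELETE FROM {table} WHERE {time_column} >= ? AND {time_column} <= ?",
            (oldest, earliest),
        )
        conn.commit()  # one short transaction per window
        deleted += cur.rowcount
    return deleted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (id INTEGER PRIMARY KEY, dttm TEXT)")
now = datetime(2024, 1, 31)
for days in (1, 5, 10, 40):
    conn.execute("INSERT INTO log (dttm) VALUES (?)", ((now - timedelta(days=days)).isoformat(" "),))
conn.commit()

print(delete_in_windows(conn, "log", "dttm", now, 30, 0, -7))  # 3
print(conn.execute("SELECT COUNT(*) FROM log").fetchone()[0])  # 1
```

Only the row from 40 days ago survives, because it falls outside the 30-day range covered by the windows.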