Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA

Amazon Managed Workflows for Apache Airflow utilizza un database Aurora PostgreSQL come database di metadati Apache Airflow, in cui vengono eseguite le operazioni DAG e vengono archiviate le istanze delle attività. Il seguente codice di esempio cancella periodicamente le voci dal database Aurora PostgreSQL dedicato per il tuo ambiente Amazon MWAA.

Versione

Prerequisiti

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:

Dipendenze

  • Per utilizzare questo esempio di codice con Apache Airflow v2, non sono richieste dipendenze aggiuntive. Il codice utilizza l'installazione di base di Apache Airflow v2 nell'ambiente in uso.

Esempio di codice

Il seguente DAG pulisce il database dei metadati per le tabelle specificate in. TABLES_TO_CLEAN L'esempio elimina i dati dalle tabelle specificate degli ultimi sette giorni. Per modificare la data di eliminazione delle voci, MAX_AGE_IN_DAYS impostate un valore diverso.

Apache Airflow v2
from airflow import settings from airflow.utils.dates import days_ago from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom from airflow.decorators import dag, task from airflow.utils.dates import days_ago from time import sleep from airflow.version import version major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1]) if major_version >= 2 and minor_version >= 6: from airflow.jobs.job import Job else: # The BaseJob class was renamed as of Apache Airflow v2.6 from airflow.jobs.base_job import BaseJob as Job # Delete entries for the past seven days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database. MAX_AGE_IN_DAYS = 7 MIN_AGE_IN_DAYS = 0 DECREMENT = -7 # This is a list of (table, time) tuples. # table = the table to clean in the metadata database # time = the column in the table associated to the timestamp of an entry # or None if not applicable. TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat], [TaskInstance, TaskInstance.execution_date], [TaskReschedule, TaskReschedule.execution_date], [DagTag, None], [DagModel, DagModel.last_parsed_time], [DagRun, DagRun.execution_date], [ImportError, ImportError.timestamp], [Log, Log.dttm], [SlaMiss, SlaMiss.execution_date], [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], [XCom, XCom.execution_date], ] @task() def cleanup_db_fn(x): session = settings.Session() if x[1]: for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT): earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS) print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...") earliest_date = days_ago(earliest_days_ago) oldest_date = days_ago(oldest_days_ago) query = session.query(x[0]).filter(x[1] >= oldest_date).filter(x[1] <= earliest_date) query.delete(synchronize_session= False) session.commit() sleep(5) else: # No time column specified for the table. Delete all entries print("deleting", str(x[0]), "...") query = session.query(x[0]) query.delete(synchronize_session= False) session.commit() session.close() @dag( dag_id="cleanup_db", schedule_interval="@weekly", start_date=days_ago(7), catchup=False, is_paused_upon_creation=False ) def clean_db_dag_fn(): t_last=None for x in TABLES_TO_CLEAN: t=cleanup_db_fn(x) if t_last: t_last >> t t_last = t clean_db_dag = clean_db_dag_fn()