Aurora-PostgreSQL-Datenbankbereinigung in einer Amazon-MWAA-Umgebung - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Aurora-PostgreSQL-Datenbankbereinigung in einer Amazon-MWAA-Umgebung

Amazon Managed Workflows for Apache Airflow verwendet eine Aurora-PostgreSQL-Datenbank als Apache-Airflow-Metadatendatenbank, in der DAG ausgeführt wird und Aufgaben-Instances gespeichert werden. Der folgende Beispielcode löscht regelmäßig Einträge aus der dedizierten Aurora-PostgreSQL-Datenbank für Ihre Amazon-MWAA-Umgebung.

Version

  • Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 und höher in Python 3.10 verwenden.

Voraussetzungen

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:

Abhängigkeiten

  • Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Der Code verwendet die Apache Airflow v2-Basisinstallation in Ihrer Umgebung.

Codebeispiel

Die folgende DAG bereinigt die Metadatendatenbank für die in angegebenen TabellenTABLES_TO_CLEAN. Das Beispiel löscht Daten aus den angegebenen Tabellen der letzten sieben Tage. Um anzupassen, wie weit die Einträge gelöscht werden, legen Sie MAX_AGE_IN_DAYS auf einen anderen Wert fest.

Apache Airflow v2
from airflow import settings from airflow.utils.dates import days_ago from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom from airflow.decorators import dag, task from airflow.utils.dates import days_ago from time import sleep from airflow.version import version major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1]) if major_version >= 2 and minor_version >= 6: from airflow.jobs.job import Job else: # The BaseJob class was renamed as of Apache Airflow v2.6 from airflow.jobs.base_job import BaseJob as Job # Delete entries for the past seven days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database. MAX_AGE_IN_DAYS = 7 MIN_AGE_IN_DAYS = 0 DECREMENT = -7 # This is a list of (table, time) tuples. # table = the table to clean in the metadata database # time = the column in the table associated to the timestamp of an entry # or None if not applicable. TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat], [TaskInstance, TaskInstance.execution_date], [TaskReschedule, TaskReschedule.execution_date], [DagTag, None], [DagModel, DagModel.last_parsed_time], [DagRun, DagRun.execution_date], [ImportError, ImportError.timestamp], [Log, Log.dttm], [SlaMiss, SlaMiss.execution_date], [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], [XCom, XCom.execution_date], ] @task() def cleanup_db_fn(x): session = settings.Session() if x[1]: for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT): earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS) print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...") earliest_date = days_ago(earliest_days_ago) oldest_date = days_ago(oldest_days_ago) query = session.query(x[0]).filter(x[1] >= oldest_date).filter(x[1] <= earliest_date) query.delete(synchronize_session= False) session.commit() sleep(5) else: # No time column specified for the table. Delete all entries print("deleting", str(x[0]), "...") query = session.query(x[0]) query.delete(synchronize_session= False) session.commit() session.close() @dag( dag_id="cleanup_db", schedule_interval="@weekly", start_date=days_ago(7), catchup=False, is_paused_upon_creation=False ) def clean_db_dag_fn(): t_last=None for x in TABLES_TO_CLEAN: t=cleanup_db_fn(x) if t_last: t_last >> t t_last = t clean_db_dag = clean_db_dag_fn()