


# Cleaning up an Aurora PostgreSQL database on an Amazon MWAA environment
<a name="samples-database-cleanup"></a>

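Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, which stores DAG runs and task instances. The following sample code clears out old entries from the dedicated Aurora PostgreSQL database for your Amazon MWAA environment. As written, the sample DAG has no schedule (`schedule=None`) and runs only when triggered. To clean up periodically, give it a schedule.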

**Topics**
+ [Version](#samples-database-cleanup-version)
+ [Prerequisites](#samples-database-cleanup-prereqs)
+ [Dependencies](#samples-sql-server-dependencies)
+ [Code sample](#samples-database-cleanup-code)

## Version
<a name="samples-database-cleanup-version"></a>

The code samples on this page apply to the Apache Airflow v2 and v3 versions supported by Amazon MWAA. Refer to [Supported Apache Airflow versions](airflow-versions.md).

## Prerequisites
<a name="samples-database-cleanup-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Dependencies
<a name="samples-sql-server-dependencies"></a>

No additional dependencies are required to use this code sample with Apache Airflow v2 and v3. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-database-cleanup-code"></a>

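The following DAG cleans the metadata database tables listed in `TABLES_TO_CLEAN`. If `TABLES_TO_CLEAN` is `None`, it cleans all tables. The sample deletes data more than 30 days old from those tables. To change how old an entry must be before it is deleted, set `MAX_AGE_IN_DAYS` to a different value.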
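You can preview the cutoff timestamp and the command line that the DAG passes to `BashOperator` without running Airflow. The following is a minimal sketch, assuming you only want to inspect the command rather than run it. `build_clean_command` is a hypothetical helper, and it uses only the `airflow db clean` options that appear in the sample DAGs.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_AGE_IN_DAYS = 30
TABLES_TO_CLEAN: Optional[str] = None  # for example "log,xcom"; None cleans all tables


def build_clean_command(max_age_in_days: int, tables: Optional[str], now: datetime) -> str:
    """Hypothetical helper: returns the `airflow db clean` command the sample DAG would run."""
    # Records older than this cutoff are deleted.
    cutoff = (now - timedelta(days=max_age_in_days)).strftime("%Y-%m-%d %H:%M:%S")
    parts = ["airflow", "db", "clean", "--clean-before-timestamp", f"'{cutoff}'"]
    if tables:
        # Restrict the cleanup to a comma-separated list of tables.
        parts += ["--tables", f"'{tables}'"]
    # Delete without archiving and without an interactive confirmation prompt.
    parts += ["--skip-archive", "--yes"]
    return " ".join(parts)


if __name__ == "__main__":
    print(build_clean_command(MAX_AGE_IN_DAYS, TABLES_TO_CLEAN, datetime.now(timezone.utc)))
```

For example, with a current time of 2025-03-31 12:00:00, `MAX_AGE_IN_DAYS = 30`, and `"log,xcom"` as the table list, the helper returns `airflow db clean --clean-before-timestamp '2025-03-01 12:00:00' --tables 'log,xcom' --skip-archive --yes`.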

------
#### [ Apache Airflow v3.0.6 to 3.2.1 ]

```
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, increase MAX_AGE_IN_DAYS
# (which moves the cutoff to an earlier date) for the initial runs, then reduce it on subsequent runs until the
# desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean

# A value of None will clean all tables
TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag",
    schedule=None,
    catchup=False,
    start_date=datetime(2026, 1, 1),
) as dag:
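    # Pass --tables only when specific tables are requested; otherwise all tables are cleaned.
    tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""

    # Compute the UTC cutoff with GNU date; if that fails, fall back to the BSD/macOS date syntax.
    # --skip-archive deletes records without keeping them in archive tables; --yes skips the confirmation prompt.
    bash_command = (
        f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
        f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
        "echo \"Cleaning records before: $TIMESTAMP\" && "
        "airflow db clean "
        "--clean-before-timestamp \"$TIMESTAMP\" "
        f"{tables_flag}"
        "--skip-archive --yes"
    )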

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command,
    )
```

------
#### [ Apache Airflow v2.7.2 to 2.11.0 ]

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, increase MAX_AGE_IN_DAYS
# (or set the timestamp parameter to an earlier date) for the initial runs, then reduce it on subsequent runs until
# the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag",
    schedule=None,  # no schedule: the DAG runs only when triggered
    catchup=False,
    start_date=datetime(2024, 1, 1),
    params={
        # Records older than this timestamp are deleted. The default is computed when the DAG file is parsed.
        "timestamp": Param(
            default=(datetime.now() - timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),
    },
) as dag:
    # '{{ params.timestamp }}' is rendered by Jinja when the task runs.
    if TABLES_TO_CLEAN:
        bash_command = "airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '" + TABLES_TO_CLEAN + "' --skip-archive --yes"
    else:
        bash_command = "airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command,
    )
```
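In the v2 sample, the `timestamp` parameter defaults to `MAX_AGE_IN_DAYS` days before the time the DAG file was parsed. For an initial run against a large database, you can instead supply an earlier cutoff in the run configuration when you trigger the DAG. The following minimal sketch builds that configuration as JSON. `timestamp_conf` and `parse_timestamp` are hypothetical helpers. The sketch assumes the Airflow setting `[core] dag_run_conf_overrides_params` is left at its default (`True`), so that a `timestamp` key in the run configuration replaces the parameter's default.

```python
import json
from datetime import datetime, timedelta

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"  # same format as the DAG's Param default


def timestamp_conf(age_in_days: int, now: datetime) -> str:
    """Hypothetical helper: JSON run configuration that sets params.timestamp."""
    cutoff = now - timedelta(days=age_in_days)
    return json.dumps({"timestamp": cutoff.strftime(TIMESTAMP_FORMAT)})


def parse_timestamp(value: str) -> datetime:
    """Hypothetical helper: raises ValueError if value does not match TIMESTAMP_FORMAT."""
    return datetime.strptime(value, TIMESTAMP_FORMAT)


if __name__ == "__main__":
    # A first pass that deletes only records older than about a year.
    print(timestamp_conf(365, datetime.now()))
```

You could then pass the printed JSON as the run configuration, for example with `airflow dags trigger clean_db_dag --conf '<json>'`, and lower the age on later runs.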

------
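The note in both samples suggests a gradual approach. Start with a larger `MAX_AGE_IN_DAYS` for the first runs, then lower it on later runs until you reach the retention you want. The following minimal sketch computes such a step-down plan. `step_down_ages` is a hypothetical helper, and the starting age and step size are illustrative choices, not values from Amazon MWAA.

```python
from typing import List


def step_down_ages(start_age: int, target_age: int, step: int) -> List[int]:
    """Hypothetical helper: MAX_AGE_IN_DAYS values to use on successive cleanup runs.

    Each run deletes a smaller slice of old records than a single large run would,
    which can help keep each `airflow db clean` run shorter.
    """
    if step <= 0:
        raise ValueError("step must be positive")
    if start_age < target_age:
        raise ValueError("start_age must be >= target_age")
    # Decreasing ages from start_age, stopping before target_age is reached.
    ages = list(range(start_age, target_age, -step))
    # Always finish at the desired retention.
    ages.append(target_age)
    return ages


if __name__ == "__main__":
    print(step_down_ages(365, 30, 90))
```

With a start of 365 days, a target of 30 days, and a step of 90 days, the plan is `[365, 275, 185, 95, 30]`. You would run the cleanup DAG once with each value.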