在 Amazon MWAA 中使用 dbt - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon MWAA 中使用 dbt

本主题演示了如何在 Amazon MWAA 中使用 dbt 和 Postgres。在以下步骤中,您将所需的依赖项添加到 requirements.txt 中,并将示例 dbt 项目上传到环境的 Amazon S3 存储桶。然后,您将使用示例 DAG 来验证 Amazon MWAA 是否已安装依赖项,最后使用 BashOperator 来运行 dbt 项目。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

在完成以下步骤之前,您需要具备以下条件:

  • 使用 Apache Airflow v2.2.2 的Amazon MWAA 环境。此示例已编写,并使用 v2.2.2 进行了测试。您可能需要修改示例以与其他 Apache Airflow 版本一起使用。

  • dbt 项目示例。要开始在 Amazon MWAA 中使用 dbt,您可以创建一个分支并从 dbt-labs GitHub 存储库中克隆 dbt 入门项目

附属物

要将 Amazon MWAA 与 dbt 配合使用,请将以下依赖项添加到 requirements.txt。要了解更多信息,请参阅 安装 Python 依赖项

当环境完成更新后,Amazon MWAA 将安装所需的 dbt 库和其他依赖项,例如 psycopg2

注意

Apache Airflow v2.2.2 提供的默认约束文件有一个相互冲突的 jsonschema 版本,本指南中使用的 dbt 版本不支持该版本。因此,在将 Amazon MWAA 与 dbt 配合使用时,您可以将 Apache Airflow 约束文件下载并修改到 Amazon S3 DAG 文件夹中,然后在 requirements.txt 文件中将其引用为 --constraint /usr/local/airflow/dags/my-updated-constraint.txt,也可以在 requirements.txt 中省略 --constraint,如下所示。

json-rpc==1.13.0 minimal-snowplow-tracker==0.0.2 packaging==20.9 networkx==2.6.3 mashumaro==2.5 sqlparse==0.4.2 logbook==1.5.3 agate==1.6.1 dbt-extractor==0.4.0 pyparsing==2.4.7 msgpack==1.0.2 parsedatetime==2.6 pytimeparse==1.1.8 leather==0.3.4 pyyaml==5.4.1 # Airflow constraints are jsonschema==3.2.0 jsonschema==3.1.1 hologram==0.0.14 dbt-core==0.21.1 psycopg2-binary==2.8.6 dbt-postgres==0.21.1 dbt-redshift==0.21.1

在以下章节中,您可将 dbt 项目目录上传到 Amazon S3 并运行 DAG 来验证 Amazon MWAA 是否已成功安装所需的 dbt 依赖项。

将 dbt 项目上传到 Amazon S3

为了能够在 Amazon MWAA 环境中使用 dbt 项目,您可以将整个项目目录上传到环境的 dags 文件夹中。当环境更新时,Amazon MWAA 会将 dbt 目录下载到本地 usr/local/airflow/dags/ 文件夹。

要将 dbt 项目上传到 Amazon S3,请执行以下操作
  1. 导航到您克隆 dbt 入门项目的目录。

  2. 运行以下 Amazon S3 AWS CLI 命令,使用 --recursive 参数以递归方式将项目内容复制到环境的 dags 文件夹。该命令会创建一个名为 dbt 的子目录,您可以将其用于所有 dbt 项目。如果子目录已经存在,则项目文件将被复制到现有目录中,并且不会创建新目录。该命令还会为该特定入门项目在 dbt 目录中创建一个子目录。

    $ aws s3 cp dbt-starter-project s3://mwaa-bucket/dags/dbt/dbt-starter-project --recursive

    您可以为项目子目录使用不同的名称,以便在 dbt 父目录中组织多个 dbt 项目。

使用 DAG 验证 dbt 依赖项的安装

以下 DAG 使用 BashOperator 和 bash 命令来验证 Amazon MWAA 是否已成功安装 requirements.txt 中指定的 dbt 依赖项。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/.local/bin/dbt --version" )

执行以下操作以查看任务日志并验证是否已安装 dbt 及其依赖项。

  1. 导航到 Amazon MWAA 控制台,然后从可用环境列表中选择打开 Airflow UI

  2. 在 Apache Airflow UI 上,从列表中找到 dbt-installation-test DAG,然后在该 Last Run 列下选择打开上一个成功任务的日期。

  3. 使用图表视图,选择 bash_command 任务以打开任务实例的详细信息。

  4. 选择日志来打开任务日志,然后验证日志是否成功列出了我们在 requirements.txt 中指定的 dbt 版本。

使用 DAG 来运行 dbt 项目

以下 DAG 使用 BashOperator 将您从本地 usr/local/airflow/dags/ 目录上传到 Amazon S3 的 dbt 项目复制到可写入的 /tmp 目录,然后运行 dbt 项目。bash 命令假设一个名为 dbt-starter-project 的 入门 dbt 项目。根据您项目目录的名称修改目录名称。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="cp -R /usr/local/airflow/dags/dbt /tmp;\ cd /tmp/dbt/dbt-starter-project;\ /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/dbt/dbt-starter-project/ --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log" )