本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Amazon MWAA 中使用 dbt
本主题演示了如何在 Amazon MWAA 中使用 dbt 和 Postgres。在以下步骤中,您将所需的依赖项添加到 requirements.txt
中,并将示例 dbt 项目上传到环境的 Amazon S3 存储桶。然后,您将使用示例 DAG 来验证 Amazon MWAA 是否已安装依赖项,最后使用 BashOperator
来运行 dbt 项目。
版本
-
您可以将本页上的代码示例与 Python 3.10
中的 Apache Airflow v2 及更高版本一起使用。
先决条件
在完成以下步骤之前,您需要具备以下条件:
-
使用 Apache Airflow v2.2.2 的Amazon MWAA 环境。此示例已编写,并使用 v2.2.2 进行了测试。您可能需要修改示例以与其他 Apache Airflow 版本一起使用。
-
dbt 项目示例。要开始在 Amazon MWAA 中使用 dbt,您可以创建一个分支并从 dbt-labs GitHub 存储库中克隆 dbt 入门项目
。
附属物
要将 Amazon MWAA 与 dbt 配合使用,请将以下依赖项添加到 requirements.txt
。要了解更多信息,请参阅 安装 Python 依赖项
当环境完成更新后,Amazon MWAA 将安装所需的 dbt 库和其他依赖项,例如 psycopg2
。
注意
Apache Airflow v2.2.2 提供的默认约束文件有一个相互冲突的 jsonschema
版本,本指南中使用的 dbt 版本不支持该版本。因此,在将 Amazon MWAA 与 dbt 配合使用时,您可以将 Apache Airflow 约束文件下载并修改到 Amazon S3 DAG 文件夹中,然后在 requirements.txt
文件中将其引用为 --constraint /usr/local/airflow/dags/my-updated-constraint.txt
,也可以在 requirements.txt
中省略 --constraint
,如下所示。
json-rpc==1.13.0 minimal-snowplow-tracker==0.0.2 packaging==20.9 networkx==2.6.3 mashumaro==2.5 sqlparse==0.4.2 logbook==1.5.3 agate==1.6.1 dbt-extractor==0.4.0 pyparsing==2.4.7 msgpack==1.0.2 parsedatetime==2.6 pytimeparse==1.1.8 leather==0.3.4 pyyaml==5.4.1 # Airflow constraints are jsonschema==3.2.0 jsonschema==3.1.1 hologram==0.0.14 dbt-core==0.21.1 psycopg2-binary==2.8.6 dbt-postgres==0.21.1 dbt-redshift==0.21.1
在以下章节中,您可将 dbt 项目目录上传到 Amazon S3 并运行 DAG 来验证 Amazon MWAA 是否已成功安装所需的 dbt 依赖项。
将 dbt 项目上传到 Amazon S3
为了能够在 Amazon MWAA 环境中使用 dbt 项目,您可以将整个项目目录上传到环境的 dags
文件夹中。当环境更新时,Amazon MWAA 会将 dbt 目录下载到本地 usr/local/airflow/dags/
文件夹。
要将 dbt 项目上传到 Amazon S3,请执行以下操作
-
导航到您克隆 dbt 入门项目的目录。
-
运行以下 Amazon S3 AWS CLI 命令,使用
--recursive
参数以递归方式将项目内容复制到环境的dags
文件夹。该命令会创建一个名为dbt
的子目录,您可以将其用于所有 dbt 项目。如果子目录已经存在,则项目文件将被复制到现有目录中,并且不会创建新目录。该命令还会为该特定入门项目在dbt
目录中创建一个子目录。$
aws s3 cp
dbt-starter-project
s3://mwaa-bucket
/dags/dbt/dbt-starter-project
--recursive您可以为项目子目录使用不同的名称,以便在
dbt
父目录中组织多个 dbt 项目。
使用 DAG 验证 dbt 依赖项的安装
以下 DAG 使用 BashOperator
和 bash 命令来验证 Amazon MWAA 是否已成功安装 requirements.txt
中指定的 dbt 依赖项。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/.local/bin/dbt --version" )
执行以下操作以查看任务日志并验证是否已安装 dbt 及其依赖项。
-
导航到 Amazon MWAA 控制台,然后从可用环境列表中选择打开 Airflow UI。
-
在 Apache Airflow UI 上,从列表中找到
dbt-installation-test
DAG,然后在该Last Run
列下选择打开上一个成功任务的日期。 -
使用图表视图,选择
bash_command
任务以打开任务实例的详细信息。 -
选择日志来打开任务日志,然后验证日志是否成功列出了我们在
requirements.txt
中指定的 dbt 版本。
使用 DAG 来运行 dbt 项目
以下 DAG 使用 BashOperator
将您从本地 usr/local/airflow/dags/
目录上传到 Amazon S3 的 dbt 项目复制到可写入的 /tmp
目录,然后运行 dbt 项目。bash 命令假设一个名为 dbt-starter-project
的 入门 dbt 项目。根据您项目目录的名称修改目录名称。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="cp -R /usr/local/airflow/dags/dbt /tmp;\ cd /tmp/dbt/
dbt-starter-project
;\ /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/dbt/dbt-starter-project
/ --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log" )