Amazon MWAA での dbt の使用 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MWAA での dbt の使用

このトピックでは、Amazon MWAA で dbt と Postgres を使用する方法を示します。次のステップでは、必要な依存関係を requirements.txt に追加し、サンプルの dbt プロジェクトを環境の Amazon S3 バケットにアップロードします。次に、サンプル DAG を使用して Amazon MWAA が依存関係をインストールしたことを確認し、最後に BashOperator を使用して dbt プロジェクトを実行します。

Version

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

次の手順を完了するには、以下のものが必要です。

  • Apache Airflow v2.2.2 を使用する Amazon MWAA 環境。このサンプルは v2.2.2 で作成され、テストされています。他の Apache Airflow バージョンで使用するためには、サンプルを変更する必要がある場合があります。

  • dbt プロジェクトのサンプル。Amazon MWAA で dbt の使用を開始するには、フォークを作成し、dbt-labs リポジトリから dbt スタータープロジェクトのクローンを作成できます。 GitHub

依存関係

dbt で Amazon MWAA を使用するには、次のスタートアップスクリプトを環境に追加します。詳細については、「Amazon MWAA でのスタートアップスクリプトの使用」を参照してください。

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

以下のセクションでは、dbt プロジェクトディレクトリを Amazon S3 にアップロードし、Amazon MWAA が必要な dbt 依存関係を正常にインストールしたかどうかを検証する DAG を実行します。

DBT プロジェクトを Amazon S3 にアップロードする

Amazon MWAA 環境で dbt プロジェクトを使用できるようにするには、プロジェクトディレクトリ全体を環境のdagsフォルダにアップロードできます。環境が更新されると、Amazon MWAA は dbt ディレクトリをローカルusr/local/airflow/dags/フォルダにダウンロードします。

DBT プロジェクトを Amazon S3 にアップロードするには
  1. dbt スタータープロジェクトをクローンしたディレクトリに移動します。

  2. 次の Amazon S3 AWS CLI コマンドを実行して、 --recursiveパラメータを使用してプロジェクトのコンテンツを環境の dagsフォルダに再帰的にコピーします。このコマンドは、dbtと呼ばれるサブディレクトリを作成し、これをすべてのdbtプロジェクトに使用できます。サブディレクトリが既に存在する場合、プロジェクトファイルは既存のディレクトリにコピーされ、新しいディレクトリは作成されません。このコマンドは、この特定のスターターのために dbt ディレクトリ内にサブディレクトリも作成します。

    $ aws s3 cp dbt-starter-project s3://mwaa-bucket/dags/dbt/dbt-starter-project --recursive

    プロジェクトのサブディレクトリに異なる名前を使用して、親 dbt ディレクトリ内の複数の dbt プロジェクトを整理できます。

DAG を使用して dbt 依存関係のインストールを検証します。

次の DAGは、BashOperator を使用し、requirements.txt で指定されたdbtの依存関係を正常にインストールしたかどうかを Amazon MWAA が確認します。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/.local/bin/dbt --version" )

以下を実行してタスクログを表示し、dbt とその依存関係がインストールされていることを確認します。

  1. Amazon MWAA コンソールに移動し、使用可能な環境のリストから [Airflow UI を開く] を選択します。

  2. Apache Airflow UI のリストから dbt-installation-test DAG を探し、Last Run列の下にある日付を選択して、最後に成功したタスクを開きます。

  3. グラフビューを使用してタスクを選択し、bash_commandタスクインスタンスの詳細を開きます。

  4. [Log] を選択してタスクログを開き、次に、requirements.txt で指定されたdbtのバージョンをログが正常にリストしていることを確認してください。

DAG を使用して dbt プロジェクトを実行します。

次の DAG は、BashOperatorを使用して Amazon S3 にアップロードした dbt プロジェクトをローカル usr/local/airflow/dags/ ディレクトリから書き込み可能な/tmpディレクトリにコピーし、dbt プロジェクトを実行します。bash コマンドは、dbt-starter-projectというタイトルのスターター dbt プロジェクトを想定しています。プロジェクトディレクトリの名前に従ってディレクトリ名を変更します。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )