Utiliser dbt avec Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser dbt avec Amazon MWAA

Cette rubrique explique comment utiliser dbt et Postgres avec Amazon MWAA. Au cours des étapes suivantes, vous allez ajouter les dépendances requises à votre requirements.txt et télécharger un exemple de projet dbt dans le compartiment Amazon S3 de votre environnement. Ensuite, vous utiliserez un exemple de DAG pour vérifier qu'Amazon MWAA a installé les dépendances, puis vous utiliserez le BashOperator pour exécuter le projet dbt.

Version

  • Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 ou version ultérieure en Python 3.10.

Prérequis

Avant de pouvoir effectuer les étapes suivantes, vous aurez besoin des éléments suivants :

  • Un environnement Amazon MWAA utilisant Apache Airflow v2.2.2. Cet exemple a été écrit et testé avec la version 2.2.2. Vous devrez peut-être modifier l'exemple pour l'utiliser avec d'autres versions d'Apache Airflow.

  • Un exemple de projet de dette. Pour commencer à utiliser dbt avec Amazon MWAA, vous pouvez créer un fork et cloner le projet de démarrage dbt depuis le référentiel dbt-labs. GitHub

Dépendances

Pour utiliser Amazon MWAA avec dbt, ajoutez le script de démarrage suivant à votre environnement. Pour en savoir plus, consultez la section Utilisation d'un script de démarrage avec Amazon MWAA.

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

Dans les sections suivantes, vous allez télécharger le répertoire de votre projet dbt sur Amazon S3 et exécuter un DAG qui vérifie si Amazon MWAA a correctement installé les dépendances dbt requises.

Importer un projet dbt sur Amazon S3

Pour pouvoir utiliser un projet dbt avec votre environnement Amazon MWAA, vous pouvez télécharger l'intégralité du répertoire du projet dans le dossier de dags votre environnement. Lorsque l'environnement est mis à jour, Amazon MWAA télécharge le répertoire dbt dans le dossier localusr/local/airflow/dags/.

Pour télécharger un projet dbt sur Amazon S3
  1. Accédez au répertoire dans lequel vous avez cloné le projet de démarrage dbt.

  2. Exécutez la AWS CLI commande Amazon S3 suivante pour copier de manière récursive le contenu du projet dans le dags dossier de votre environnement à l'aide du --recursive paramètre. La commande crée un sous-répertoire appelé dbt que vous pouvez utiliser pour tous vos projets dbt. Si le sous-répertoire existe déjà, les fichiers du projet sont copiés dans le répertoire existant et aucun nouveau répertoire n'est créé. La commande crée également un sous-répertoire dans le dbt répertoire pour ce projet de démarrage spécifique.

    $ aws s3 cp dbt-starter-project s3://mwaa-bucket/dags/dbt/dbt-starter-project --recursive

    Vous pouvez utiliser différents noms pour les sous-répertoires de projets afin d'organiser plusieurs projets dbt dans le répertoire parentdbt.

Utiliser un DAG pour vérifier l'installation de la dépendance dbt

Le DAG suivant utilise une commande BashOperator et une commande bash pour vérifier si Amazon MWAA a correctement installé les dépendances dbt spécifiées dans. requirements.txt

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="/usr/local/airflow/.local/bin/dbt --version" )

Procédez comme suit pour afficher les journaux des tâches et vérifier que dbt et ses dépendances ont été installés.

  1. Accédez à la console Amazon MWAA, puis choisissez Open Airflow UI dans la liste des environnements disponibles.

  2. Dans l'interface utilisateur d'Apache Airflow, recherchez le dbt-installation-test DAG dans la liste, puis choisissez la date sous la Last Run colonne pour ouvrir la dernière tâche réussie.

  3. À l'aide de Graph View, choisissez la bash_command tâche pour ouvrir les détails de l'instance de tâche.

  4. Choisissez Log pour ouvrir les journaux des tâches, puis vérifiez que les journaux répertorient correctement la version de dbt dans requirements.txt laquelle nous avons spécifiée.

Utiliser un DAG pour exécuter un projet dbt

Le DAG suivant utilise un BashOperator pour copier les projets dbt que vous avez chargés sur Amazon S3 depuis le usr/local/airflow/dags/ répertoire local vers le /tmp répertoire accessible en écriture, puis exécute le projet dbt. Les commandes bash supposent un projet dbt de démarrage intitulé. dbt-starter-project Modifiez le nom du répertoire en fonction du nom du répertoire de votre projet.

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )