Installation de plugins personnalisés - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Installation de plugins personnalisés

Amazon Managed Workflows pour Apache Airflow prend en charge le gestionnaire de plugins intégré d'Apache Airflow, qui vous permet d'utiliser des opérateurs, des hooks, des capteurs ou des interfaces Apache Airflow personnalisés. Cette page décrit les étapes d'installation des plugins personnalisés Apache Airflow sur votre environnement Amazon MWAA à l'aide d'un plugins.zip fichier.

Prérequis

Vous aurez besoin des éléments suivants avant de pouvoir effectuer les étapes indiquées sur cette page.

  • Autorisations — Votre AWS compte doit avoir été autorisé par votre administrateur à accéder à la politique de contrôle d'FullConsoleAccessaccès d'AmazonMWAA pour votre environnement. En outre, votre environnement Amazon MWAA doit être autorisé par votre rôle d'exécution à accéder aux AWS ressources utilisées par votre environnement.

  • Accès : si vous devez accéder à des référentiels publics pour installer des dépendances directement sur le serveur Web, votre environnement doit être configuré avec un accès au serveur Web du réseau public. Pour plus d'informations, consultez Modes d'accès à Apache Airflow.

  • Configuration Amazon S3 — Le compartiment Amazon S3 utilisé pour stocker vos DAG, vos plugins personnalisés et vos dépendances Python requirements.txt doit être configuré avec l'accès public bloqué et le versionnage activé. plugins.zip

Fonctionnement

Pour exécuter des plugins personnalisés sur votre environnement, vous devez effectuer trois opérations :

  1. Créez un plugins.zip fichier localement.

  2. Téléchargez le plugins.zip fichier local dans votre compartiment Amazon S3.

  3. Spécifiez la version de ce fichier dans le champ Fichier de plugins de la console Amazon MWAA.

Note

Si c'est la première fois que vous chargez un plugins.zip fichier dans votre compartiment Amazon S3, vous devez également spécifier le chemin d'accès au fichier sur la console Amazon MWAA. Vous ne devez effectuer cette étape qu'une seule fois.

Ce qui a changé dans la version 2

  • Nouveau : opérateurs, crochets et exécuteurs. Les instructions d'importation dans vos DAG et les plugins personnalisés que vous spécifiez dans un MWAA plugins.zip sur Amazon ont changé entre Apache Airflow v1 et Apache Airflow v2. Par exemple, from airflow.contrib.hooks.aws_hook import AwsHook dans Apache Airflow v1, il a été remplacé par Apache Airflow v2. from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook Pour en savoir plus, consultez le manuel de référence de l'API Python dans le guide de référence d'Apache Airflow.

  • Nouveau : Importations dans des plugins. L'importation d'opérateurs, de capteurs, de crochets ajoutés dans les plugins à l'aide de plugins n'airflow.{operators,sensors,hooks}.<plugin_name>est plus prise en charge. Ces extensions doivent être importées en tant que modules Python classiques. Dans les versions 2 et supérieures, l'approche recommandée consiste à les placer dans le répertoire DAG et à créer et utiliser un fichier .airflowignore pour les empêcher d'être analysés en tant que DAG. Pour en savoir plus, consultez les sections Gestion des modules et Création d'un opérateur personnalisé dans le guide de référence d'Apache Airflow.

Vue d'ensemble des plugins personnalisés

Le gestionnaire de plugins intégré à Apache Airflow peut intégrer des fonctionnalités externes à son cœur en déposant simplement des fichiers dans un $AIRFLOW_HOME/plugins dossier. Il vous permet d'utiliser des opérateurs, des hooks, des capteurs ou des interfaces Apache Airflow personnalisés. La section suivante fournit un exemple de structures de répertoires plates et imbriquées dans un environnement de développement local et les instructions d'importation qui en résultent, qui déterminent la structure de répertoire dans un fichier plugins.zip.

Répertoire des plugins personnalisés et limites de taille

Le planificateur Apache Airflow et les Workers recherchent des plugins personnalisés lors du démarrage sur le conteneur AWS Fargate géré pour votre environnement sur. /usr/local/airflow/plugins/*

  • Structure du répertoire. La structure du répertoire (at/*) est basée sur le contenu de votre plugins.zip fichier. Par exemple, si vous plugins.zip incluez le operators répertoire en tant que répertoire de premier niveau, le répertoire sera extrait dans votre environnement. /usr/local/airflow/plugins/operators

  • Limite de taille. Nous recommandons un plugins.zip fichier de moins de 1 Go. Plus la taille d'un plugins.zip fichier est importante, plus le temps de démarrage d'un environnement est long. Bien qu'Amazon MWAA ne limite pas explicitement la taille d'un plugins.zip fichier, si les dépendances ne peuvent pas être installées dans les dix minutes, le service Fargate expirera et tentera de rétablir la stabilité de l'environnement.

Note

Pour les environnements utilisant Apache Airflow v1.10.12 ou Apache Airflow v2.0.2, Amazon MWAA limite le trafic sortant sur le serveur Web Apache Airflow et ne vous permet pas d'installer des plugins ni des dépendances Python directement sur le serveur Web. À partir d'Apache Airflow v2.2.2, Amazon MWAA peut installer des plugins et des dépendances directement sur le serveur Web.

Exemples de plugins personnalisés

La section suivante utilise un exemple de code du guide de référence Apache Airflow pour montrer comment structurer votre environnement de développement local.

Exemple d'utilisation d'une structure de répertoire plate dans plugins.zip

Apache Airflow v2

L'exemple suivant montre un plugins.zip fichier avec une structure de répertoire plate pour Apache Airflow v2.

Exemple répertoire plat avec PythonVirtualenvOperator plugins.zip

L'exemple suivant montre l'arborescence de niveau supérieur d'un fichier plugins.zip pour le plugin PythonVirtualenvOperator personnalisé dansCréation d'un plugin personnalisé pour Apache AirflowPythonVirtualenvOperator.

├── virtual_python_plugin.py
Exemple plugins/virtual_python_plugin.py

L'exemple suivant montre le plugin PythonVirtualenvOperator personnalisé.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

L'exemple suivant montre un plugins.zip fichier avec une structure de répertoire plate pour Apache Airflow v1.

Exemple répertoire plat avec PythonVirtualenvOperator plugins.zip

L'exemple suivant montre l'arborescence de niveau supérieur d'un fichier plugins.zip pour le plugin PythonVirtualenvOperator personnalisé dansCréation d'un plugin personnalisé pour Apache AirflowPythonVirtualenvOperator.

├── virtual_python_plugin.py
Exemple plugins/virtual_python_plugin.py

L'exemple suivant montre le plugin PythonVirtualenvOperator personnalisé.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Exemple d'utilisation d'une structure de répertoire imbriquée dans plugins.zip

Apache Airflow v2

L'exemple suivant montre un plugins.zip fichier avec des répertoires distincts pourhooks,operators, et un sensors répertoire pour Apache Airflow v2.

Exemple plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'exemple suivant montre les instructions d'importation dans le DAG (dossier DAG) qui utilise les plugins personnalisés.

Exemple dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Exemple plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Les exemples suivants montrent chacune des instructions d'importation nécessaires dans les fichiers de plug-in personnalisés.

Exemple hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Exemple sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Exemple operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Exemple operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Suivez les étapes décrites dans Tester des plugins personnalisés à l'aide de l'utilitaire Amazon MWAA CLI, puis créez un fichier plugins.zip pour compresser le contenu dans votre plugins répertoire. Par exemple, cd plugins.

Apache Airflow v1

L'exemple suivant montre un plugins.zip fichier avec des répertoires distincts pour hooksoperators, et un sensors répertoire pour Apache Airflow v1.10.12.

Exemple plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'exemple suivant montre les instructions d'importation dans le DAG (dossier DAG) qui utilise les plugins personnalisés.

Exemple dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Exemple plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Les exemples suivants montrent chacune des instructions d'importation nécessaires dans les fichiers de plug-in personnalisés.

Exemple hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Exemple sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Exemple operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Exemple operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Suivez les étapes décrites dans Tester des plugins personnalisés à l'aide de l'utilitaire Amazon MWAA CLI, puis créez un fichier plugins.zip pour compresser le contenu dans votre plugins répertoire. Par exemple, cd plugins.

Création d'un fichier plugins.zip

Les étapes suivantes décrivent les étapes que nous recommandons pour créer un fichier plugins.zip localement.

Première étape : tester les plugins personnalisés à l'aide de l'utilitaire Amazon MWAA CLI

  • L'utilitaire d'interface de ligne de commande (CLI) reproduit localement un environnement Amazon Managed Workflows pour Apache Airflow.

  • La CLI crée localement une image de conteneur Docker similaire à une image de production Amazon MWAA. Cela vous permet d'exécuter un environnement Apache Airflow local pour développer et tester des DAG, des plugins personnalisés et des dépendances avant le déploiement sur Amazon MWAA.

  • Pour exécuter la CLI, reportez-vous à la section aws-mwaa-local-runneron GitHub.

Deuxième étape : créer le fichier plugins.zip

Vous pouvez utiliser un utilitaire d'archivage ZIP intégré ou tout autre utilitaire ZIP (tel que 7zip) pour créer un fichier .zip.

Note

L'utilitaire zip intégré pour le système d'exploitation Windows peut ajouter des sous-dossiers lorsque vous créez un fichier .zip. Nous vous recommandons de vérifier le contenu du fichier plugins.zip avant de le télécharger dans votre compartiment Amazon S3 afin de vous assurer qu'aucun répertoire supplémentaire n'a été ajouté.

  1. Remplacez les répertoires par votre répertoire de plugins Airflow local. Par exemple :

    myproject$ cd plugins
  2. Exécutez la commande suivante pour vous assurer que le contenu dispose d'autorisations exécutables (macOS et Linux uniquement).

    plugins$ chmod -R 755 .
  3. Compressez le contenu de votre plugins dossier.

    plugins$ zip -r plugins.zip .

Téléchargement plugins.zip vers Amazon S3

Vous pouvez utiliser la console Amazon S3 ou le AWS Command Line Interface (AWS CLI) pour charger un plugins.zip fichier dans votre compartiment Amazon S3.

Utilisation de la AWS CLI

L'AWS Command Line Interface (AWS CLI) est un outil à code source libre qui vous permet d'interagir avec les services AWS à l'aide des commandes du terminal de ligne de commande. Pour effectuer les étapes indiquées sur cette page, vous avez besoin des éléments suivants :

Pour effectuer un téléchargement à l'aide du AWS CLI
  1. Dans votre invite de commande, accédez au répertoire dans lequel votre plugins.zip fichier est stocké. Par exemple :

    cd plugins
  2. Utilisez la commande suivante pour répertorier tous vos compartiments Amazon S3.

    aws s3 ls
  3. Utilisez la commande suivante pour répertorier les fichiers et les dossiers du compartiment Amazon S3 de votre environnement.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Utilisez la commande suivante pour charger le plugins.zip fichier dans le compartiment Amazon S3 de votre environnement.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Utilisation de la console Amazon S3

La console Amazon S3 est une interface utilisateur Web qui vous permet de créer et de gérer les ressources de votre compartiment Amazon S3.

Pour charger à l'aide de la console Amazon S3
  1. Ouvrez la page Environnements sur la console Amazon MWAA.

  2. Choisissez un environnement.

  3. Sélectionnez le lien du compartiment S3 dans le code DAG du volet S3 pour ouvrir votre compartiment de stockage sur la console Amazon S3.

  4. Sélectionnez Charger.

  5. Choisissez Ajouter un fichier.

  6. Sélectionnez la copie locale de votre fichierplugins.zip, puis choisissez Upload.

Installation de plugins personnalisés sur votre environnement

Cette section décrit comment installer les plugins personnalisés que vous avez chargés dans votre compartiment Amazon S3 en spécifiant le chemin d'accès au fichier plugins.zip et en spécifiant la version du fichier plugins.zip chaque fois que le fichier zip est mis à jour.

Spécifier le chemin d'accès plugins.zip sur la console Amazon MWAA (pour la première fois)

Si c'est la première fois que vous chargez un plugins.zip fichier dans votre compartiment Amazon S3, vous devez également spécifier le chemin d'accès au fichier sur la console Amazon MWAA. Vous ne devez effectuer cette étape qu'une seule fois.

  1. Ouvrez la page Environnements sur la console Amazon MWAA.

  2. Choisissez un environnement.

  3. Choisissez Modifier.

  4. Dans le code DAG du volet Amazon S3, choisissez Browse S3 à côté du champ facultatif « Fichier de plugins ».

  5. Sélectionnez le plugins.zip fichier dans votre compartiment Amazon S3.

  6. Choisissez Choisir.

  7. Choisissez Suivant, Mettre à jour l'environnement.

Spécification de la plugins.zip version sur la console Amazon MWAA

Vous devez spécifier la version de votre plugins.zip fichier sur la console Amazon MWAA chaque fois que vous chargez une nouvelle version de votre fichier plugins.zip dans votre compartiment Amazon S3.

  1. Ouvrez la page Environnements sur la console Amazon MWAA.

  2. Choisissez un environnement.

  3. Choisissez Modifier.

  4. Sur le code DAG dans le volet Amazon S3, choisissez une plugins.zip version dans la liste déroulante.

  5. Choisissez Suivant.

Exemples de cas d'utilisation pour plugins.zip

Quelle est la prochaine étape ?

  • Testez vos DAG, vos plugins personnalisés et vos dépendances Python localement à l'aide de l'option aws-mwaa-local-runneron GitHub.