Installazione di plugin personalizzati - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Installazione di plugin personalizzati

Amazon Managed Workflows for Apache Airflow supporta il gestore di plugin integrato di Apache Airflow, che consente di utilizzare operatori, hook, sensori o interfacce Apache Airflow personalizzati. Questa pagina descrive i passaggi per installare i plug-in personalizzati Apache Airflow sul tuo ambiente Amazon MWAA utilizzando un file. plugins.zip

Prerequisiti

Avrai bisogno di quanto segue prima di completare i passaggi di questa pagina.

  • Autorizzazioni: al tuo AWS account deve essere stato concesso dall'amministratore l'accesso alla politica di controllo degli accessi di AmazonMWAA per il tuo FullConsoleAccess ambiente. Inoltre, il tuo ambiente Amazon MWAA deve essere autorizzato dal tuo ruolo di esecuzione ad accedere alle AWS risorse utilizzate dal tuo ambiente.

  • Accesso: se è necessario accedere agli archivi pubblici per installare le dipendenze direttamente sul server Web, l'ambiente deve essere configurato con l'accesso al server Web di rete pubblica. Per ulteriori informazioni, consulta Modalità di accesso Apache Airflow.

  • Configurazione Amazon S3 : il bucket Amazon S3 utilizzato per archiviare i DAGplugins.zip, i plug-in personalizzati e le dipendenze requirements.txt Python deve essere configurato con Public Access Blocked e Versioning Enabled.

Come funziona

Per eseguire plugin personalizzati nel tuo ambiente, devi fare tre cose:

  1. Crea un plugins.zip file localmente.

  2. Carica il plugins.zip file locale nel tuo bucket Amazon S3.

  3. Specificare la versione di questo file nel campo File Plugins sulla console Amazon MWAA.

Nota

Se è la prima volta che carichi un plugins.zip file nel tuo bucket Amazon S3, devi anche specificare il percorso del file sulla console Amazon MWAA. Devi completare questo passaggio solo una volta.

Cosa è cambiato nella v2

  • Novità: operatori, ganci ed esecutori. Le istruzioni di importazione nei tuoi DAG e i plug-in personalizzati specificati in un MWAA su plugins.zip Amazon sono cambiati tra Apache Airflow v1 e Apache Airflow v2. Ad esempio, in Apache Airflow v1 è cambiato from airflow.contrib.hooks.aws_hook import AwsHook in Apache Airflow v2. from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook Per saperne di più, consulta Python API Reference nella guida di riferimento di Apache Airflow.

  • Novità: importazioni nei plugin. L'importazione di operatori, sensori, hook aggiunti tramite plugin non airflow.{operators,sensors,hooks}.<plugin_name> è più supportata. Queste estensioni devono essere importate come normali moduli Python. Nella versione 2 e successive, l'approccio consigliato consiste nel metterle nella directory DAG e creare e utilizzare un file.airflowignore per escluderle dall'analisi come DAG. Per ulteriori informazioni, consulta Gestione dei moduli e creazione di un operatore personalizzato nella guida di riferimento di Apache Airflow.

Panoramica dei plugin personalizzati

Il gestore di plugin integrato di Apache Airflow può integrare funzionalità esterne al suo interno semplicemente trascinando i file in una cartella. $AIRFLOW_HOME/plugins Consente di utilizzare operatori, hook, sensori o interfacce Apache Airflow personalizzati. La sezione seguente fornisce un esempio di strutture di directory piatte e annidate in un ambiente di sviluppo locale e le istruzioni di importazione risultanti, che determinano la struttura delle directory all'interno di un plugins.zip.

Limiti di directory e dimensioni dei plugin personalizzati

Apache Airflow Scheduler e Workers cercano plugin personalizzati durante l'avvio sul contenitore Fargate gestito da AWS Fargate per il vostro ambiente in. /usr/local/airflow/plugins/*

  • Struttura delle directory. La struttura delle cartelle (at/*) si basa sul contenuto del plugins.zip file. Ad esempio, se la directory è plugins.zip operators contenuta come directory di primo livello, la directory verrà estratta nell'ambiente /usr/local/airflow/plugins/operators in cui si trova.

  • Limite di dimensione. Consigliamo un plugins.zip file inferiore a 1 GB. Maggiore è la dimensione di un plugins.zip file, maggiore è il tempo di avvio in un ambiente. Sebbene Amazon MWAA non limiti esplicitamente la dimensione di un plugins.zip file, se le dipendenze non possono essere installate entro dieci minuti, il servizio Fargate andrà in timeout e tenterà di ripristinare l'ambiente a uno stato stabile.

Nota

Per gli ambienti che utilizzano Apache Airflow v1.10.12 o Apache Airflow v2.0.2, Amazon MWAA limita il traffico in uscita sul server Web Apache Airflow e non consente di installare plugin o dipendenze Python direttamente sul server Web. A partire da Apache Airflow v2.2.2, Amazon MWAA può installare plugin e dipendenze direttamente sul server Web.

Esempi di plugin personalizzati

La sezione seguente utilizza il codice di esempio contenuto nella guida di riferimento di Apache Airflow per mostrare come strutturare l'ambiente di sviluppo locale.

Esempio di utilizzo di una struttura di directory piatta in plugins.zip

Apache Airflow v2

L'esempio seguente mostra un plugins.zip file con una struttura di directory piatta per Apache Airflow v2.

Esempio directory piatta con plugins.zip PythonVirtualenvOperator

L'esempio seguente mostra l'albero di primo livello di un file plugins.zip per il plugin PythonVirtualenvOperator personalizzato inCreazione di un plugin personalizzato per Apache AirflowPythonVirtualenvOperator.

├── virtual_python_plugin.py
Esempio plugins/virtual_python_plugin.py

L'esempio seguente mostra il plugin PythonVirtualenvOperator personalizzato.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

L'esempio seguente mostra un plugins.zip file con una struttura di directory piatta per Apache Airflow v1.

Esempio directory piatta con plugins.zip PythonVirtualenvOperator

L'esempio seguente mostra l'albero di primo livello di un file plugins.zip per il plugin PythonVirtualenvOperator personalizzato inCreazione di un plugin personalizzato per Apache AirflowPythonVirtualenvOperator.

├── virtual_python_plugin.py
Esempio plugins/virtual_python_plugin.py

L'esempio seguente mostra il plugin PythonVirtualenvOperator personalizzato.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Esempio di utilizzo di una struttura di directory annidata in plugins.zip

Apache Airflow v2

L'esempio seguente mostra un plugins.zip file con directory separate per e una sensors directory per hooks Apache Airflow v2. operators

Esempio plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'esempio seguente mostra le istruzioni di importazione nel DAG (cartella DAG) che utilizza i plugin personalizzati.

Esempio dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Esempio plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

I seguenti esempi mostrano ciascuna delle istruzioni di importazione necessarie nei file dei plugin personalizzati.

Esempio hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Esempio sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Esempio operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Esempio operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Segui i passaggi descritti in Testare i plug-in personalizzati utilizzando l'utilità CLI di Amazon MWAA e quindi Creazione di un file plugins.zip per comprimere i contenuti all'interno della directory. plugins Ad esempio, cd plugins.

Apache Airflow v1

L'esempio seguente mostra un plugins.zip file con directory separate per e una directory per sensors Apache hooks operators Airflow v1.10.12.

Esempio plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

L'esempio seguente mostra le istruzioni di importazione nel DAG (cartella DAG) che utilizza i plugin personalizzati.

Esempio dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Esempio plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

I seguenti esempi mostrano ciascuna delle istruzioni di importazione necessarie nei file dei plugin personalizzati.

Esempio hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Esempio sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Esempio operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Esempio operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Segui i passaggi descritti in Testare i plug-in personalizzati utilizzando l'utilità CLI di Amazon MWAA e quindi Creazione di un file plugins.zip per comprimere i contenuti all'interno della directory. plugins Ad esempio, cd plugins.

Creazione di un file plugins.zip

I passaggi seguenti descrivono i passaggi consigliati per creare un file plugins.zip localmente.

Fase uno: testare i plugin personalizzati utilizzando l'utilità CLI di Amazon MWAA

  • L'utilità CLI (Command Line Interface) replica localmente un ambiente Amazon Managed Workflows for Apache Airflow.

  • La CLI crea localmente un'immagine del contenitore Docker simile a un'immagine di produzione Amazon MWAA. Ciò consente di eseguire un ambiente Apache Airflow locale per sviluppare e testare DAG, plug-in personalizzati e dipendenze prima della distribuzione su Amazon MWAA.

  • Per eseguire la CLI, vedi on. aws-mwaa-local-runner GitHub

Fase due: creare il file plugins.zip

È possibile utilizzare un'utilità di archiviazione ZIP integrata o qualsiasi altra utilità ZIP (come 7zip) per creare un file.zip.

Nota

L'utilità zip integrata per il sistema operativo Windows può aggiungere sottocartelle quando si crea un file con estensione zip. Ti consigliamo di verificare il contenuto del file plugins.zip prima di caricarlo nel tuo bucket Amazon S3 per assicurarti che non siano state aggiunte altre directory.

  1. Cambia le directory nella cartella locale dei plugin Airflow. Per esempio:

    myproject$ cd plugins
  2. Esegui il comando seguente per assicurarti che i contenuti abbiano autorizzazioni eseguibili (solo macOS e Linux).

    plugins$ chmod -R 755 .
  3. Comprimi il contenuto all'interno della cartellaplugins.

    plugins$ zip -r plugins.zip .

Caricamento plugins.zip su Amazon S3

Puoi utilizzare la console Amazon S3 o il AWS Command Line Interface (AWS CLI) per caricare un plugins.zip file nel tuo bucket Amazon S3.

Utilizzo di AWS CLI

AWS Command Line Interface (AWS CLI) è uno strumento open source che consente di interagire con i servizi AWS utilizzando i comandi nella shell a riga di comando. Per completare i passaggi indicati in questa pagina, è necessario quanto segue:

Per caricare utilizzando il AWS CLI
  1. Nel prompt dei comandi, accedi alla directory in cui è archiviato il plugins.zip file. Per esempio:

    cd plugins
  2. Usa il seguente comando per elencare tutti i tuoi bucket Amazon S3.

    aws s3 ls
  3. Usa il seguente comando per elencare i file e le cartelle nel bucket Amazon S3 per il tuo ambiente.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Usa il seguente comando per caricare il plugins.zip file nel bucket Amazon S3 per il tuo ambiente.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Utilizzo della console Amazon S3

La console Amazon S3 è un'interfaccia utente basata sul Web che consente di creare e gestire le risorse nel bucket Amazon S3.

Per caricare utilizzando la console Amazon S3
  1. Apri la pagina Ambienti sulla console Amazon MWAA.

  2. Scegli un ambiente.

  3. Seleziona il link del bucket S3 nel codice DAG nel riquadro S3 per aprire il bucket di archiviazione sulla console Amazon S3.

  4. Scegli Carica.

  5. Scegli Aggiungi file.

  6. Seleziona la copia locale del tuoplugins.zip, scegli Carica.

Installazione di plugin personalizzati nel tuo ambiente

Questa sezione descrive come installare i plugin personalizzati caricati nel bucket Amazon S3 specificando il percorso del file plugins.zip e specificando la versione del file plugins.zip ogni volta che il file zip viene aggiornato.

Specificazione del percorso plugins.zip sulla console Amazon MWAA (la prima volta)

Se è la prima volta che carichi un plugins.zip file nel tuo bucket Amazon S3, devi anche specificare il percorso del file sulla console Amazon MWAA. Devi completare questo passaggio solo una volta.

  1. Apri la pagina Ambienti sulla console Amazon MWAA.

  2. Scegli un ambiente.

  3. Scegli Modifica.

  4. Nel riquadro del codice DAG di Amazon S3, scegli Browse S3 accanto al file Plugins (campo opzionale).

  5. Seleziona il plugins.zip file nel tuo bucket Amazon S3.

  6. Scegliere Choose (Scegli).

  7. Scegli Avanti, Aggiorna ambiente.

Specificazione della plugins.zip versione sulla console Amazon MWAA

È necessario specificare la versione del plugins.zip file sulla console Amazon MWAA ogni volta che si carica una nuova versione del file plugins.zip nel bucket Amazon S3.

  1. Apri la pagina Ambienti sulla console Amazon MWAA.

  2. Scegli un ambiente.

  3. Scegli Modifica.

  4. Nel riquadro del codice DAG di Amazon S3, scegli plugins.zip una versione nell'elenco a discesa.

  5. Seleziona Avanti.

Esempi di casi d'uso per plugins.zip

Fasi successive

  • Testa i tuoi DAG, i plugin personalizzati e le dipendenze Python localmente usando on. aws-mwaa-local-runner GitHub