Instalación de complementos personalizados - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Instalación de complementos personalizados

Amazon Managed Workflows para Apache Airflow es compatible con el administrador de complementos integrado en Apache Airflow, lo que le permite utilizar operadores, enlaces, sensores o interfaces personalizados de Apache Airflow. En esta página se describen los pasos para instalar los complementos personalizados de Apache Airflow en tu MWAA entorno de Amazon mediante un plugins.zip archivo.

Requisitos previos

Para poder llevar a cabo los pasos de esta página, necesitará lo siguiente.

  • Permisos: su administrador debe haber concedido a su AWS cuenta el acceso a la política de control de mazonMWAAFull ConsoleAccess acceso A para su entorno. Además, su rol de ejecución debe permitir que su MWAA entorno de Amazon acceda a los AWS recursos que utiliza su entorno.

  • Acceso: si tiene que acceder a los repositorios públicos para instalar dependencias directamente en el servidor web, su entorno debe estar configurado con acceso a un servidor web de red pública. Para obtener más información, consulte Modos de acceso de Apache Airflow.

  • Configuración de Amazon S3: el bucket de Amazon S3 que se utiliza para almacenar los complementos personalizados y las dependencias de Python requirements.txt debe estar configurado con el acceso público bloqueado y el control de versiones activado. DAGs plugins.zip

Funcionamiento

Para ejecutar complementos personalizados en su entorno, debe hacer tres cosas:

  1. Cree un archivo plugins.zip local.

  2. Cargue el archivo plugins.zip en su bucket de Amazon S3.

  3. Especifica la versión de este archivo en el campo Archivo de complementos de la MWAA consola de Amazon.

nota

Si es la primera vez que subes un archivo plugins.zip a tu bucket de Amazon S3, también tendrás que especificar la ruta al archivo en la MWAA consola de Amazon. Solo necesita realizar este paso una vez.

¿Cuándo usar los complementos

Los complementos solo son necesarios para ampliar la interfaz de usuario de Apache Airflow, tal y como se describe en la documentación de Apache Airflow. Los operadores personalizados se pueden colocar directamente en la /dags carpeta junto con el códigoDAG.

Si necesita crear sus propias integraciones con sistemas externos, colóquelas en la dags carpeta/o en una de sus subcarpetas, pero no en la plugins.zip carpeta. En Apache Airflow 2.x, los complementos se utilizan principalmente para ampliar la interfaz de usuario.

Del mismo modo, no se deben colocar otras dependencias. plugins.zip En su lugar, se pueden almacenar en una ubicación dentro de la /dags carpeta Amazon S3, donde se sincronizarán con cada MWAA contenedor de Amazon antes de que se inicie Apache Airflow.

nota

Cualquier archivo de la /dags carpeta o plugins.zip que no defina explícitamente un DAG objeto de Apache Airflow debe figurar en un archivo. .airflowignore

Información general de los complementos personalizados

El administrador de complementos integrado de Apache Airflow puede integrar características externas en su núcleo simplemente colocando archivos en una carpeta $AIRFLOW_HOME/plugins. Le permite utilizar operadores, enlaces, sensores o interfaces personalizados de Apache Airflow. La siguiente sección proporciona un ejemplo de estructuras de directorios planas y anidadas en un entorno de desarrollo local y las instrucciones de importación resultantes, que determinan la estructura de directorios dentro de un plugins.zip.

Directorio de complementos personalizados y límites de tamaño

Apache Airflow Scheduler y Workers buscan complementos personalizados durante el inicio en el contenedor Fargate AWS administrado para su entorno en. /usr/local/airflow/plugins/*

  • Estructura de directorios. La estructura de directorios (en /*) se basa en el contenido del archivo plugins.zip. Por ejemplo, si su plugins.zip contiene el directorio operators como directorio de nivel superior, el directorio se extraerá a /usr/local/airflow/plugins/operators en su entorno.

  • Límite de tamaño. Se recomienda un archivo plugins.zip de menos de 1 GB. Cuanto mayor sea el tamaño del archivo plugins.zip, mayor será el tiempo de inicio en un entorno. Aunque Amazon MWAA no limita el tamaño de un plugins.zip archivo de forma explícita, si las dependencias no se pueden instalar en diez minutos, el servicio Fargate agotará el tiempo de espera e intentará revertir el entorno a un estado estable.

nota

Para los entornos que utilizan Apache Airflow v1.10.12 o Apache Airflow v2.0.2, MWAA Amazon limita el tráfico saliente en el servidor web Apache Airflow y no permite instalar complementos ni dependencias de Python directamente en el servidor web. A partir de Apache Airflow v2.2.2, Amazon MWAA puede instalar complementos y dependencias directamente en el servidor web.

Ejemplos de complementos personalizados

En la siguiente sección, se utiliza un código de muestra de la guía de referencia de Apache Airflow para mostrar cómo estructurar su entorno de desarrollo local.

Ejemplo de uso de una estructura de directorios plana en plugins.zip

Apache Airflow v2

El siguiente ejemplo muestra un archivo plugins.zip con una estructura de directorios plana para Apache Airflow v2.

ejemplo directorio plano con plugins.zip PythonVirtualenvOperator

El siguiente ejemplo muestra el árbol de nivel superior de un archivo plugins.zip para el complemento PythonVirtualenvOperator personalizado deCreación de un complemento personalizado para Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
ejemplo plugins/virtual_python_plugin.py

El siguiente ejemplo muestra el complemento PythonVirtualenvOperator personalizado.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

El siguiente ejemplo muestra un archivo plugins.zip con una estructura de directorios plana para Apache Airflow v1.

ejemplo directorio plano con PythonVirtualenvOperator plugins.zip

El siguiente ejemplo muestra el árbol de nivel superior de un archivo plugins.zip para el complemento PythonVirtualenvOperator personalizado deCreación de un complemento personalizado para Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
ejemplo plugins/virtual_python_plugin.py

El siguiente ejemplo muestra el complemento PythonVirtualenvOperator personalizado.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Ejemplo de uso de una estructura de directorios anidada en plugins.zip

Apache Airflow v2

El siguiente ejemplo muestra un archivo plugins.zip con directorios independientes para hooks, operators y un directorio sensors para Apache Airflow v2.

ejemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

En el siguiente ejemplo, se muestran las instrucciones de importación de la DAG (DAGscarpeta) que utiliza los complementos personalizados.

ejemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
ejemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Los siguientes ejemplos muestran cada una de las instrucciones de importación necesarias en los archivos de complementos personalizados.

ejemplo hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
ejemplo sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
ejemplo operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
ejemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Sigue los pasos que se indican en Probar complementos personalizados con la MWAA CLI utilidad Amazon y, a continuación, en Crear un archivo plugins.zip para comprimir el contenido de tu plugins directorio. Por ejemplo, cd plugins.

Apache Airflow v1

El siguiente ejemplo muestra un archivo plugins.zip con directorios independientes para hooks, operators y un directorio sensors para Apache Airflow v1.10.12.

ejemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

El siguiente ejemplo muestra las instrucciones de importación en la DAG (DAGscarpeta) que usa los complementos personalizados.

ejemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
ejemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Los siguientes ejemplos muestran cada una de las instrucciones de importación necesarias en los archivos de complementos personalizados.

ejemplo hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
ejemplo sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
ejemplo operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
ejemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Sigue los pasos que se indican en Probar complementos personalizados con la MWAA CLI utilidad Amazon y, a continuación, en Crear un archivo plugins.zip para comprimir el contenido de tu plugins directorio. Por ejemplo, cd plugins.

Crear un archivo plugins.zip

En los pasos siguientes se describen los pasos que recomendamos para crear un archivo plugins.zip de forma local.

Paso uno: prueba complementos personalizados con la MWAA CLI utilidad Amazon

  • La utilidad interface de línea de comandos (CLI) replica localmente un entorno de Amazon Managed Workflows para Apache Airflow.

  • CLICrea una imagen de contenedor Docker localmente similar a una imagen de MWAA producción de Amazon. Esto te permite ejecutar un entorno local de Apache Airflow para desarrollar y probar DAGs complementos personalizados y dependencias antes de implementarlos en Amazon. MWAA

  • Para ejecutar elCLI, consulte la aws-mwaa-local-runneropción. GitHub

Paso dos: crear el archivo plugins.zip

Puede utilizar una utilidad de ZIP archivado integrada o cualquier otra ZIP utilidad (como 7zip) para crear un archivo.zip.

nota

La utilidad zip integrada para el sistema operativo Windows puede agregar subcarpetas al crear un archivo .zip. Le recomendamos comprobar el contenido del archivo plugins.zip antes de subirlo a su bucket de Amazon S3 para asegurarse de que no se hayan añadido directorios adicionales.

  1. Cambie los directorios a su directorio local de complementos de Airflow. Por ejemplo:

    myproject$ cd plugins
  2. Ejecute el siguiente comando para asegurarse de que el contenido tiene permisos de ejecución (solo para macOS y Linux).

    plugins$ chmod -R 755 .
  3. Comprima el contenido de la carpeta plugins.

    plugins$ zip -r plugins.zip .

Cómo cargar plugins.zip a Amazon S3

Puede usar la consola Amazon S3 o AWS Command Line Interface (AWS CLI) para cargar un plugins.zip archivo en su bucket de Amazon S3.

Usando el AWS CLI

The AWS Command Line Interface (AWS CLI) es una herramienta de código abierto que le permite interactuar con AWS los servicios mediante comandos del shell de la línea de comandos. Para completar los pasos de esta página, necesita lo siguiente:

Para cargar mediante el AWS CLI
  1. En el símbolo del sistema, vaya hasta el directorio en el que está almacenado el archivo plugins.zip. Por ejemplo:

    cd plugins
  2. Use el siguiente comando para obtener una lista de todos los buckets de Amazon S3.

    aws s3 ls
  3. Utilice el comando siguiente para enumerar los archivos y las carpetas del bucket de Amazon S3 para su entorno.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Utilice el siguiente comando para cargar el archivo plugins.zip en el bucket de Amazon S3 para su entorno.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Uso de la consola de Amazon S3

La consola de Amazon S3 es una interfaz de usuario basada en la web que le permite crear y administrar los recursos de su bucket de Amazon S3.

Carga del contenido usando la consola de Amazon S3
  1. Abre la página Entornos en la MWAA consola de Amazon.

  2. Seleccione un entorno.

  3. Seleccione el enlace del depósito de S3 en el panel de DAGcódigos de S3 para abrir el depósito de almacenamiento en la consola de Amazon S3.

  4. Seleccione Cargar.

  5. Elija Añadir archivo.

  6. Seleccione la copia local de su plugins.zip, elija Cargar.

Instalación de complementos personalizados en su entorno

En esta sección se describe cómo instalar los complementos personalizados que ha cargado en su bucket de Amazon S3 especificando la ruta al archivo plugins.zip y especificando la versión del archivo plugins.zip cada vez que se actualiza el archivo zip.

Especificar la ruta a plugins.zip en la MWAA consola de Amazon (la primera vez)

Si es la primera vez que subes un archivo plugins.zip a tu bucket de Amazon S3, también tendrás que especificar la ruta al archivo en la MWAA consola de Amazon. Solo necesita realizar este paso una vez.

  1. Abre la página Entornos en la MWAA consola de Amazon.

  2. Seleccione un entorno.

  3. Elija Editar.

  4. En el panel DAGCódigo en Amazon S3, selecciona Browse S3 junto al campo Archivo de complementos (opcional).

  5. Seleccione el archivo plugins.zip en su bucket de Amazon S3.

  6. Seleccione Elegir.

  7. Seleccione Siguiente, Actualizar entorno.

Especificar la plugins.zip versión en la MWAA consola de Amazon

Debes especificar la versión de tu plugins.zip archivo en la MWAA consola de Amazon cada vez que subas una nueva versión tuya a tu plugins.zip bucket de Amazon S3.

  1. Abre la página Entornos en la MWAA consola de Amazon.

  2. Seleccione un entorno.

  3. Elija Editar.

  4. En el panel DAGCódigo en Amazon S3, selecciona una plugins.zip versión de la lista desplegable.

  5. Elija Next (Siguiente).

Ejemplos de casos de uso de plugins.zip

Siguientes pasos

  • Pruebe sus DAGs complementos personalizados y sus dependencias de Python de forma local con la función aws-mwaa-local-runneron GitHub.