Como instalar plug-ins personalizados - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como instalar plug-ins personalizados

O Amazon Managed Workflows for Apache Airflow oferece suporte ao gerenciador de plug-ins integrado do Apache Airflow, permitindo o uso de operadores, hooks, sensores ou interfaces personalizados do Apache Airflow. Esta página descreve as etapas para instalar os plug-ins personalizados do Apache Airflow em seu MWAA ambiente Amazon usando um plugins.zip arquivo.

Pré-requisitos

Você precisará do seguinte antes de concluir as etapas nesta página.

  • Permissões — Seu AWS administrador deve ter concedido acesso à política de controle de mazonMWAAFull ConsoleAccess acesso A para seu ambiente pelo administrador. Além disso, seu MWAA ambiente Amazon deve ser autorizado pela sua função de execução a acessar os AWS recursos usados pelo seu ambiente.

  • Acesso: se você precisar de acesso a repositórios públicos para instalar dependências diretamente no servidor web, seu ambiente deverá ser configurado com acesso ao servidor web de rede pública. Para obter mais informações, consulte Modos de acesso do Apache Airflow.

  • Configuração do Amazon S3 — O bucket do Amazon S3 usado para armazenar DAGs seus plug-ins plugins.zip personalizados e dependências do Python deve ser configurado com acesso público requirements.txt bloqueado e controle de versão ativado.

Como funciona

Para executar plug-ins personalizados em seu ambiente, você deve fazer três coisas:

  1. Crie um arquivo plugins.zip localmente.

  2. Faça upload do arquivo plugins.zip local para seu bucket no Amazon S3.

  3. Especifique a versão desse arquivo no campo Arquivo de plug-ins no MWAA console da Amazon.

nota

Se esta for a primeira vez que você carrega um plugins.zip para o seu bucket do Amazon S3, você também precisa especificar o caminho para o arquivo no console da AmazonMWAA. Você só precisa concluir esta etapa uma vez.

Quando usar os plug-ins

Os plug-ins são necessários somente para estender a interface de usuário do Apache Airflow, conforme descrito na documentação do Apache Airflow. Operadores personalizados podem ser colocados diretamente na /dags pasta ao lado DAG do seu código.

Se você precisar criar suas próprias integrações com sistemas externos, coloque-as na dags pasta/ou em uma subpasta dentro dela, mas não na plugins.zip pasta. No Apache Airflow 2.x, os plug-ins são usados principalmente para estender a interface do usuário.

Da mesma forma, outras dependências não devem ser inseridas. plugins.zip Em vez disso, eles podem ser armazenados em um local na /dags pasta Amazon S3, onde serão sincronizados com cada MWAA contêiner da Amazon antes do início do Apache Airflow.

nota

Qualquer arquivo na /dags pasta ou plugins.zip que não defina explicitamente um DAG objeto do Apache Airflow deve ser listado em um arquivo. .airflowignore

Visão geral dos plug-ins personalizados

O gerenciador de plug-ins embutido do Apache Airflow pode integrar atributos externos ao núcleo simplesmente soltando arquivos em uma pasta $AIRFLOW_HOME/plugins. Ele permite que você use operadores, hooks, sensores ou interfaces personalizados do Apache Airflow. A seção a seguir fornece um exemplo de estruturas de diretórios simples e aninhadas em um ambiente de desenvolvimento local e as instruções de importação resultantes, que determinam a estrutura de diretórios em um plugins.zip.

Diretório de plug-ins personalizados e limites de tamanho

O Apache Airflow Scheduler e os Workers procuram plug-ins personalizados durante a inicialização no contêiner AWS Fargate gerenciado para seu ambiente em. /usr/local/airflow/plugins/*

  • Estrutura de diretório. A estrutura do diretório (em /*) é baseada no conteúdo do seu arquivo plugins.zip. Por exemplo, se seu plugins.zip contiver o diretório operators como um diretório de nível superior, o diretório será extraído para /usr/local/airflow/plugins/operators em seu ambiente.

  • Limites de tamanho. Recomendamos um arquivo plugins.zip com menos de 1 GB. Quanto maior o tamanho de um arquivo plugins.zip, maior o tempo de inicialização em um ambiente. Embora a Amazon MWAA não limite explicitamente o tamanho de um plugins.zip arquivo, se as dependências não puderem ser instaladas em dez minutos, o serviço Fargate atingirá o tempo limite e tentará reverter o ambiente para um estado estável.

nota

Para ambientes que usam o Apache Airflow v1.10.12 ou o Apache Airflow v2.0.2, a MWAA Amazon limita o tráfego de saída no servidor web Apache Airflow e não permite que você instale plug-ins nem dependências do Python diretamente no servidor web. A partir do Apache Airflow v2.2.2, a MWAA Amazon pode instalar plug-ins e dependências diretamente no servidor web.

Exemplos de plug-ins personalizados

A seção a seguir usa um exemplo de código no Guia de referência do Apache Airflow para mostrar como estruturar seu ambiente de desenvolvimento local.

Exemplo de uso de uma estrutura de diretórios simples em plugins.zip

Apache Airflow v2

O exemplo a seguir mostra um arquivo plugins.zip com uma estrutura de diretório simples para o Apache Airflow v2.

exemplo diretório plano com PythonVirtualenvOperator plugins.zip

O exemplo a seguir mostra a árvore de nível superior de um arquivo plugins.zip para o plug-in PythonVirtualenvOperator personalizado emCriação de um plug-in personalizado para o Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
exemplo plugins/virtual_python_plugin.py

O exemplo a seguir mostra o plug-in PythonVirtualenvOperator personalizado.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

O exemplo a seguir mostra um arquivo plugins.zip com uma estrutura de diretório simples para o Apache Airflow v1.

exemplo diretório plano com PythonVirtualenvOperator plugins.zip

O exemplo a seguir mostra a árvore de nível superior de um arquivo plugins.zip para o plug-in PythonVirtualenvOperator personalizado emCriação de um plug-in personalizado para o Apache Airflow PythonVirtualenvOperator.

├── virtual_python_plugin.py
exemplo plugins/virtual_python_plugin.py

O exemplo a seguir mostra o plug-in PythonVirtualenvOperator personalizado.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Exemplo de uso de uma estrutura de diretórios aninhados em plugins.zip

Apache Airflow v2

O exemplo a seguir mostra um arquivo plugins.zip com diretórios separados para hooks, operators e um diretório sensors para o Apache Airflow v2.

exemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

O exemplo a seguir mostra as instruções de importação na DAG (DAGspasta) que usa os plug-ins personalizados.

exemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
exemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Os exemplos a seguir mostram cada uma das instruções de importação necessárias nos arquivos de plug-in personalizados.

exemplo hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
exemplo sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
exemplo operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
exemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga as etapas em Teste de plug-ins personalizados usando o MWAA CLI utilitário Amazon e, em seguida, Criando um arquivo plugins.zip para compactar o conteúdo em seu plugins diretório. Por exemplo, cd plugins.

Apache Airflow v1

O exemplo a seguir mostra um arquivo plugins.zip com diretórios separados para hooks, operators e um diretório sensors para o Apache Airflow v1.10.12.

exemplo plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

O exemplo a seguir mostra as instruções de importação na DAG (DAGspasta) que usa os plug-ins personalizados.

exemplo dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
exemplo plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Os exemplos a seguir mostram cada uma das instruções de importação necessárias nos arquivos de plug-in personalizados.

exemplo hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
exemplo sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
exemplo operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
exemplo operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Siga as etapas em Teste de plug-ins personalizados usando o MWAA CLI utilitário Amazon e, em seguida, Criando um arquivo plugins.zip para compactar o conteúdo em seu plugins diretório. Por exemplo, cd plugins.

Criação de um arquivo plugins.zip

As etapas a seguir descrevem as etapas que recomendamos para criar um arquivo plugins.zip localmente.

Etapa 1: testar plug-ins personalizados usando o MWAA CLI utilitário Amazon

  • O utilitário de interface de linha de comando (CLI) replica localmente um ambiente Amazon Managed Workflows para Apache Airflow.

  • Ele CLI cria uma imagem de contêiner Docker localmente que é semelhante a uma imagem de MWAA produção da Amazon. Isso permite que você execute um ambiente Apache Airflow local para desenvolver e testar DAGs plug-ins e dependências personalizados antes da implantação na Amazon. MWAA

  • Para executar oCLI, veja o aws-mwaa-local-runnerligado GitHub.

Etapa 2: criar o arquivo plugins.zip

Você pode usar um utilitário de ZIP arquivamento incorporado ou qualquer outro ZIP utilitário (como 7zip) para criar um arquivo.zip.

nota

O utilitário zip integrado para o sistema operacional Windows pode adicionar subpastas quando você cria um arquivo .zip. Recomendamos verificar o conteúdo do arquivo plugins.zip antes de fazer o upload para o bucket do Amazon S3 para garantir que nenhum diretório adicional tenha sido adicionado.

  1. Altere os diretórios para o diretório local de plug-ins do Airflow. Por exemplo:

    myproject$ cd plugins
  2. Execute o comando a seguir para garantir que o conteúdo tenha permissões executáveis (somente macOS e Linux).

    plugins$ chmod -R 755 .
  3. Compacte o conteúdo em sua pasta plugins.

    plugins$ zip -r plugins.zip .

Como fazer upload de plugins.zip para o Amazon S3

Você pode usar o console do Amazon S3 ou o AWS Command Line Interface (AWS CLI) para carregar um plugins.zip arquivo no seu bucket do Amazon S3.

Usando o AWS CLI

O AWS Command Line Interface (AWS CLI) é uma ferramenta de código aberto que permite interagir com AWS serviços usando comandos em seu shell de linha de comando. Para concluir as etapas nesta página, é necessário o seguinte:

Para fazer o upload usando o AWS CLI
  1. No prompt de comando, navegue até o diretório em que seu arquivo plugins.zip está armazenado. Por exemplo:

    cd plugins
  2. Use o comando a seguir para listar todos os seus buckets do Amazon S3.

    aws s3 ls
  3. Use o seguinte comando para listar os arquivos e pastas no bucket do Amazon S3 para seu ambiente.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Use o seguinte comando para carregar o plugins.zip arquivo no bucket do Amazon S3 para seu ambiente.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Usar o console do Amazon S3

O console do Amazon S3 é uma interface de usuário baseada na Web que permite criar e gerenciar os recursos no bucket do Amazon S3.

Fazer o upload usando o console do Amazon S3
  1. Abra a página Ambientes no MWAA console da Amazon.

  2. Escolha um ambiente.

  3. Selecione o link do bucket S3 no DAGcódigo no painel S3 para abrir seu bucket de armazenamento no console do Amazon S3.

  4. Escolha Carregar.

  5. Escolha Adicionar arquivo.

  6. Selecione a cópia local do seu plugins.zip e escolha Carregar.

Instalando plug-ins personalizados em seu ambiente

Esta seção descreve como instalar os plug-ins personalizados que você carregou no seu bucket do Amazon S3 especificando o caminho para o arquivo plugins.zip e especificando a versão do arquivo plugins.zip sempre que o arquivo zip for atualizado.

Especificando o plugins.zip caminho para o MWAA console da Amazon (pela primeira vez)

Se esta for a primeira vez que você carrega um plugins.zip para o seu bucket do Amazon S3, você também precisa especificar o caminho para o arquivo no console da AmazonMWAA. Você só precisa concluir esta etapa uma vez.

  1. Abra a página Ambientes no MWAA console da Amazon.

  2. Escolha um ambiente.

  3. Selecione a opção Editar.

  4. No DAGcódigo no painel Amazon S3, escolha Procurar no S3 ao lado do campo Arquivo de plug-ins - opcional.

  5. Selecione o arquivo plugins.zip no bucket do Amazon S3.

  6. Selecione Escolher.

  7. Selecione Avançar, Atualizar ambiente.

Especificando a plugins.zip versão no console da Amazon MWAA

Você precisa especificar a versão do seu plugins.zip arquivo no MWAA console da Amazon sempre que fizer o upload de uma nova versão do seu arquivo plugins.zip no bucket do Amazon S3.

  1. Abra a página Ambientes no MWAA console da Amazon.

  2. Escolha um ambiente.

  3. Selecione a opção Editar.

  4. No DAGcódigo no painel do Amazon S3, escolha uma plugins.zip versão na lista suspensa.

  5. Escolha Próximo.

Exemplos de casos de uso para plugins.zip

Próximas etapas

  • Teste seus DAGs plug-ins personalizados e dependências do Python localmente usando o on. aws-mwaa-local-runner GitHub