本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
安装自定义插件
Amazon MWAA 支持 Apache Airflow 的内置插件管理器,允许您使用自定义 Apache Airflow 运算符、挂钩、传感器或接口。本页介绍使用文件在您的亚马逊MWAA环境中安装 Apache Airflow 自定义插件的plugins.zip
步骤。
先决条件
在完成本页上的步骤之前,您需要具备以下条件。
工作方式
要在环境中运行自定义插件,您必须做三件事:
-
在本地创建 plugins.zip
文件。
-
将 plugins.zip
文件上传到 Amazon S3 中的存储桶。
-
在 Amazon MWAA 控制台的 “插件文件” 字段中指定此文件的版本。
如果这是您首次将上传plugins.zip
到 Amazon S3 存储桶,则还需要在亚马逊MWAA控制台上指定文件路径。您只需要完成此步骤一次。
何时使用插件
正如 Apache Airflow 文档中所述,只有扩展 Apache Airflow 用户界面时才需要插件。自定义运算符可以直接放在DAG
代码旁边的/dags
文件夹中。
如果您需要创建自己的与外部系统的集成,请将其放在/ dags
文件夹或其中的子文件夹中,而不是放在该plugins.zip
文件夹中。在 Apache Airflow 2.x 中,插件主要用于扩展用户界面。
同样,不应将其他依赖项置于其中plugins.zip
。相反,它们可以存储在 Amazon S3 /dags
文件夹下的某个位置,在 Apache Airflow 启动之前,它们将在那里同步到每个亚马逊MWAA容器。
/dags
文件夹中plugins.zip
或其中未明确定义 Apache Airflow DAG 对象的任何文件都必须列在文件中。.airflowignore
自定义插件概述
Apache Airflow 的内置插件管理器只需将文件拖放到 $AIRFLOW_HOME/plugins
文件夹中即可将外部功能集成到其核心中。它允许您使用自定义 Apache Airflow 操作符、挂钩、传感器或接口。下一节提供了本地开发环境中平面和嵌套目录结构的示例,以及生成的 import 语句,这些语句决定了 plugins.zip 中的目录结构。
自定义插件目录和大小限制
在启动期间,Apache Airflow Scheduler 和 W orkers 会在您的环境的托管的 AWS Fargate 容器上查找自定义插件,网址为。/usr/local/airflow/plugins/*
-
目录结构。目录结构(在 /*
中)基于您 plugins.zip
文件的内容。例如,如果 plugins.zip
包含 operators
目录作为顶级目录,则该目录将被解压缩到环境的 /usr/local/airflow/plugins/operators
中。
-
大小限制。我们建议使用小于 1 GB 的 plugins.zip
文件。plugins.zip
文件大小越大,环境的启动时间就越长。尽管 Amazon MWAA 没有明确限制plugins.zip
文件的大小,但如果无法在十分钟内安装依赖项,Fargate 服务将超时并尝试将环境回滚到稳定状态。
对于使用 Apache Airflow v1.10.12 或 Apache Airflow v2.0.2 的环境,MWAA亚马逊会限制 Apache Airflow 网络服务器上的出站流量,并且不允许您直接在 Web 服务器上安装插件或 Python 依赖项。从 Apache Airflow v2.2.2 开始,MWAA亚马逊可以直接在网络服务器上安装插件和依赖项。
自定义插件示例
下一节使用《Apache Airflow 参考指南》中的示例代码来展示如何构建本地开发环境。
在 plugins.zip 中使用平面目录结构的示例
- Apache Airflow v2
-
以下示例显示了 Apache Airflow v2 中一个采用扁平目录结构的 plugins.zip
文件。
例 plugins/virtual_python_plugin.py
以下示例显示了 PythonVirtualenvOperator 自定义插件。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v1
-
以下示例显示了 Apache Airflow v1 中一个采用扁平目录结构的 plugins.zip
文件。
例 plugins/virtual_python_plugin.py
以下示例显示了 PythonVirtualenvOperator 自定义插件。
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator
def _generate_virtualenv_cmd(self, tmp_dir):
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if self.system_site_packages:
cmd.append('--system-site-packages')
if self.python_version is not None:
cmd.append('--python=python{}'.format(self.python_version))
return cmd
PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd
class EnvVarPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
在 plugins.zip 中使用平面目录结构的示例
- Apache Airflow v2
-
以下示例显示了一个 plugins.zip
文件,其中包含 hooks
、operators
的单独目录和 Apache Airflow v2 的 sensors
目录。
例 Plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
以下示例显示了DAG(DAGs文件夹)中使用自定义插件的导入语句。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
以下示例显示了自定义插件文件中所需的每条导入语句。
例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
按照使用 Amazon MWAA CLI 实用程序测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件以压缩plugins
目录中的内容。例如,cd plugins
。
- Apache Airflow v1
-
以下示例显示了一个 plugins.zip
文件,其中包含 hooks
、operators
的单独目录和 Apache Airflow v1.10.12 的 sensors
目录。
例 Plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
以下示例显示了DAG(DAGs文件夹)中使用自定义插件的导入语句。
例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_operator import MyOperator
from sensors.my_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
from utils.my_utils import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
以下示例显示了自定义插件文件中所需的每条导入语句。
例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
按照使用 Amazon MWAA CLI 实用程序测试自定义插件中的步骤进行操作,然后创建 plugins.zip 文件以压缩plugins
目录中的内容。例如,cd plugins
。
创建 plugins.zip 文件
以下步骤描述了我们建议在本地创建 plugins.zip 文件的步骤。
第一步:使用 Amazon MWAA CLI 实用工具测试自定义插件
-
命令行界面 (CLI) 实用程序在本地复制适用于 Apache Airflow 的亚马逊托管工作流程。
-
在本地CLI构建 Docker 容器镜像,该镜像类似于亚马逊MWAA生产镜像。这允许您在部署到亚马逊之前运行本地 Apache Airflow 环境来开发和测试DAGs自定义插件和依赖项。MWAA
-
要运行CLI,请参阅 aws-mwaa-local-runneron GitHub。
步骤 2:创建 plugins.zip 文件
您可以使用内置的ZIP存档实用程序或任何其他ZIP实用程序(例如 7zip)来创建.zip 文件。
当您创建.zip 文件时,Windows 操作系统的内置 zip 实用工具可能会添加子文件夹。我们建议您验证 plugins.zip 文件的内容,然后再上传到 Amazon S3 存储桶,以确保没有添加其他目录。
-
将目录更改为本地 Airflow 插件目录。例如:
myproject$ cd plugins
-
运行以下命令以确保内容具有可执行权限(仅限 macOS 和 Linux)。
plugins$ chmod -R 755 .
-
将内容压缩到 plugins
文件夹中。
plugins$ zip -r plugins.zip .
上传 plugins.zip
到 Amazon S3
您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将plugins.zip
文件上传到您的 Amazon S3 存储桶。
使用 AWS CLI
AWS Command Line Interface (AWS CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:
要使用上传 AWS CLI
-
在命令提示符下,导航到存储 plugins.zip
文件的目录。例如:
cd plugins
-
以下示例列出所有 Amazon S3 存储桶。
aws s3 ls
-
使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。
aws s3 ls s3://YOUR_S3_BUCKET_NAME
-
使用以下命令将 plugins.zip
文件上传到环境的 Amazon S3 存储桶。
aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME
/plugins.zip
使用 Amazon S3 控制台
Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。
要使用 Amazon S3 控制台上传,请执行以下操作
-
在 Amazon MWAA 控制台上打开 “环境” 页面。
-
选择环境。
-
在 S3 窗格的DAG代码中选择 S3 存储桶链接,在 Amazon S3 控制台上打开您的存储桶。
-
选择上传。
-
选择 添加文件。
-
选择 plugins.zip
的本地副本,选择上传。
在环境中安装自定义插件
本节介绍如何安装您上传到 Amazon S3 存储桶的自定义插件,方法是指定 plugins.zip 文件的路径,并在每次更新 zip 文件时指定 plugins.zip 文件的版本。
在 Amazon MWAA 控制台plugins.zip
上指定路径(第一次)
如果这是您首次将上传plugins.zip
到 Amazon S3 存储桶,则还需要在亚马逊MWAA控制台上指定文件路径。您只需要完成此步骤一次。
-
在 Amazon MWAA 控制台上打开 “环境” 页面。
-
选择环境。
-
选择编辑。
-
在 Amazon S3 窗格的DAG代码中,选择插件文件-可选字段旁边的浏览 S3。
-
选择 Amazon S3 存储桶中的 plugins.zip
文件。
-
选择选择。
-
选择下一步、更新环境。
在 Amazon MWAA 控制台上指定plugins.zip
版本
每次在 Amazon S3 存储桶中上传新版本时,都需要plugins.zip
在亚马逊MWAA控制台上指定plugins.zip
文件的版本。
-
在 Amazon MWAA 控制台上打开 “环境” 页面。
-
选择环境。
-
选择编辑。
-
在 Amazon S3 窗格的DAG代码中,从下拉列表中选择一个plugins.zip
版本。
-
选择下一步。
plugins.zip 的用例示例
接下来做什么?