本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
安裝自定義插件
適用於 Apache 氣流的 Amazon 受管工作流程支援 Apache Airflow 的內建外掛程式管理器,可讓您使用自訂的 Apache 氣流操作員、掛鉤、感應器或介面。本頁說明使用plugins.zip
檔案在您的 Amazon MWAA 環境中安裝 Apache 氣流自訂外掛程式的步驟。
必要條件
您需要下列項目,才能完成此頁面上的步驟。
運作方式
若要在您的環境中執行自訂外掛程式,您必須執行以下三項作業:
-
在本機建立plugins.zip
檔案。
-
將本機plugins.zip
檔案上傳到您的 Amazon S3 儲存貯體。
-
在 Amazon MWAA 主控台的外掛程式檔案欄位中指定此檔案的版本。
如果這是您第一次上傳plugins.zip
到 Amazon S3 儲存貯體,您還需要在 Amazon MWAA 主控台上指定檔案的路徑。您只需要完成此步驟一次。
何時使用插件
只有在擴充 Apache 氣流使用者介面時才需要外掛程式,如 Apache 氣流說明文件所述。自訂運算子可以直接放置在DAG
程式碼旁邊的/dags
資料夾中。
如果您需要建立自己與外部系統的整合,請將它們放在/dags
資料夾或其中的子資料夾中,但不放在plugins.zip
資料夾中。在阿帕奇氣流 2.x 中,插件主要用於擴展 UI。
同樣地,不應將其他相依性置於中plugins.zip
。相反地,它們可以存放在 Amazon S3 /dags
資料夾下的位置,在 Apache 氣流開始之前,它們會同步到每個 Amazon MWAA 容器。
/dags
資料夾中或其中plugins.zip
未明確定義 Apache Airflow DAG 物件的任何檔案都必須列在.airflowignore
檔案中。
自定義插件概述
Apache Airflow 的內置插件管理器可以通過簡單地將文件放在文件$AIRFLOW_HOME/plugins
夾中將外部功能集成到其核心。它允許您使用自定義的 Apache 氣流操作員,掛鉤,傳感器或接口。下節提供本機開發環境中平坦與巢狀目錄結構的範例,以及產生的 import 陳述式,這些陳述式會決定 plugins.zip 中的目錄結構。
自定義插件目錄和大小限制
Apache 氣流排程器和工作人員會在啟動期間,在 AWS受管理的 Fargate 容器上尋找自訂外掛程式,適用於您的環境。/usr/local/airflow/plugins/*
-
目錄結構。目錄結構 (at/*
) 以plugins.zip
檔案的內容為基礎。例如,如果您將operators
目錄作為頂層目錄plugins.zip
包含,則該目錄將被解壓縮到您/usr/local/airflow/plugins/operators
的環境中。
-
大小限制。我們建議使用小於 1 GB 的plugins.zip
檔案。plugins.zip
檔案大小越大,環境上的啟動時間就越長。雖然 Amazon 並MWAA未明確限制plugins.zip
檔案的大小,但是如果無法在十分鐘內安裝相依性,Fargate 服務會逾時並嘗試將環境回復到穩定狀態。
對於使用 Apache 氣流 v1.10.12 或 Apache 氣流 v2.0.2 的環境,Amazon MWAA 限制了 Apache 氣流網絡服務器上的出站流量,並且不允許您直接在 Web 服務器上安裝插件或 Python 依賴項。從 Apache 氣流 v2.2.2 開始,Amazon MWAA 可以直接在 Web 服務器上安裝插件和依賴關係。
自定義插件的例子
以下章節使用 Apache Airflow 參考指南中的範例程式碼,說明如何建構本機開發環境。
在 plugins.zip 中使用平面目錄結構的示例
- Apache Airflow v2
-
下面的例子顯示了 Apache 氣流 V2 的扁平目錄結構的plugins.zip
文件。
範例 plugins/virtual_python_plugin.py
下面的例子顯示了 PythonVirtualenvOperator 自定義插件。
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
- Apache Airflow v1
-
下面的例子顯示了一個plugins.zip
文件與 Apache 氣流 V1 的扁平目錄結構。
範例 plugins/virtual_python_plugin.py
下面的例子顯示了 PythonVirtualenvOperator 自定義插件。
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator
def _generate_virtualenv_cmd(self, tmp_dir):
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if self.system_site_packages:
cmd.append('--system-site-packages')
if self.python_version is not None:
cmd.append('--python=python{}'.format(self.python_version))
return cmd
PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd
class EnvVarPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
在 plugins.zip 中使用嵌套目錄結構的示例
- Apache Airflow v2
-
下列範例顯示的plugins.zip
檔案具有不同目錄的hooks
operators
、和 Apache 氣流 v2 的目sensors
錄。
範例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
下列範例會顯示 DAG (DAGs資料夾) 中使用自訂外掛程式的 import 陳述式。
範例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
下列範例顯示自訂外掛程式檔案中所需的每個 import 陳述式。
範例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
範例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
範例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
範例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins
目錄中的內容。例如:cd plugins
。
- Apache Airflow v1
-
下列範例顯示的plugins.zip
檔案具有不同目錄的hooks
operators
、以及 Apache 氣流 v1.10.12 的目sensors
錄。
範例 plugins.zip
__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py
下列範例會顯示 DAG (DAGs資料夾) 中使用自訂外掛程式的 import 陳述式。
範例 dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_operator import MyOperator
from sensors.my_sensor import MySensor
from operators.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('customdag',
max_active_runs=3,
schedule_interval='@once',
default_args=default_args) as dag:
sens = MySensor(
task_id='taskA'
)
op = MyOperator(
task_id='taskB',
my_field='some text'
)
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import *
from operators.my_airflow_operator import *
from utils.my_utils import *
class PluginName(AirflowPlugin):
name = 'my_airflow_plugin'
hooks = [MyHook]
operators = [MyOperator]
sensors = [MySensor]
下列範例顯示自訂外掛程式檔案中所需的每個 import 陳述式。
範例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook
class MyHook(BaseHook):
def my_method(self):
print("Hello World")
範例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MySensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MySensor, self).__init__(*args, **kwargs)
def poke(self, context):
return True
範例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_hook import MyHook
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
my_field,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.my_field = my_field
def execute(self, context):
hook = MyHook('my_conn')
hook.my_method()
範例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins
目錄中的內容。例如:cd plugins
。
創建一個 plugins.zip 文件
下列步驟說明我們建議在本機建立 plugins.zip 檔案的步驟。
第一步:使用 Amazon MWAA CLI 公用程式測試自訂外掛程式
-
命令列介面 (CLI) 公用程式可在本機複寫 Apache 氣流環境的 Amazon 受管工作流程。
-
它會在本地CLI構建類似於 Amazon MWAA 生產映像的 Docker 容器映像。這可讓您在部署到 Amazon 之前執行本機 Apache Airflow 環境來開發和測試DAGs、自訂外掛程式和相依性MWAA。
-
要運行CLI,請參閱(詳見) GitHub。aws-mwaa-local-runner
第二步:創建 plugins.zip 文件
您可以使用內建的ZIP封存公用程式或任何其他ZIP公用程式 (例如 7zip) 來建立 .zip 檔案。
當您建立 .zip 檔案時,Windows 作業系統的內建 zip 公用程式可能會新增子資料夾。我們建議您在上傳到 Amazon S3 儲存貯體之前先驗證 plugins.zip 檔案的內容,以確保沒有新增其他目錄。
-
將目錄變更為您的本機 Airflow 外掛程式目錄。例如:
myproject$ cd plugins
-
執行下列命令,以確保內容具有可執行權限 (僅限 macOS 和 Linux)。
plugins$ chmod -R 755 .
-
壓縮文plugins
件夾中的內容。
plugins$ zip -r plugins.zip .
上傳plugins.zip
到 Amazon S3
您可以使用 Amazon S3 主控台或 AWS Command Line Interface (AWS CLI) 將plugins.zip
檔案上傳到您的 Amazon S3 儲存貯體。
使用 AWS CLI
AWS Command Line Interface (AWS CLI) 是開放原始碼工具,可讓您使用命令列殼層中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:
若要使用 AWS CLI
-
在命令提示字元中,導覽至儲存plugins.zip
檔案的目錄。例如:
cd plugins
-
使用下列命令列出您所有的 Amazon S3 儲存貯體。
aws s3 ls
-
使用下列命令列出您環境之 Amazon S3 儲存貯體中的檔案和資料夾。
aws s3 ls s3://YOUR_S3_BUCKET_NAME
-
使用下列命令將plugins.zip
檔案上傳到您環境的 Amazon S3 儲存貯體。
aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME
/plugins.zip
使用 Amazon S3 主控台
Amazon S3 主控台是一個以網路為基礎的使用者界面,可讓您建立和管理 Amazon S3 儲存貯體中的資源。
使用 Amazon S3 主控台上傳
-
在 Amazon MWAA 控制台上打開「環境」頁面。
-
選擇一個環境。
-
在 S3 窗格的DAG程式碼中選取 S3 儲存貯體連結,以在 Amazon S3 主控台上開啟儲存貯體。
-
選擇上傳。
-
選擇 [新增檔案]。
-
選擇您的本地副本plugins.zip
,選擇上傳。
在環境中安裝自訂外掛程式
本節說明如何安裝您上傳到 Amazon S3 儲存貯體的自訂外掛程式,方法是指定 plugins.zip 檔案的路徑,並在每次更新 zip 檔案時指定 plugins.zip 檔案的版本。
在 Amazon MWAA 主控台plugins.zip
上指定的路徑 (第一次)
如果這是您第一次上傳plugins.zip
到 Amazon S3 儲存貯體,您還需要在 Amazon MWAA 主控台上指定檔案的路徑。您只需要完成此步驟一次。
-
在 Amazon MWAA 控制台上打開「環境」頁面。
-
選擇一個環境。
-
選擇編輯。
-
在 Amazon S3 窗格中的程式DAG碼上,選擇外掛程式檔案旁邊的瀏覽 S3-選用欄位。
-
選取您的 Amazon S3 儲存貯體上的plugins.zip
檔案。
-
選擇 Choose (選擇)。
-
選擇下一步,更新環境。
在 Amazon MWAA 控制台上指定plugins.zip
版本
每次plugins.zip
在 Amazon Amazon S3 上傳新版本時,都需要在 Amazon MWAA 主控台上指定plugins.zip
檔案的版本。
-
在 Amazon MWAA 控制台上打開「環境」頁面。
-
選擇一個環境。
-
選擇編輯。
-
在 Amazon S3 窗格中的程式DAG碼上,從下拉式清單中選擇一個plugins.zip
版本。
-
選擇 Next (下一步)。
plugins.zip 的範例使用案例
後續步驟?