安裝自定義插件 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

安裝自定義插件

適用於 Apache 氣流的 Amazon 受管工作流程支援 Apache Airflow 的內建外掛程式管理器,可讓您使用自訂的 Apache 氣流操作員、掛鉤、感應器或介面。本頁說明使用plugins.zip檔案在您的 Amazon MWAA 環境中安裝 Apache 氣流自訂外掛程式的步驟。

必要條件

您需要下列項目,才能完成此頁面上的步驟。

運作方式

若要在您的環境中執行自訂外掛程式,您必須執行以下三項作業:

  1. 在本機建立plugins.zip檔案。

  2. 將本機plugins.zip檔案上傳到您的 Amazon S3 儲存貯體。

  3. 在 Amazon MWAA 主控台的外掛程式檔案欄位中指定此檔案的版本。

注意

如果這是您第一次上傳plugins.zip到 Amazon S3 儲存貯體,您還需要在 Amazon MWAA 主控台上指定檔案的路徑。您只需要完成此步驟一次。

何時使用插件

只有在擴充 Apache 氣流使用者介面時才需要外掛程式,如 Apache 氣流說明文件所述。自訂運算子可以直接放置在DAG程式碼旁邊的/dags資料夾中。

如果您需要建立自己與外部系統的整合,請將它們放在/dags資料夾或其中的子資料夾中,但不放在plugins.zip資料夾中。在阿帕奇氣流 2.x 中,插件主要用於擴展 UI。

同樣地,不應將其他相依性置於中plugins.zip。相反地,它們可以存放在 Amazon S3 /dags 資料夾下的位置,在 Apache 氣流開始之前,它們會同步到每個 Amazon MWAA 容器。

注意

/dags資料夾中或其中plugins.zip未明確定義 Apache Airflow DAG 物件的任何檔案都必須列在.airflowignore檔案中。

自定義插件概述

Apache Airflow 的內置插件管理器可以通過簡單地將文件放在文件$AIRFLOW_HOME/plugins夾中將外部功能集成到其核心。它允許您使用自定義的 Apache 氣流操作員,掛鉤,傳感器或接口。下節提供本機開發環境中平坦與巢狀目錄結構的範例,以及產生的 import 陳述式,這些陳述式會決定 plugins.zip 中的目錄結構。

自定義插件目錄和大小限制

Apache 氣流排程器和工作人員會在啟動期間,在 AWS受管理的 Fargate 容器上尋找自訂外掛程式,適用於您的環境。/usr/local/airflow/plugins/*

  • 目錄結構。目錄結構 (at/*) 以plugins.zip檔案的內容為基礎。例如,如果您將operators目錄作為頂層目錄plugins.zip包含,則該目錄將被解壓縮到您/usr/local/airflow/plugins/operators的環境中。

  • 大小限制。我們建議使用小於 1 GB 的plugins.zip檔案。plugins.zip檔案大小越大,環境上的啟動時間就越長。雖然 Amazon 並MWAA未明確限制plugins.zip檔案的大小,但是如果無法在十分鐘內安裝相依性,Fargate 服務會逾時並嘗試將環境回復到穩定狀態。

注意

對於使用 Apache 氣流 v1.10.12 或 Apache 氣流 v2.0.2 的環境,Amazon MWAA 限制了 Apache 氣流網絡服務器上的出站流量,並且不允許您直接在 Web 服務器上安裝插件或 Python 依賴項。從 Apache 氣流 v2.2.2 開始,Amazon MWAA 可以直接在 Web 服務器上安裝插件和依賴關係。

自定義插件的例子

以下章節使用 Apache Airflow 參考指南中的範例程式碼,說明如何建構本機開發環境。

在 plugins.zip 中使用平面目錄結構的示例

Apache Airflow v2

下面的例子顯示了 Apache 氣流 V2 的扁平目錄結構的plugins.zip文件。

範例 帶有 PythonVirtualenvOperator plugins.zip 的平面目錄

下列範例顯示中 PythonVirtualenvOperator 自訂外掛程式之 plugins.zip 檔案的頂層樹狀結構為 Apache 氣流創建一個自定義插件 PythonVirtualenvOperator

├── virtual_python_plugin.py
範例 plugins/virtual_python_plugin.py

下面的例子顯示了 PythonVirtualenvOperator 自定義插件。

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

下面的例子顯示了一個plugins.zip文件與 Apache 氣流 V1 的扁平目錄結構。

範例 帶有 PythonVirtualenvOperator plugins.zip 的平面目錄

下列範例顯示中 PythonVirtualenvOperator 自訂外掛程式之 plugins.zip 檔案的頂層樹狀結構為 Apache 氣流創建一個自定義插件 PythonVirtualenvOperator

├── virtual_python_plugin.py
範例 plugins/virtual_python_plugin.py

下面的例子顯示了 PythonVirtualenvOperator 自定義插件。

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

在 plugins.zip 中使用嵌套目錄結構的示例

Apache Airflow v2

下列範例顯示的plugins.zip檔案具有不同目錄的hooksoperators、和 Apache 氣流 v2 的目sensors錄。

範例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

下列範例會顯示 DAG (DAGs資料夾) 中使用自訂外掛程式的 import 陳述式。

範例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

下列範例顯示自訂外掛程式檔案中所需的每個 import 陳述式。

範例 hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
範例 sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
範例 operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
範例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins目錄中的容。例如:cd plugins

Apache Airflow v1

下列範例顯示的plugins.zip檔案具有不同目錄的hooksoperators、以及 Apache 氣流 v1.10.12 的目sensors錄。

範例 plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

下列範例會顯示 DAG (DAGs資料夾) 中使用自訂外掛程式的 import 陳述式。

範例 dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
範例 plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

下列範例顯示自訂外掛程式檔案中所需的每個 import 陳述式。

範例 hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
範例 sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
範例 operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
範例 operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

請遵循使用 Amazon MWAA CLI 公用程式測試自訂外掛程式中的步驟,然後建立 plugins.zip 檔案以壓縮plugins目錄中的容。例如:cd plugins

創建一個 plugins.zip 文件

下列步驟說明我們建議在本機建立 plugins.zip 檔案的步驟。

第一步:使用 Amazon MWAA CLI 公用程式測試自訂外掛程式

  • 命令列介面 (CLI) 公用程式可在本機複寫 Apache 氣流環境的 Amazon 受管工作流程。

  • 它會在本地CLI構建類似於 Amazon MWAA 生產映像的 Docker 容器映像。這可讓您在部署到 Amazon 之前執行本機 Apache Airflow 環境來開發和測試DAGs、自訂外掛程式和相依性MWAA。

  • 要運行CLI,請參閱(詳見) GitHub。aws-mwaa-local-runner

第二步:創建 plugins.zip 文件

您可以使用內建的ZIP封存公用程式或任何其他ZIP公用程式 (例如 7zip) 來建立 .zip 檔案。

注意

當您建立 .zip 檔案時,Windows 作業系統的內建 zip 公用程式可能會新增子資料夾。我們建議您在上傳到 Amazon S3 儲存貯體之前先驗證 plugins.zip 檔案的內容,以確保沒有新增其他目錄。

  1. 將目錄變更為您的本機 Airflow 外掛程式目錄。例如:

    myproject$ cd plugins
  2. 執行下列命令,以確保內容具有可執行權限 (僅限 macOS 和 Linux)。

    plugins$ chmod -R 755 .
  3. 壓縮文plugins件夾中的內容。

    plugins$ zip -r plugins.zip .

上傳plugins.zip到 Amazon S3

您可以使用 Amazon S3 主控台或 AWS Command Line Interface (AWS CLI) 將plugins.zip檔案上傳到您的 Amazon S3 儲存貯體。

使用 AWS CLI

AWS Command Line Interface (AWS CLI) 是開放原始碼工具,可讓您使用命令列殼層中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:

若要使用 AWS CLI
  1. 在命令提示字元中,導覽至儲存plugins.zip檔案的目錄。例如:

    cd plugins
  2. 使用下列命令列出您所有的 Amazon S3 儲存貯體。

    aws s3 ls
  3. 使用下列命令列出您環境之 Amazon S3 儲存貯體中的檔案和資料夾。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. 使用下列命令將plugins.zip檔案上傳到您環境的 Amazon S3 儲存貯體。

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

使用 Amazon S3 主控台

Amazon S3 主控台是一個以網路為基礎的使用者界面,可讓您建立和管理 Amazon S3 儲存貯體中的資源。

使用 Amazon S3 主控台上傳
  1. 在 Amazon MWAA 控制台上打開「環境」頁面

  2. 選擇一個環境。

  3. S3 窗格的DAG程式碼中選取 S3 儲存貯體連結,以在 Amazon S3 主控台上開啟儲存貯體。

  4. 選擇上傳

  5. 選擇 [新增檔案]。

  6. 選擇您的本地副本plugins.zip,選擇上傳

在環境中安裝自訂外掛程式

本節說明如何安裝您上傳到 Amazon S3 儲存貯體的自訂外掛程式,方法是指定 plugins.zip 檔案的路徑,並在每次更新 zip 檔案時指定 plugins.zip 檔案的版本。

在 Amazon MWAA 主控台plugins.zip上指定的路徑 (第一次)

如果這是您第一次上傳plugins.zip到 Amazon S3 儲存貯體,您還需要在 Amazon MWAA 主控台上指定檔案的路徑。您只需要完成此步驟一次。

  1. 在 Amazon MWAA 控制台上打開「環境」頁面

  2. 選擇一個環境。

  3. 選擇編輯

  4. 在 Amazon S3 窗格中的程式DAG碼上,選擇外掛程式檔案旁邊的瀏覽 S3-選用欄位。

  5. 選取您的 Amazon S3 儲存貯體上的plugins.zip檔案。

  6. 選擇 Choose (選擇)

  7. 選擇下一步更新環境

在 Amazon MWAA 控制台上指定plugins.zip版本

每次plugins.zip在 Amazon Amazon S3 上傳新版本時,都需要在 Amazon MWAA 主控台上指定plugins.zip檔案的版本。

  1. 在 Amazon MWAA 控制台上打開「環境」頁面

  2. 選擇一個環境。

  3. 選擇編輯

  4. 在 Amazon S3 窗格中的程式DAG碼上,從下拉式清單中選擇一個plugins.zip版本。

  5. 選擇 Next (下一步)

plugins.zip 的範例使用案例

後續步驟?

  • 使用 aws-mwaa-local-runneron 在本地測試您的DAGs自定義插件和 Python 依賴關係 GitHub。