Installation benutzerdefinierter Plugins - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Installation benutzerdefinierter Plugins

Amazon Managed Workflows for Apache Airflow unterstützt den integrierten Plugin-Manager von Apache Airflow, sodass Sie benutzerdefinierte Apache Airflow-Operatoren, Hooks, Sensoren oder Schnittstellen verwenden können. Auf dieser Seite werden die Schritte zur Installation benutzerdefinierter Apache Airflow-Plugins in Ihrer MWAA Amazon-Umgebung mithilfe einer plugins.zip Datei beschrieben.

Voraussetzungen

Sie benötigen Folgendes, bevor Sie die Schritte auf dieser Seite ausführen können.

  • Berechtigungen — Ihr AWS Konto muss von Ihrem Administrator Zugriff auf die mazonMWAAFullConsoleAccessA-Zugriffskontrollrichtlinie für Ihre Umgebung erhalten haben. Darüber hinaus muss Ihrer MWAA Amazon-Umgebung von Ihrer Ausführungsrolle der Zugriff auf die von Ihrer Umgebung verwendeten AWS Ressourcen gestattet werden.

  • Zugriff — Wenn Sie Zugriff auf öffentliche Repositorys benötigen, um Abhängigkeiten direkt auf dem Webserver zu installieren, muss Ihre Umgebung für den Zugriff auf öffentliche Netzwerk-Webserver konfiguriert sein. Weitere Informationen finden Sie unter Apache Airflow-Zugriffsmodi.

  • Amazon S3 S3-Konfiguration — Der Amazon S3 S3-BucketDAGs, in dem Ihre benutzerdefinierten Plugins und Python-Abhängigkeiten gespeichert werdenplugins.zip, requirements.txt muss mit geblocktem öffentlichem Zugriff und aktivierter Versionierung konfiguriert sein.

Funktionsweise

Um benutzerdefinierte Plugins in Ihrer Umgebung auszuführen, müssen Sie drei Dinge tun:

  1. Erstellen Sie lokal eine plugins.zip Datei.

  2. Laden Sie die lokale plugins.zip Datei in Ihren Amazon S3 S3-Bucket hoch.

  3. Geben Sie die Version dieser Datei im Feld Plugins-Datei auf der MWAA Amazon-Konsole an.

Anmerkung

Wenn Sie zum ersten Mal einen plugins.zip in Ihren Amazon S3 S3-Bucket hochladen, müssen Sie auch den Pfad zu der Datei auf der MWAA Amazon-Konsole angeben. Sie müssen diesen Schritt nur einmal ausführen.

Wann sollten die Plugins verwendet werden

Plugins sind nur für die Erweiterung der Apache Airflow-Benutzeroberfläche erforderlich, wie in der Apache Airflow-Dokumentation beschrieben. Benutzerdefinierte Operatoren können direkt im /dags Ordner neben Ihrem DAG Code platziert werden.

Wenn Sie Ihre eigenen Integrationen mit externen Systemen erstellen müssen, platzieren Sie sie im dags Ordner/oder einem Unterordner darin, aber nicht im plugins.zip Ordner. In Apache Airflow 2.x werden Plugins hauptsächlich zur Erweiterung der Benutzeroberfläche verwendet.

Ebenso sollten andere Abhängigkeiten nicht hinzugefügt werden. plugins.zip Stattdessen können sie an einem Ort unter dem Amazon S3 /dags S3-Ordner gespeichert werden, wo sie vor dem Start von Apache Airflow mit jedem MWAA Amazon-Container synchronisiert werden.

Anmerkung

Jede Datei im /dags Ordner oder in der Dateiplugins.zip, die nicht explizit ein Apache DAG Airflow-Objekt definiert, muss in einer .airflowignore Datei aufgeführt werden.

Übersicht über benutzerdefinierte Plugins

Der integrierte Plugin-Manager von Apache Airflow kann externe Funktionen in seinen Kern integrieren, indem er Dateien einfach in einem $AIRFLOW_HOME/plugins Ordner ablegt. Es ermöglicht Ihnen, benutzerdefinierte Apache Airflow-Operatoren, Hooks, Sensoren oder Schnittstellen zu verwenden. Der folgende Abschnitt enthält ein Beispiel für flache und verschachtelte Verzeichnisstrukturen in einer lokalen Entwicklungsumgebung und die daraus resultierenden Importanweisungen, die die Verzeichnisstruktur innerhalb einer plugins.zip bestimmen.

Verzeichnis- und Größenbeschränkungen für benutzerdefinierte Plugins

Der Apache Airflow Scheduler und die Workers suchen beim Start auf dem AWS-verwalteten Fargate-Container für Ihre Umgebung unter nach benutzerdefinierten Plugins. /usr/local/airflow/plugins/*

  • Verzeichnisstruktur. Die Verzeichnisstruktur (at/*) basiert auf dem Inhalt Ihrer plugins.zip Datei. Wenn Ihr Verzeichnis beispielsweise als operators Verzeichnis der obersten Ebene plugins.zip enthält, wird das Verzeichnis in Ihre Umgebung extrahiert. /usr/local/airflow/plugins/operators

  • Größenbeschränkung. Wir empfehlen eine plugins.zip Datei mit weniger als 1 GB. Je größer eine plugins.zip Datei, desto länger ist die Startzeit in einer Umgebung. Amazon begrenzt zwar MWAA nicht explizit die Größe einer plugins.zip Datei, aber wenn Abhängigkeiten nicht innerhalb von zehn Minuten installiert werden können, gibt der Fargate-Dienst ein Timeout ab und versucht, die Umgebung auf einen stabilen Zustand zurückzusetzen.

Anmerkung

Für Umgebungen, die Apache Airflow v1.10.12 oder Apache Airflow v2.0.2 verwenden, MWAA begrenzt Amazon den ausgehenden Datenverkehr auf dem Apache Airflow-Webserver und erlaubt Ihnen nicht, Plugins oder Python-Abhängigkeiten direkt auf dem Webserver zu installieren. Ab Apache Airflow v2.2.2 MWAA kann Amazon Plugins und Abhängigkeiten direkt auf dem Webserver installieren.

Beispiele für benutzerdefinierte Plugins

Im folgenden Abschnitt wird anhand von Beispielcode aus dem Apache Airflow-Referenzhandbuch gezeigt, wie Sie Ihre lokale Entwicklungsumgebung strukturieren können.

Beispiel für die Verwendung einer flachen Verzeichnisstruktur in plugins.zip

Apache Airflow v2

Das folgende Beispiel zeigt eine plugins.zip Datei mit einer flachen Verzeichnisstruktur für Apache Airflow v2.

Beispiel flaches Verzeichnis mit plugins.zip PythonVirtualenvOperator

Das folgende Beispiel zeigt die oberste Baumstruktur einer Datei plugins.zip für das PythonVirtualenvOperator benutzerdefinierte Plugin inEin benutzerdefiniertes Plugin für Apache Airflow erstellen PythonVirtualenvOperator.

├── virtual_python_plugin.py
Beispiel plugins/virtual_python_plugin.py

Das folgende Beispiel zeigt das PythonVirtualenvOperator benutzerdefinierte Plugin.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

Das folgende Beispiel zeigt eine plugins.zip Datei mit einer flachen Verzeichnisstruktur für Apache Airflow v1.

Beispiel flaches Verzeichnis mit plugins.zip PythonVirtualenvOperator

Das folgende Beispiel zeigt die oberste Baumstruktur einer Datei plugins.zip für das PythonVirtualenvOperator benutzerdefinierte Plugin inEin benutzerdefiniertes Plugin für Apache Airflow erstellen PythonVirtualenvOperator.

├── virtual_python_plugin.py
Beispiel plugins/virtual_python_plugin.py

Das folgende Beispiel zeigt das PythonVirtualenvOperator benutzerdefinierte Plugin.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Beispiel mit einer verschachtelten Verzeichnisstruktur in plugins.zip

Apache Airflow v2

Das folgende Beispiel zeigt eine plugins.zip Datei mit separaten Verzeichnissen für hooksoperators, und einem sensors Verzeichnis für Apache Airflow v2.

Beispiel plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Das folgende Beispiel zeigt die Import-Anweisungen in dem DAG (DAGsOrdner), der die benutzerdefinierten Plugins verwendet.

Beispiel dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Beispiel plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Die folgenden Beispiele zeigen die einzelnen Importanweisungen, die in den benutzerdefinierten Plugin-Dateien benötigt werden.

Beispiel hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Beispiel sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Beispiel operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Beispiel operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Folgen Sie den Schritten unter Testen benutzerdefinierter Plug-ins mit dem MWAA CLI Amazon-Hilfsprogramm und dann Erstellen einer Datei plugins.zip, um den Inhalt in Ihrem plugins Verzeichnis zu komprimieren. Beispiel, cd plugins.

Apache Airflow v1

Das folgende Beispiel zeigt eine plugins.zip Datei mit separaten Verzeichnissen für hooksoperators, und einem sensors Verzeichnis für Apache Airflow v1.10.12.

Beispiel plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Das folgende Beispiel zeigt die Import-Anweisungen in dem DAG (DAGsOrdner), der die benutzerdefinierten Plugins verwendet.

Beispiel dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Beispiel plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Die folgenden Beispiele zeigen die einzelnen Importanweisungen, die in den benutzerdefinierten Plugin-Dateien benötigt werden.

Beispiel hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Beispiel sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Beispiel operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Beispiel operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Folgen Sie den Schritten unter Testen benutzerdefinierter Plug-ins mit dem MWAA CLI Amazon-Hilfsprogramm und dann Erstellen einer Datei plugins.zip, um den Inhalt in Ihrem plugins Verzeichnis zu komprimieren. Beispiel, cd plugins.

Eine Datei plugins.zip erstellen

In den folgenden Schritten werden die Schritte beschrieben, die wir empfehlen, um eine Datei plugins.zip lokal zu erstellen.

Schritt eins: Testen Sie benutzerdefinierte Plugins mit dem MWAA CLI Amazon-Hilfsprogramm

  • Das Befehlszeilenprogramm interface (CLI) repliziert eine Amazon Managed Workflows for Apache Airflow-Umgebung lokal.

  • Das CLI erstellt lokal ein Docker-Container-Image, das einem MWAA Amazon-Produktions-Image ähnelt. Auf diese Weise können Sie eine lokale Apache Airflow-Umgebung ausführen, um benutzerdefinierte Plugins und Abhängigkeiten zu entwickeln und zu testenDAGs, bevor Sie sie auf Amazon MWAA bereitstellen.

  • Informationen zum CLI Ausführen von finden Sie aws-mwaa-local-runnerunter GitHub.

Schritt zwei: Erstellen Sie die Datei plugins.zip

Sie können ein integriertes ZIP Archivierungsprogramm oder ein anderes ZIP Hilfsprogramm (z. B. 7zip) verwenden, um eine ZIP-Datei zu erstellen.

Anmerkung

Das integrierte ZIP-Hilfsprogramm für Windows OS fügt möglicherweise Unterordner hinzu, wenn Sie eine ZIP-Datei erstellen. Wir empfehlen, den Inhalt der Datei plugins.zip vor dem Hochladen in Ihren Amazon S3 S3-Bucket zu überprüfen, um sicherzustellen, dass keine zusätzlichen Verzeichnisse hinzugefügt wurden.

  1. Wechseln Sie zu den Verzeichnissen in Ihr lokales Airflow-Plugin-Verzeichnis. Beispielsweise:

    myproject$ cd plugins
  2. Führen Sie den folgenden Befehl aus, um sicherzustellen, dass der Inhalt über Ausführungsberechtigungen verfügt (nur macOS und Linux).

    plugins$ chmod -R 755 .
  3. Komprimieren Sie den Inhalt Ihres plugins Ordners.

    plugins$ zip -r plugins.zip .

Auf Amazon plugins.zip S3 hochladen

Sie können die Amazon S3 S3-Konsole oder die AWS Command Line Interface (AWS CLI) verwenden, um eine plugins.zip Datei in Ihren Amazon S3 S3-Bucket hochzuladen.

Mit dem AWS CLI

The AWS Command Line Interface (AWS CLI) ist ein Open-Source-Tool, mit dem Sie mithilfe von Befehlen in Ihrer Befehlszeilen-Shell mit AWS Diensten interagieren können. Um die Schritte auf dieser Seite abzuschließen, benötigen Sie Folgendes:

Zum Hochladen mit dem AWS CLI
  1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihre plugins.zip Datei gespeichert ist. Beispielsweise:

    cd plugins
  2. Verwenden Sie den folgenden Befehl, um alle Ihre Amazon S3 S3-Buckets aufzulisten.

    aws s3 ls
  3. Verwenden Sie den folgenden Befehl, um die Dateien und Ordner im Amazon S3 S3-Bucket für Ihre Umgebung aufzulisten.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Verwenden Sie den folgenden Befehl, um die plugins.zip Datei in den Amazon S3 S3-Bucket für Ihre Umgebung hochzuladen.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Verwenden der Amazon S3-Konsole

Die Amazon S3 S3-Konsole ist eine webbasierte Benutzeroberfläche, mit der Sie die Ressourcen in Ihrem Amazon S3 S3-Bucket erstellen und verwalten können.

Um mit der Amazon S3 S3-Konsole hochzuladen
  1. Öffnen Sie die Seite Umgebungen auf der MWAA Amazon-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie im Bereich DAGCode im Bereich S3 den Link S3-Bucket aus, um Ihren Speicher-Bucket auf der Amazon S3 S3-Konsole zu öffnen.

  4. Klicken Sie auf Hochladen.

  5. Wählen Sie Datei hinzufügen.

  6. Wählen Sie die lokale Kopie Ihres aus plugins.zip und wählen Sie Hochladen.

Installation benutzerdefinierter Plugins in Ihrer Umgebung

In diesem Abschnitt wird beschrieben, wie Sie die benutzerdefinierten Plugins, die Sie in Ihren Amazon S3 S3-Bucket hochgeladen haben, installieren, indem Sie bei jeder Aktualisierung der ZIP-Datei den Pfad zur Datei plugins.zip und die Version der Datei plugins.zip angeben.

Angabe des Pfads zu plugins.zip auf der MWAA Amazon-Konsole (beim ersten Mal)

Wenn Sie zum ersten Mal einen plugins.zip in Ihren Amazon S3 S3-Bucket hochladen, müssen Sie auch den Pfad zu der Datei auf der MWAA Amazon-Konsole angeben. Sie müssen diesen Schritt nur einmal ausführen.

  1. Öffnen Sie die Seite Umgebungen auf der MWAA Amazon-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie Edit (Bearbeiten) aus.

  4. Wählen Sie im Bereich DAGCode in Amazon S3 neben dem Feld Plugins-Datei — optional die Option S3 durchsuchen aus.

  5. Wählen Sie die plugins.zip Datei in Ihrem Amazon S3 S3-Bucket aus.

  6. Wählen Sie Choose (Auswählen) aus.

  7. Wählen Sie Weiter, Umgebung aktualisieren.

Angabe der plugins.zip Version auf der MWAA Amazon-Konsole

Sie müssen jedes Mal, wenn Sie eine neue Version Ihrer plugins.zip Datei plugins.zip in Ihren Amazon S3-Bucket hochladen, die Version Ihrer Datei auf der MWAA Amazon-Konsole angeben.

  1. Öffnen Sie die Seite Umgebungen auf der MWAA Amazon-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie Edit (Bearbeiten) aus.

  4. Wählen Sie im Bereich DAGCode in Amazon S3 eine plugins.zip Version aus der Dropdownliste aus.

  5. Wählen Sie Weiter.

Beispielhafte Anwendungsfälle für plugins.zip

Als nächstes

  • Testen Sie Ihre DAGs benutzerdefinierten Plugins und Python-Abhängigkeiten lokal mit dem aws-mwaa-local-runneron GitHub.