Installation benutzerdefinierter Plugins - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Installation benutzerdefinierter Plugins

Amazon Managed Workflows for Apache Airflow unterstützt den integrierten Plugin-Manager von Apache Airflow, sodass Sie benutzerdefinierte Apache Airflow-Operatoren, Hooks, Sensoren oder Schnittstellen verwenden können. Auf dieser Seite werden die Schritte zur Installation benutzerdefinierter Apache Airflow-Plugins in Ihrer Amazon MWAA-Umgebung mithilfe einer Datei beschrieben. plugins.zip

Voraussetzungen

Sie benötigen Folgendes, bevor Sie die Schritte auf dieser Seite abschließen können.

  • Berechtigungen — Ihr AWS Konto muss von Ihrem Administrator Zugriff auf die FullConsoleAccessAmazonMWAA-Zugriffskontrollrichtlinie für Ihre Umgebung erhalten haben. Darüber hinaus muss Ihrer Amazon MWAA-Umgebung von Ihrer Ausführungsrolle der Zugriff auf die von Ihrer Umgebung verwendeten AWS Ressourcen gestattet werden.

  • Zugriff — Wenn Sie Zugriff auf öffentliche Repositorys benötigen, um Abhängigkeiten direkt auf dem Webserver zu installieren, muss Ihre Umgebung für den Zugriff auf öffentliche Netzwerk-Webserver konfiguriert sein. Weitere Informationen finden Sie unter Apache Airflow-Zugriffsmodi.

  • Amazon S3 S3-Konfiguration — Der Amazon S3 S3-Bucket, der zum Speichern Ihrer DAGs, benutzerdefinierten Plugins und Python-Abhängigkeiten verwendet wirdplugins.zip, requirements.txt muss mit geblocktem öffentlichen Zugriff und aktivierter Versionierung konfiguriert sein.

Funktionsweise

Um benutzerdefinierte Plugins in Ihrer Umgebung auszuführen, müssen Sie drei Dinge tun:

  1. Erstellen Sie lokal eine plugins.zip Datei.

  2. Laden Sie die lokale plugins.zip Datei in Ihren Amazon S3 S3-Bucket hoch.

  3. Geben Sie die Version dieser Datei im Feld Plugins-Datei auf der Amazon MWAA-Konsole an.

Anmerkung

Wenn Sie zum ersten Mal einen plugins.zip in Ihren Amazon S3 S3-Bucket hochladen, müssen Sie auch den Pfad zu der Datei auf der Amazon MWAA-Konsole angeben. Sie müssen diesen Schritt nur einmal ausführen.

Was hat sich in Version 2 geändert

  • Neu: Operatoren, Hooks und Executors. Die Import-Anweisungen in Ihren DAGs und die benutzerdefinierten Plugins, die Sie in einem MWAA plugins.zip auf Amazon angeben, haben sich zwischen Apache Airflow v1 und Apache Airflow v2 geändert. Beispielsweise wurde from airflow.contrib.hooks.aws_hook import AwsHook in Apache Airflow v1 zu Apache Airflow v2 geändert. from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook Weitere Informationen finden Sie in der Python-API-Referenz im Apache Airflow-Referenzhandbuch.

  • Neu: Importe in Plugins. Das Importieren von Operatoren, Sensoren und Hooks, die mithilfe von Plugins hinzugefügt wurden, airflow.{operators,sensors,hooks}.<plugin_name> wird nicht mehr unterstützt. Diese Erweiterungen sollten als reguläre Python-Module importiert werden. In Version 2 und höher besteht der empfohlene Ansatz darin, sie im DAG-Verzeichnis zu platzieren und eine .airflowignore-Datei zu erstellen und zu verwenden, um sie von der Analyse als DAGs auszuschließen. Weitere Informationen finden Sie unter Modulverwaltung und Erstellen eines benutzerdefinierten Operators im Apache Airflow-Referenzhandbuch.

Übersicht über benutzerdefinierte Plugins

Der integrierte Plugin-Manager von Apache Airflow kann externe Funktionen in seinen Kern integrieren, indem er Dateien einfach in einem $AIRFLOW_HOME/plugins Ordner ablegt. Es ermöglicht Ihnen, benutzerdefinierte Apache Airflow-Operatoren, Hooks, Sensoren oder Schnittstellen zu verwenden. Der folgende Abschnitt enthält ein Beispiel für flache und verschachtelte Verzeichnisstrukturen in einer lokalen Entwicklungsumgebung und die daraus resultierenden Importanweisungen, die die Verzeichnisstruktur innerhalb einer plugins.zip bestimmen.

Verzeichnis- und Größenbeschränkungen für benutzerdefinierte Plugins

Der Apache Airflow Scheduler und die Workers suchen beim Start auf dem AWS -verwalteten Fargate-Container für Ihre Umgebung unter nach benutzerdefinierten Plugins. /usr/local/airflow/plugins/*

  • Verzeichnisstruktur. Die Verzeichnisstruktur (at/*) basiert auf dem Inhalt Ihrer plugins.zip Datei. Wenn Ihr Verzeichnis beispielsweise als operators Verzeichnis der obersten Ebene plugins.zip enthält, wird das Verzeichnis in Ihre Umgebung extrahiert. /usr/local/airflow/plugins/operators

  • Größenbeschränkung. Wir empfehlen eine plugins.zip Datei mit weniger als 1 GB. Je größer eine plugins.zip Datei, desto länger ist die Startzeit in einer Umgebung. Amazon MWAA begrenzt die Größe einer plugins.zip Datei zwar nicht explizit, aber wenn Abhängigkeiten nicht innerhalb von zehn Minuten installiert werden können, führt der Fargate-Service zu einem Timeout und versucht, die Umgebung auf einen stabilen Zustand zurückzusetzen.

Anmerkung

Für Umgebungen, die Apache Airflow v1.10.12 oder Apache Airflow v2.0.2 verwenden, begrenzt Amazon MWAA den ausgehenden Datenverkehr auf dem Apache Airflow-Webserver und erlaubt Ihnen nicht, Plugins oder Python-Abhängigkeiten direkt auf dem Webserver zu installieren. Ab Apache Airflow v2.2.2 kann Amazon MWAA Plugins und Abhängigkeiten direkt auf dem Webserver installieren.

Beispiele für benutzerdefinierte Plugins

Im folgenden Abschnitt wird anhand von Beispielcode aus dem Apache Airflow-Referenzhandbuch gezeigt, wie Sie Ihre lokale Entwicklungsumgebung strukturieren können.

Beispiel für die Verwendung einer flachen Verzeichnisstruktur in plugins.zip

Apache Airflow v2

Das folgende Beispiel zeigt eine plugins.zip Datei mit einer flachen Verzeichnisstruktur für Apache Airflow v2.

Beispiel flaches Verzeichnis mit plugins.zip PythonVirtualenvOperator

Das folgende Beispiel zeigt die oberste Baumstruktur einer Datei plugins.zip für das PythonVirtualenvOperator benutzerdefinierte Plugin inEin benutzerdefiniertes Plugin für Apache Airflow erstellenPythonVirtualenvOperator.

├── virtual_python_plugin.py
Beispiel plugins/virtual_python_plugin.py

Das folgende Beispiel zeigt das PythonVirtualenvOperator benutzerdefinierte Plugin.

""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow.plugins_manager import AirflowPlugin import airflow.utils.python_virtualenv from typing import List def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]: cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if system_site_packages: cmd.append('--system-site-packages') if python_bin is not None: cmd.append(f'--python={python_bin}') return cmd airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd class VirtualPythonPlugin(AirflowPlugin): name = 'virtual_python_plugin'
Apache Airflow v1

Das folgende Beispiel zeigt eine plugins.zip Datei mit einer flachen Verzeichnisstruktur für Apache Airflow v1.

Beispiel flaches Verzeichnis mit plugins.zip PythonVirtualenvOperator

Das folgende Beispiel zeigt die oberste Baumstruktur einer Datei plugins.zip für das PythonVirtualenvOperator benutzerdefinierte Plugin inEin benutzerdefiniertes Plugin für Apache Airflow erstellenPythonVirtualenvOperator.

├── virtual_python_plugin.py
Beispiel plugins/virtual_python_plugin.py

Das folgende Beispiel zeigt das PythonVirtualenvOperator benutzerdefinierte Plugin.

from airflow.plugins_manager import AirflowPlugin from airflow.operators.python_operator import PythonVirtualenvOperator def _generate_virtualenv_cmd(self, tmp_dir): cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir] if self.system_site_packages: cmd.append('--system-site-packages') if self.python_version is not None: cmd.append('--python=python{}'.format(self.python_version)) return cmd PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd class EnvVarPlugin(AirflowPlugin): name = 'virtual_python_plugin'

Beispiel für die Verwendung einer verschachtelten Verzeichnisstruktur in plugins.zip

Apache Airflow v2

Das folgende Beispiel zeigt eine plugins.zip Datei mit separaten Verzeichnissen für hooksoperators, und einem sensors Verzeichnis für Apache Airflow v2.

Beispiel plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Das folgende Beispiel zeigt die Import-Anweisungen in der DAG (DAGs-Ordner), die die benutzerdefinierten Plugins verwendet.

Beispiel dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_airflow_operator import MyOperator from sensors.my_airflow_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Beispiel plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Die folgenden Beispiele zeigen die einzelnen Importanweisungen, die in den benutzerdefinierten Plugin-Dateien benötigt werden.

Beispiel hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Beispiel sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Beispiel operators/my_airflow_operator.py
from airflow.operators.bash import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_airflow_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Beispiel operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Folgen Sie den Schritten unter Testen benutzerdefinierter Plug-ins mit dem Amazon MWAA-CLI-Hilfsprogramm und dann Erstellen einer Datei plugins.zip, um den Inhalt in Ihrem plugins Verzeichnis zu komprimieren. Beispiel: cd plugins

Apache Airflow v1

Das folgende Beispiel zeigt eine plugins.zip Datei mit separaten Verzeichnissen für hooksoperators, und einem sensors Verzeichnis für Apache Airflow v1.10.12.

Beispiel plugins.zip
__init__.py my_airflow_plugin.py hooks/ |-- __init__.py |-- my_airflow_hook.py operators/ |-- __init__.py |-- my_airflow_operator.py |-- hello_operator.py sensors/ |-- __init__.py |-- my_airflow_sensor.py

Das folgende Beispiel zeigt die Import-Anweisungen in der DAG (DAGs-Ordner), die die benutzerdefinierten Plugins verwendet.

Beispiel dags/your_dag.py
from airflow import DAG from datetime import datetime, timedelta from operators.my_operator import MyOperator from sensors.my_sensor import MySensor from operators.hello_operator import HelloOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG('customdag', max_active_runs=3, schedule_interval='@once', default_args=default_args) as dag: sens = MySensor( task_id='taskA' ) op = MyOperator( task_id='taskB', my_field='some text' ) hello_task = HelloOperator(task_id='sample-task', name='foo_bar') sens >> op >> hello_task
Beispiel plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin from hooks.my_airflow_hook import * from operators.my_airflow_operator import * from utils.my_utils import * class PluginName(AirflowPlugin): name = 'my_airflow_plugin' hooks = [MyHook] operators = [MyOperator] sensors = [MySensor]

Die folgenden Beispiele zeigen die einzelnen Importanweisungen, die in den benutzerdefinierten Plugin-Dateien benötigt werden.

Beispiel hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook class MyHook(BaseHook): def my_method(self): print("Hello World")
Beispiel sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults class MySensor(BaseSensorOperator): @apply_defaults def __init__(self, *args, **kwargs): super(MySensor, self).__init__(*args, **kwargs) def poke(self, context): return True
Beispiel operators/my_airflow_operator.py
from airflow.operators.bash_operator import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.my_hook import MyHook class MyOperator(BaseOperator): @apply_defaults def __init__(self, my_field, *args, **kwargs): super(MyOperator, self).__init__(*args, **kwargs) self.my_field = my_field def execute(self, context): hook = MyHook('my_conn') hook.my_method()
Beispiel operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults class HelloOperator(BaseOperator): @apply_defaults def __init__( self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = "Hello {}".format(self.name) print(message) return message

Folgen Sie den Schritten unter Testen benutzerdefinierter Plug-ins mit dem Amazon MWAA-CLI-Hilfsprogramm und dann Erstellen einer Datei plugins.zip, um den Inhalt in Ihrem plugins Verzeichnis zu komprimieren. Beispiel: cd plugins

Eine Datei vom Typ plugins.zip erstellen

In den folgenden Schritten werden die Schritte beschrieben, die wir empfehlen, um eine Datei plugins.zip lokal zu erstellen.

Schritt eins: Testen Sie benutzerdefinierte Plugins mit dem Amazon MWAA CLI-Hilfsprogramm

  • Das Befehlszeilenschnittstellenprogramm (CLI) repliziert eine Amazon Managed Workflows for Apache Airflow-Umgebung lokal.

  • Die CLI erstellt lokal ein Docker-Container-Image, das einem Amazon MWAA-Produktionsimage ähnelt. Auf diese Weise können Sie eine lokale Apache Airflow-Umgebung ausführen, um DAGs, benutzerdefinierte Plugins und Abhängigkeiten zu entwickeln und zu testen, bevor Sie sie auf Amazon MWAA bereitstellen.

  • Informationen zum Ausführen der CLI finden Sie aws-mwaa-local-runnerunter GitHub.

Schritt zwei: Erstellen Sie die Datei plugins.zip

Sie können ein integriertes ZIP-Archivierungsprogramm oder ein anderes ZIP-Hilfsprogramm (z. B. 7zip) verwenden, um eine ZIP-Datei zu erstellen.

Anmerkung

Das integrierte ZIP-Hilfsprogramm für Windows OS fügt möglicherweise Unterordner hinzu, wenn Sie eine ZIP-Datei erstellen. Wir empfehlen, den Inhalt der Datei plugins.zip vor dem Hochladen in Ihren Amazon S3 S3-Bucket zu überprüfen, um sicherzustellen, dass keine zusätzlichen Verzeichnisse hinzugefügt wurden.

  1. Wechseln Sie zu den Verzeichnissen in Ihr lokales Airflow-Plugin-Verzeichnis. Beispiel:

    myproject$ cd plugins
  2. Führen Sie den folgenden Befehl aus, um sicherzustellen, dass der Inhalt über Ausführungsberechtigungen verfügt (nur macOS und Linux).

    plugins$ chmod -R 755 .
  3. Komprimieren Sie den Inhalt Ihres plugins Ordners.

    plugins$ zip -r plugins.zip .

Auf Amazon plugins.zip S3 hochladen

Sie können die Amazon S3 S3-Konsole oder die AWS Command Line Interface (AWS CLI) verwenden, um eine plugins.zip Datei in Ihren Amazon S3 S3-Bucket hochzuladen.

Verwenden der AWS CLI

Die AWS Command Line Interface (AWS CLI) ist ein Open-Source-Tool, mit dem Sie über Befehle in Ihrer Befehlszeilen-Shell mit den AWS-Services interagieren können. Um die Schritte auf dieser Seite abzuschließen, benötigen Sie Folgendes:

Zum Hochladen mit dem AWS CLI
  1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Ihre plugins.zip Datei gespeichert ist. Beispiel:

    cd plugins
  2. Verwenden Sie den folgenden Befehl, um alle Ihre Amazon S3 S3-Buckets aufzulisten.

    aws s3 ls
  3. Verwenden Sie den folgenden Befehl, um die Dateien und Ordner im Amazon S3 S3-Bucket für Ihre Umgebung aufzulisten.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  4. Verwenden Sie den folgenden Befehl, um die plugins.zip Datei in den Amazon S3 S3-Bucket für Ihre Umgebung hochzuladen.

    aws s3 cp plugins.zip s3://YOUR_S3_BUCKET_NAME/plugins.zip

Verwenden der Amazon S3-Konsole

Die Amazon S3 S3-Konsole ist eine webbasierte Benutzeroberfläche, mit der Sie die Ressourcen in Ihrem Amazon S3 S3-Bucket erstellen und verwalten können.

Um mit der Amazon S3 S3-Konsole hochzuladen
  1. Öffnen Sie die Seite Umgebungen auf der Amazon MWAA-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie im Bereich DAG-Code im Bereich S3 den Link S3-Bucket aus, um Ihren Speicher-Bucket auf der Amazon S3 S3-Konsole zu öffnen.

  4. Klicken Sie auf Hochladen.

  5. Wählen Sie Datei hinzufügen.

  6. Wählen Sie die lokale Kopie Ihres aus plugins.zip und wählen Sie Hochladen.

Installation benutzerdefinierter Plugins in Ihrer Umgebung

In diesem Abschnitt wird beschrieben, wie Sie die benutzerdefinierten Plugins, die Sie in Ihren Amazon S3 S3-Bucket hochgeladen haben, installieren, indem Sie bei jeder Aktualisierung der ZIP-Datei den Pfad zur Datei plugins.zip und die Version der Datei plugins.zip angeben.

Angeben des Pfads zu plugins.zip auf der Amazon MWAA-Konsole (beim ersten Mal)

Wenn Sie zum ersten Mal einen plugins.zip in Ihren Amazon S3 S3-Bucket hochladen, müssen Sie auch den Pfad zu der Datei auf der Amazon MWAA-Konsole angeben. Sie müssen diesen Schritt nur einmal ausführen.

  1. Öffnen Sie die Seite Umgebungen auf der Amazon MWAA-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie Bearbeiten aus.

  4. Wählen Sie im Bereich DAG-Code in Amazon S3 neben dem Feld Plugins-Datei — optional die Option S3 durchsuchen aus.

  5. Wählen Sie die plugins.zip Datei in Ihrem Amazon S3 S3-Bucket aus.

  6. Wählen Sie Choose (Auswählen) aus.

  7. Wählen Sie Weiter, Umgebung aktualisieren.

Angabe der plugins.zip Version auf der Amazon MWAA-Konsole

Sie müssen die Version Ihrer plugins.zip Datei auf der Amazon MWAA-Konsole jedes Mal angeben, wenn Sie eine neue Version Ihrer Datei plugins.zip in Ihren Amazon S3 S3-Bucket hochladen.

  1. Öffnen Sie die Seite Umgebungen auf der Amazon MWAA-Konsole.

  2. Wählen Sie eine Umgebung aus.

  3. Wählen Sie Bearbeiten aus.

  4. Wählen Sie im Bereich DAG-Code in Amazon S3 eine plugins.zip Version aus der Dropdownliste aus.

  5. Wählen Sie Weiter.

Beispielhafte Anwendungsfälle für plugins.zip

Als nächstes

  • Testen Sie Ihre DAGs, benutzerdefinierten Plugins und Python-Abhängigkeiten lokal mit dem aws-mwaa-local-runneron GitHub.