Creating a custom plugin for Apache Airflow PythonVirtualenvOperator - Amazon Managed Workflows for Apache Airflow (MWAA)


Creating a custom plugin for Apache Airflow PythonVirtualenvOperator

The following example shows how to patch the Apache Airflow PythonVirtualenvOperator with a custom plugin on Amazon Managed Workflows for Apache Airflow.

Version

Prerequisites

To use the sample code on this page, you need the following:

Permissions

  • No additional permissions are required to use the code example on this page.

Requirements

To use the sample code on this page, add the following dependency to your requirements.txt. To learn more, see Installing Python dependencies.

virtualenv

Custom plugin sample code

Apache Airflow executes the contents of the Python files in the plugins folder at startup. This plugin patches the built-in PythonVirtualenvOperator during that startup process to make it compatible with Amazon MWAA. The following steps show the sample code for the custom plugin.
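
The startup patch described above relies on ordinary Python monkey patching: rebinding a module attribute so that later calls pick up the replacement. The sketch below illustrates only that mechanism. The names `library`, `_generate_cmd`, and `prepare`, and the `/opt/example` path, are hypothetical stand-ins and not Airflow names.

```python
import types

# Hypothetical stand-in for a library module (not a real Airflow module).
library = types.ModuleType("library")


def _original_cmd(tmp_dir):
    # What the stand-in "library" ships with by default.
    return ["virtualenv", tmp_dir]


def prepare(tmp_dir):
    # Resolves the helper through the module at call time,
    # so rebinding the attribute changes what runs here.
    return library._generate_cmd(tmp_dir)


library._generate_cmd = _original_cmd
library.prepare = prepare

before = library.prepare("/tmp/venv")


def _patched_cmd(tmp_dir):
    # Replacement that launches the tool from an explicit (made-up) path.
    return ["python3", "/opt/example/virtualenv", tmp_dir]


# The patch itself: a single attribute assignment executed at import time.
library._generate_cmd = _patched_cmd

after = library.prepare("/tmp/venv")

print(before)
print(after)
```

A patch like this only affects calls made after the assignment runs, which is presumably why the sample performs it in a plugin file that is executed during startup.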

Apache Airflow v2
  1. In your command prompt, navigate to the plugins directory above. For example:

    cd plugins
  2. Copy the contents of the following code sample and save it locally as virtual_python_plugin.py.

    """
    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
    the Software, and to permit persons to whom the Software is furnished to do so.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
    FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
    COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
    IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    """
    from airflow.plugins_manager import AirflowPlugin
    import airflow.utils.python_virtualenv
    from typing import List

    def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
        # Run virtualenv from the user-level site-packages directory.
        cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
        if system_site_packages:
            cmd.append('--system-site-packages')
        if python_bin is not None:
            cmd.append(f'--python={python_bin}')
        return cmd

    # Replace the module-level command builder with the version above.
    airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd

    class VirtualPythonPlugin(AirflowPlugin):
        name = 'virtual_python_plugin'
Apache Airflow v1
  1. In your command prompt, navigate to the plugins directory above. For example:

    cd plugins
  2. Copy the contents of the following code sample and save it locally as virtual_python_plugin.py.

    from airflow.plugins_manager import AirflowPlugin
    from airflow.operators.python_operator import PythonVirtualenvOperator

    def _generate_virtualenv_cmd(self, tmp_dir):
        # Run virtualenv from the user-level site-packages directory.
        cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
        if self.system_site_packages:
            cmd.append('--system-site-packages')
        if self.python_version is not None:
            cmd.append('--python=python{}'.format(self.python_version))
        return cmd

    # Replace the operator's command builder method with the version above.
    PythonVirtualenvOperator._generate_virtualenv_cmd=_generate_virtualenv_cmd

    class EnvVarPlugin(AirflowPlugin):
        name = 'virtual_python_plugin'
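
The two plugin variants differ mainly in where the replacement is attached: v2 rebinds a module-level function, while v1 assigns a function onto the operator class so that it is called as a method. The sketch below runs both forms without importing Airflow to show the command list each one builds. `StubOperator`, `generate_cmd_v1`, `generate_cmd_v2`, and the argument values are hypothetical and exist only for this illustration.

```python
from typing import List, Optional

# Path taken from the plugin samples above.
VIRTUALENV_PATH = "/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv"


def generate_cmd_v2(tmp_dir: str, python_bin: Optional[str], system_site_packages: bool) -> List[str]:
    """Module-level form, mirroring the v2 sample."""
    cmd = ["python3", VIRTUALENV_PATH, tmp_dir]
    if system_site_packages:
        cmd.append("--system-site-packages")
    if python_bin is not None:
        cmd.append(f"--python={python_bin}")
    return cmd


class StubOperator:
    """Hypothetical stand-in carrying only the attributes the v1 patch reads."""

    def __init__(self, system_site_packages: bool, python_version: Optional[str]):
        self.system_site_packages = system_site_packages
        self.python_version = python_version


def generate_cmd_v1(self, tmp_dir):
    """Method form, mirroring the v1 sample."""
    cmd = ["python3", VIRTUALENV_PATH, tmp_dir]
    if self.system_site_packages:
        cmd.append("--system-site-packages")
    if self.python_version is not None:
        cmd.append("--python=python{}".format(self.python_version))
    return cmd


# Assigning a plain function to a class attribute makes it callable as a method.
StubOperator._generate_virtualenv_cmd = generate_cmd_v1

v2_cmd = generate_cmd_v2("/tmp/venv_demo", "python3.7", False)
v1_cmd = StubOperator(False, "3.7")._generate_virtualenv_cmd("/tmp/venv_demo")

print(v2_cmd)
print(v1_cmd)
```

With these inputs both forms produce the same list, since the v1 form prefixes the version with `python` itself while the v2 form receives the interpreter name directly.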

Plugins.zip

The following steps show how to create plugins.zip.

  1. In your command prompt, navigate to the directory containing virtual_python_plugin.py above. For example:

    cd plugins
  2. Zip the contents within your plugins folder.

    zip plugins.zip virtual_python_plugin.py
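
If the `zip` command is not available, the same archive can be produced with Python's standard `zipfile` module. This is a minimal sketch under that assumption: `build_plugins_zip` is a hypothetical helper, and the demo writes a placeholder file into a temporary directory rather than using the real plugin source.

```python
import tempfile
import zipfile
from pathlib import Path


def build_plugins_zip(plugin_file: Path, archive: Path) -> list:
    """Write plugin_file at the root of archive and return the archive listing."""
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        # arcname drops the directory part so the file sits at the archive root.
        zf.write(plugin_file, arcname=plugin_file.name)
    with zipfile.ZipFile(archive) as zf:
        return zf.namelist()


# Demo in a throwaway directory with a placeholder plugin file.
with tempfile.TemporaryDirectory() as workdir:
    plugin = Path(workdir) / "virtual_python_plugin.py"
    plugin.write_text("# placeholder for the plugin source\n")
    names = build_plugins_zip(plugin, Path(workdir) / "plugins.zip")

print(names)
```

Storing the file under its bare name mirrors what `zip plugins.zip virtual_python_plugin.py` does when run from inside the plugins directory.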

Code sample

The following steps describe how to create the DAG code for the custom plugin.

Apache Airflow v2
  1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

    cd dags
  2. Copy the contents of the following code sample and save it locally as virtualenv_test.py.

    """
    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
    the Software, and to permit persons to whom the Software is furnished to do so.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
    FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
    COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
    IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    """
    from airflow import DAG
    from airflow.operators.python import PythonVirtualenvOperator
    from airflow.utils.dates import days_ago
    import os

    # Make executables installed under the user-level bin directory discoverable.
    os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"

    def virtualenv_fn():
        # Imported inside the callable so it resolves within the virtualenv.
        import boto3
        print("boto3 version ",boto3.__version__)

    with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
        virtualenv_task = PythonVirtualenvOperator(
            task_id="virtualenv_task",
            python_callable=virtualenv_fn,
            requirements=["boto3>=1.17.43"],
            system_site_packages=False,
            dag=dag,
        )
Apache Airflow v1
  1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

    cd dags
  2. Copy the contents of the following code sample and save it locally as virtualenv_test.py.

    """
    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
    the Software, and to permit persons to whom the Software is furnished to do so.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
    FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
    COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
    IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    """
    from airflow import DAG
    from airflow.operators.python_operator import PythonVirtualenvOperator
    from airflow.utils.dates import days_ago
    import os

    # Make executables installed under the user-level bin directory discoverable.
    os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"

    def virtualenv_fn():
        # Imported inside the callable so it resolves within the virtualenv.
        import boto3
        print("boto3 version ",boto3.__version__)

    with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
        virtualenv_task = PythonVirtualenvOperator(
            task_id="virtualenv_task",
            python_callable=virtualenv_fn,
            requirements=["boto3>=1.17.43"],
            system_site_packages=False,
            dag=dag,
        )

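Both DAG files append `/usr/local/airflow/.local/bin` to `PATH` each time the file is imported. Because a DAG file may be parsed many times in the same process, one possible variation is to add the directory only when it is missing. The helper below is a hypothetical sketch of that idea and is not part of the original sample.

```python
import os

# Directory taken from the DAG samples above.
LOCAL_BIN = "/usr/local/airflow/.local/bin"


def append_to_path(current: str, directory: str) -> str:
    """Return current with directory appended, unless it is already listed."""
    parts = [p for p in current.split(os.pathsep) if p]
    if directory not in parts:
        parts.append(directory)
    return os.pathsep.join(parts)


# os.environ.get with a default avoids a TypeError if PATH is unset.
os.environ["PATH"] = append_to_path(os.environ.get("PATH", ""), LOCAL_BIN)

print(os.environ["PATH"])
```

Calling the helper again leaves `PATH` unchanged, so repeated imports do not grow the variable.
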
Airflow configuration options

If you're using Apache Airflow v2, add core.lazy_load_plugins : False as an Apache Airflow configuration option. To learn more, see Using configuration options to load plugins in 2.

What's next?
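
If the environment is managed from a script rather than the console, the option can be passed as a string key/value pair. The sketch below builds the arguments for the boto3 MWAA client's `update_environment` call. The environment name `MyAirflowEnvironment` and both helper functions are hypothetical, boto3 and AWS credentials are assumed to be available, and you should check how the call treats options already set on the environment before relying on it.

```python
def build_update_request(environment_name: str) -> dict:
    """Build keyword arguments that set core.lazy_load_plugins to False."""
    return {
        "Name": environment_name,
        # Option values are passed as strings.
        "AirflowConfigurationOptions": {"core.lazy_load_plugins": "False"},
    }


def apply_option(environment_name: str):
    """Send the update; requires boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the sketch loads without boto3 installed

    client = boto3.client("mwaa")
    return client.update_environment(**build_update_request(environment_name))


# Hypothetical environment name, used only to show the request shape.
request = build_update_request("MyAirflowEnvironment")
print(request)
```

Only `build_update_request` runs here; `apply_option` is left uncalled so that nothing is sent to AWS until you invoke it deliberately.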