使用 Oracle 創建一個自定義插件 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Oracle 創建一個自定義插件

以下範例會引導您完成使用 Oracle to Amazon 建立自訂外掛程式的步驟,MWAA並且可以與 plugins.zip 檔案中的其他自訂外掛程式和二進位檔案結合使用。

版本

  • 此頁面上的示例代碼可以與 Apache 氣流 V1Python 3.7 中使用。

  • 您可以使用此頁面上的代碼示例與 Python 3.10 中的阿帕奇氣流 V2

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

許可

  • 使用此頁面上的程式碼範例不需要其他權限。

要求

若要使用此頁面上的範例程式碼,請將下列相依性新增至requirements.txt. 如需進一步了解,請參閱 安裝 Python 相依性

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
Apache Airflow v1
cx_Oracle==8.1.0 apache-airflow[oracle]==1.10.12

範例程式碼

下列步驟說明如何建立將測試自訂外掛程式的程式DAG碼。

  1. 在命令提示符中,導航到存儲DAG代碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為oracle.py

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )

創建自定義插件

本節介紹如何下載依賴關係,創建自定義插件和 plugins.zip。

下載相依性

Amazon MWAA 將/usr/local/airflow/plugins在每個 Amazon MWAA 調度程序和工作容器上提取 plugins.zip 的內容。這是用來將二進製文件添加到您的環境中。下列步驟說明如何組合自訂外掛程式所需的檔案。

拉 Amazon Linux 容器映像
  1. 在命令提示字元中,提取 Amazon Linux 容器映像,然後在本機執行容器。例如:

    docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash

    您的命令提示符應調用 bash 命令行。例如:

    bash-4.2#
  2. 安裝 Linux 原生非同步 I/O 設備 (利用)。

    yum -y install libaio
  3. 保持此視窗開啟,以便後續步驟使用。我們將在本機複製下列檔案:lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1

下載客戶資料夾
  1. 在本機安裝解壓縮套件。例如:

    sudo yum install unzip
  2. 建立 oracle_plugin 目錄。例如:

    mkdir oracle_plugin cd oracle_plugin
  3. 使用下列 curl 指令,從 Oracle 即時用戶端下載中下載適用於 Linux x86-64 (64 位元) 下載 instantclient-basic-linux.x64-18.5.0.0.0dBR.zip。

    curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
  4. 解壓縮 client.zip 檔案。例如:

    unzip *.zip
從碼頭提取文件
  1. 在新的命令提示字元中,顯示並記下您的 Docker 容器 ID。例如:

    docker container ls

    您的命令提示符應返回所有容器及其IDs. 例如:

    debc16fd6970
  2. 在您的oracle_plugin目錄中,將lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1檔案解壓縮至本機instantclient_18_5資料夾。例如:

    docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/

自定義插件

阿帕奇氣流將在啟動時執行插件文件夾中的 Python 文件的內容。這是用來設置和修改環境變量。下列步驟說明自訂外掛程式的範例程式碼。

  • 複製下列程式碼範例的內容,並在本機儲存為env_var_plugin_oracle.py

    from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'

Plugins.zip

下列步驟顯示如何建立plugins.zip. 此範例的內容可以與其他外掛程式和二進位檔案結合成單一plugins.zip檔案。

壓縮插件目錄的內容
  1. 在命令提示字元中,導覽至目oracle_plugin錄。例如:

    cd oracle_plugin
  2. 壓縮 plugins.zip 中的instantclient_18_5目錄。例如:

    zip -r ../plugins.zip ./
  3. 您應該在命令提示符中看到以下內容:

    oracle_plugin$ ls client.zip instantclient_18_5
  4. 移除client.zip檔案。例如:

    rm client.zip
壓縮 env_var_plugin_oracle.py 文件
  1. env_var_plugin_oracle.py檔案新增至 plugins.zip 的根目錄。例如:

    zip plugins.zip env_var_plugin_oracle.py
  2. 您的 plugins.zip 現在應該包含以下內容:

    env_var_plugin_oracle.py instantclient_18_5/

氣流組態選項

如果您使用的是 Apache 氣流 v2,請添加core.lazy_load_plugins : False為 Apache 氣流配置選項。若要深入瞭解,請參閱使用設定選項載入外掛程式 2

後續步驟?

  • 在中了解如何將此範例中的requirements.txt檔案上傳到您的 Amazon S3 儲存貯體安裝 Python 相依性

  • 了解如何將此範例中的DAG程式碼上傳到的 Amazon S3 儲存貯體中的dags資料夾新增或更新 DAG

  • 在本範例中進一步了解如何將plugins.zip檔案上傳到中的 Amazon S3 儲存貯體安裝自定義插件