使用 Oracle 创建自定义插件 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Oracle 创建自定义插件

以下示例将引导您完成使用 Oracle for Amazon MWAA 创建自定义插件的步骤,该插件可以与 plugins.zip 文件中的其他自定义插件和二进制文件组合使用。

版本

  • 本页上的示例代码可与 Python 3.7 中的 Apache Airflow v1 一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

  • 无需其他权限即可使用本页上的代码示例。

要求

要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt。要了解更多信息,请参阅 安装 Python 依赖项

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt cx_Oracle apache-airflow-providers-oracle
Apache Airflow v1
cx_Oracle==8.1.0 apache-airflow[oracle]==1.10.12

代码示例

以下步骤描述了如何创建用于测试自定义插件的DAG代码。

  1. 在命令提示符下,导航到存储DAG代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 oracle.py

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )

创建自定义插件

本节介绍如何下载依赖项、创建自定义插件和 plugins.zip。

下载依赖项

亚马逊MWAA会将 plugins.zip 的内容提取到/usr/local/airflow/plugins每个亚马逊MWAA调度程序和工作程序容器中。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。

拉取 Amazon Linux 容器镜像
  1. 在命令提示符下,提取 Amazon Linux 容器镜像,然后在本地运行该容器。例如:

    docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash

    命令提示符应该调用 bash 命令行。例如:

    bash-4.2#
  2. 安装 Linux 原生异步 I/O 工具(libaio)。

    yum -y install libaio
  3. 请将此窗口保持打开状态以供后续步骤使用。我们将在本地复制以下文件:lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1

下载客户端文件夹
  1. 在本地安装解压缩包。例如:

    sudo yum install unzip
  2. 创建 oracle_plugin 目录。例如:

    mkdir oracle_plugin cd oracle_plugin
  3. 使用以下 curl 命令从适用于 Linux x86-64(64 位)的 Oracle 即时客户端下载中下载 instantclient-basic-linux.x64-18.5.0.0.0dbru.zip。

    curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
  4. 解压缩 client.zip 文件。例如:

    unzip *.zip
从 Docker 中提取文件
  1. 在新的命令提示符下,显示并写下 Docker 容器 ID。例如:

    docker container ls

    您的命令提示符应返回所有容器及其容器IDs。例如:

    debc16fd6970
  2. oracle_plugin 目录中,将 lib64/libaio.so.1lib64/libaio.so.1.0.0lib64/libaio.so.1.0.1 文件解压缩到本地 instantclient_18_5 文件夹。例如:

    docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/

自定义插件

Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。

  • 复制以下代码示例的内容,并在本地另存为 env_var_plugin_oracle.py

    from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'

Plugins.zip

以下步骤显示如何创建 plugins.zip。此示例的内容可以与其他插件和二进制文件合并到单个 plugins.zip 文件中。

压缩插件目录中的内容。
  1. 在命令行提示符中,导航到 oracle_plugin 目录。例如:

    cd oracle_plugin
  2. instantclient_18_5 目录压缩到 plugins.zip 中。例如:

    zip -r ../plugins.zip ./
  3. 您应该在命令提示符中看到如下内容:

    oracle_plugin$ ls client.zip instantclient_18_5
  4. 移除该 client.zip 文件。例如:

    rm client.zip
压缩 env_var_plugin_oracle.py 文件
  1. env_var_plugin_oracle.py 文件添加到 plugins.zip 文件的根目录。例如:

    zip plugins.zip env_var_plugin_oracle.py
  2. plugins.zip 现在应包含以下内容:

    env_var_plugin_oracle.py instantclient_18_5/

Airflow 配置选项

如果您使用的是 Apache Airflow v2,请添加 core.lazy_load_plugins : False 为 Apache Airflow 配置选项。要了解更多信息,请参阅 2 中的使用配置选项加载插件

接下来做什么?

  • 要了解如何将本示例中的 requirements.txt 文件上传到 Amazon S3 存储桶,请参阅 安装 Python 依赖项

  • 了解如何将本示例中的DAG代码上传到您的 Amazon S3 存储桶中的dags文件夹添加或更新 DAG

  • 要了解如何将本示例中的 plugins.zip 文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件