本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Oracle 创建自定义插件
以下示例将引导您完成使用 Oracle for Amazon MWAA 创建自定义插件的步骤,该插件可以与 plugins.zip 文件中的其他自定义插件和二进制文件组合使用。
版本
-
本页上的示例代码可与 Python 3.7
中的 Apache Airflow v1 一起使用。
先决条件
要使用本页上的示例代码,您需要以下内容:
-
为环境启用任何日志级别
CRITICAL
或更高级别的工作线程日志记录。有关 Amazon MWAA 日志类型以及如何管理日志组的更多信息,请参阅 在 Amazon 中查看气流日志 CloudWatch
权限
-
无需其他权限即可使用本页上的代码示例。
要求
要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt
。要了解更多信息,请参阅 安装 Python 依赖项。
代码示例
以下步骤描述了如何创建用于测试自定义插件的DAG代码。
-
在命令提示符下,导航到存储DAG代码的目录。例如:
cd dags
-
复制以下代码示例的内容,并在本地另存为
oracle.py
。from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago import os import cx_Oracle DAG_ID = os.path.basename(__file__).replace(".py", "") def testHook(**kwargs): cx_Oracle.init_oracle_client() version = cx_Oracle.clientversion() print("cx_Oracle.clientversion",version) return version with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hook_test = PythonOperator( task_id="hook_test", python_callable=testHook, provide_context=True )
创建自定义插件
本节介绍如何下载依赖项、创建自定义插件和 plugins.zip。
下载依赖项
亚马逊MWAA会将 plugins.zip 的内容提取到/usr/local/airflow/plugins
每个亚马逊MWAA调度程序和工作程序容器中。这用于向环境中添加二进制文件。以下步骤介绍如何组装自定义插件所需的文件。
拉取 Amazon Linux 容器镜像
-
在命令提示符下,提取 Amazon Linux 容器镜像,然后在本地运行该容器。例如:
docker pull amazonlinux docker run -it amazonlinux:latest /bin/bash
命令提示符应该调用 bash 命令行。例如:
bash-4.2#
-
安装 Linux 原生异步 I/O 工具(libaio)。
yum -y install libaio
-
请将此窗口保持打开状态以供后续步骤使用。我们将在本地复制以下文件:
lib64/libaio.so.1
、lib64/libaio.so.1.0.0
、lib64/libaio.so.1.0.1
。
下载客户端文件夹
-
在本地安装解压缩包。例如:
sudo yum install unzip
-
创建
oracle_plugin
目录。例如:mkdir oracle_plugin cd oracle_plugin
-
使用以下 curl 命令从适用于 Linux x86-64(64 位)的 Oracle 即时客户端下载中下载 instantclient-basic-linux.x64-18.5.0.0.0dbru.zip。
curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
-
解压缩
client.zip
文件。例如:unzip *.zip
从 Docker 中提取文件
-
在新的命令提示符下,显示并写下 Docker 容器 ID。例如:
docker container ls
您的命令提示符应返回所有容器及其容器IDs。例如:
debc16fd6970
-
在
oracle_plugin
目录中,将lib64/libaio.so.1
、lib64/libaio.so.1.0.0
、lib64/libaio.so.1.0.1
文件解压缩到本地instantclient_18_5
文件夹。例如:docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/ docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
自定义插件
Apache Airflow 将在启动时执行插件文件夹中的 Python 文件内容。这用于设置和修改环境变量。以下步骤介绍了此自定义插件的示例代码。
-
复制以下代码示例的内容,并在本地另存为
env_var_plugin_oracle.py
。from airflow.plugins_manager import AirflowPlugin import os os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5' os.environ["DPI_DEBUG_LEVEL"]="64" class EnvVarPlugin(AirflowPlugin): name = 'env_var_plugin'
Plugins.zip
以下步骤显示如何创建 plugins.zip
。此示例的内容可以与其他插件和二进制文件合并到单个 plugins.zip
文件中。
压缩插件目录中的内容。
-
在命令行提示符中,导航到
oracle_plugin
目录。例如:cd oracle_plugin
-
将
instantclient_18_5
目录压缩到 plugins.zip 中。例如:zip -r ../plugins.zip ./
-
您应该在命令提示符中看到如下内容:
oracle_plugin$ ls client.zip instantclient_18_5
-
移除该
client.zip
文件。例如:rm client.zip
压缩 env_var_plugin_oracle.py 文件
-
将
env_var_plugin_oracle.py
文件添加到 plugins.zip 文件的根目录。例如:zip plugins.zip env_var_plugin_oracle.py
-
plugins.zip 现在应包含以下内容:
env_var_plugin_oracle.py instantclient_18_5/
Airflow 配置选项
如果您使用的是 Apache Airflow v2,请添加 core.lazy_load_plugins : False
为 Apache Airflow 配置选项。要了解更多信息,请参阅 2 中的使用配置选项加载插件。
接下来做什么?
-
要了解如何将本示例中的
requirements.txt
文件上传到 Amazon S3 存储桶,请参阅 安装 Python 依赖项。 -
了解如何将本示例中的DAG代码上传到您的 Amazon S3 存储桶中的
dags
文件夹添加或更新 DAG。 -
要了解如何将本示例中的
plugins.zip
文件上传到 Amazon S3 存储桶,请参阅 安装自定义插件。