本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Aurora PostgreSQL-Compatible 上设置 Oracle UTL_FILE 功能
由 Rakesh Raghav (AWS) 和 anuradha chintha (AWS) 编写
摘要
在从 Oracle 迁移到 Amazon Web Services (AWS) 云上的 Amazon Aurora PostgreSQL 兼容版的过程中,您可能会遇到多种挑战。例如,迁移依赖于 Oracle UTL_FILE
实用程序的代码始终是一项挑战。在 Oracle PL/SQL 中,UTL_FILE
软件包与底层操作系统一起用于文件操作,例如读取和写入。该 UTL_FILE
实用程序适用于服务器和客户机系统。
Amazon Aurora PostgreSQL-Compatible 的是一款托管式数据库产品。因此,无法访问 Database Server 上的文件。此模式将引导您完成整合 Amazon Simple Storage Service (Amazon S3) 和 Amazon Aurora PostgreSQL 兼容版,以实现 UTL_FILE
功能子网。使用此集成,我们可以创建和使用文件,而无需使用第三方提取、转换、加载(ETL)工具或服务。
或者,您可以设置亚马逊 CloudWatch 监控和亚马逊 SNS 通知。
建议在生产环境中实施该解决方案之前,对方案进行全面测试。
先决条件和限制
先决条件
一个有效的 Amazon Web Services account
AWS Database Migration Service (AWS DMS) 专业知识
PL/pgSQL 编码专长
Amazon Aurora PostgreSQL-Compatible 集群
一个 S3 存储桶
限制
此模式不提供替代 Oracle UTL_FILE
实用程序的功能。但是,可以进一步增强步骤和示例代码,以实现数据库现代化目标。
产品版本
Amazon Aurora PostgreSQL-Compatible 版本 11.9。
架构
目标技术堆栈
Amazon Aurora(兼容 PostgreSQL)
Amazon CloudWatch
Amazon Simple Notification Service (Amazon SNS)
Amazon S3
目标架构
下图高度概括此解决方案。

文件将从应用程序上传至 S3 存储桶。
该
aws_s3
扩展使用 PL/pgSQL 访问数据,并将数据上传到兼容 Aurora PostgreSQL 的版本。
工具
Amazon Aurora PostgreSQL-Compatible – Amazon Aurora PostgreSQL-Compatible Edition 是一个完全托管式、兼容 PostgreSQL 和 ACID 的关系数据库引擎。您已了解了 MySQL 和 PostgreSQL 不仅具有高端商用数据库的速度和可靠性,同时还具有开源数据库的简单性和成本效益。
AWS CLI - AWS 命令行界面(AWS CLI)是用于管理 Amazon Web Services 的统一工具。只通过一个工具进行下载和配置,您就可以使用命令行控制多个 Amazon Web Services 并利用脚本来自动执行这些服务。
亚马逊 CloudWatch — 亚马逊 CloudWatch 监控亚马逊 S3 的资源和使用情况。
Amazon S3 – Amazon Simple Storage Service (Amazon S3) 是一项面向互联网的存储服务。Amazon S3 提供了一个存储层,用于接收和存储文件,以便在兼容 Aurora PostgreSQL 的集群之间使用和传输这些文件。
aws_s3 – 该
aws_s3
扩展程序集成了 Amazon S3 和 Aurora PostgreSQL-Compatible。Amazon SNS – Amazon Simple Notification Service(Amazon SNS)可协调和管理发布者和客户端之间消息的传送或发送。在这种模式中,Amazon SNS 用于发送通知。
pgAdmin
– pgAdmin 是 Postgres 的开源管理工具。pgadmin 4 提供了一个用于创建、维护和使用数据库对象的图形界面。
代码
为了实现所需功能,该模式创建了多个与 UTL_FILE
命名类似的函数。其他信息 部分包含这些函数的代码库。
在代码中,将 testaurorabucket
替换为您的 S3 存储桶名称。将 us-east-1
替换为您测试 S3 存储桶所在的 AWS 区域。
操作说明
Task | 描述 | 所需技能 |
---|---|---|
设置 IAM policy。 | 用于访问 S3 存储桶的 AWS Identity and Access Management (IAM) 角色。关于此代码,请参阅更多信息章节。 | AWS 管理员,数据库管理员 |
将 Amazon S3 访问角色添加至 Aurora PostgreSQL。 | 创建两个 IAM 角色:一个角色用于对 Amazon S3 进行读取访问,一个角色用于对 Amazon S3 进行写入访问。将角色附加到兼容 Aurora PostgreSQL 的集群。
有关更多信息,请参阅 Aurora PostgreSQL-Compatible 文档,介绍如何将数据导入和导出到 Amazon S3。 | AWS 管理员,数据库管理员 |
Task | 描述 | 所需技能 |
---|---|---|
创建 aws_commons 扩展。 |
| 数据库管理员、开发人员 |
创建 aws_s3 扩展。 |
| 数据库管理员、开发人员 |
Task | 描述 | 所需技能 |
---|---|---|
将 Amazon S3 中的数据导入到 Aurora PostgreSQL | 要测试将文件导入到 Aurora PostgreSQL 兼容版中,请创建一个示例 CSV 文件并将其上传到 S3 存储桶中。基于 CSV 文件创建表定义,然后使用 | 数据库管理员、开发人员 |
测试将文件从 Aurora PostgreSQL 导出至 Amazon S3。 | 要测试从兼容 Aurora PostgreSQL 的导出文件,请创建一个测试表,在其中填充数据,然后使用 | 数据库管理员、开发人员 |
Task | 描述 | 所需技能 |
---|---|---|
创建 utl_file_utility 架构。 | 该架构将包装器函数汇聚在一起。运行以下命令以创建 EIP。
| 数据库管理员、开发人员 |
创建 file_type 类型。 | 使用以下代码创建
| 数据库管理员/开发人员 |
创建 init 函数 |
| 数据库管理员/开发人员 |
创建包装器函数。 | 创建包装函数 | 数据库管理员、开发人员 |
Task | 描述 | 所需技能 |
---|---|---|
在写入模式测试包装器函数。 | 要在写入模式下测试包装器函数,请使用其他信息部分中提供的代码。 | 数据库管理员、开发人员 |
在追加模式下测试包装器函数。 | 要在追加模式下测试包装器函数,请使用其他信息部分中提供的代码。 | 数据库管理员、开发人员 |
相关资源
其他信息
设置 IAM policy
创建以下策略。
策略名称 | JSON |
S3 IntRead |
|
S3 IntWrite |
|
创建 init 函数
若要初始化常用变量,例如 bucket
或 region
,请使用以下代码创建 init
函数。
CREATE OR REPLACE FUNCTION utl_file_utility.init( ) RETURNS void LANGUAGE 'plpgsql' COST 100 VOLATILE AS $BODY$ BEGIN perform set_config ( format( '%s.%s','UTL_FILE_UTILITY', 'region' ) , 'us-east-1'::text , false ); perform set_config ( format( '%s.%s','UTL_FILE_UTILITY', 's3bucket' ) , 'testaurorabucket'::text , false ); END; $BODY$;
创建包装器函数
创建 fopen
、put_line
和 fclose
包装器函数。
fopen
CREATE OR REPLACE FUNCTION utl_file_utility.fopen( p_file_name character varying, p_path character varying, p_mode character DEFAULT 'W'::bpchar, OUT p_file_type utl_file_utility.file_type) RETURNS utl_file_utility.file_type LANGUAGE 'plpgsql' COST 100 VOLATILE AS $BODY$ declare v_sql character varying; v_cnt_stat integer; v_cnt integer; v_tabname character varying; v_filewithpath character varying; v_region character varying; v_bucket character varying; BEGIN /*initialize common variable */ PERFORM utl_file_utility.init(); v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) ); v_bucket := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) ); /* set tabname*/ v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end ); v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ; raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region; /* APPEND MODE HANDLING; RETURN EXISTING FILE DETAILS IF PRESENT ELSE CREATE AN EMPTY FILE */ IF p_mode = 'A' THEN v_sql := concat_ws('','create temp table if not exists ', v_tabname,' (col1 text)'); execute v_sql; begin PERFORM aws_s3.table_import_from_s3 ( v_tabname, '', 'DELIMITER AS ''#''', aws_commons.create_s3_uri ( v_bucket, v_filewithpath , v_region) ); exception when others then raise notice 'File load issue ,%',sqlerrm; raise; end; execute concat_ws('','select count(*) from ',v_tabname) into v_cnt; IF v_cnt > 0 then p_file_type.p_path := p_path; p_file_type.p_file_name := p_file_name; else PERFORM aws_s3.query_export_to_s3('select ''''', aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region) ); p_file_type.p_path := p_path; p_file_type.p_file_name := p_file_name; end if; v_sql := concat_ws('','drop table ', v_tabname); execute v_sql; ELSEIF p_mode = 'W' THEN PERFORM aws_s3.query_export_to_s3('select ''''', aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region) ); p_file_type.p_path := p_path; p_file_type.p_file_name := p_file_name; END IF; EXCEPTION when others then p_file_type.p_path := p_path; p_file_type.p_file_name := p_file_name; raise notice 'fopenerror,%',sqlerrm; raise; END; $BODY$;
put_line
CREATE OR REPLACE FUNCTION utl_file_utility.put_line( p_file_name character varying, p_path character varying, p_line text, p_flag character DEFAULT 'W'::bpchar) RETURNS boolean LANGUAGE 'plpgsql' COST 100 VOLATILE AS $BODY$ /************************************************************************** * Write line, p_line in windows format to file, p_fp - with carriage return * added before new line. **************************************************************************/ declare v_sql varchar; v_ins_sql varchar; v_cnt INTEGER; v_filewithpath character varying; v_tabname character varying; v_bucket character varying; v_region character varying; BEGIN PERFORM utl_file_utility.init(); /* check if temp table already exist */ v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end ); v_sql := concat_ws('','select count(1) FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace where n.nspname like ''pg_temp_%''' ,' AND pg_catalog.pg_table_is_visible(c.oid) AND Upper(relname) = Upper( ''' , v_tabname ,''' ) '); execute v_sql into v_cnt; IF v_cnt = 0 THEN v_sql := concat_ws('','create temp table ',v_tabname,' (col text)'); execute v_sql; /* CHECK IF APPEND MODE */ IF upper(p_flag) = 'A' THEN PERFORM utl_file_utility.init(); v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) ); v_bucket := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) ); /* set tabname*/ v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ; begin PERFORM aws_s3.table_import_from_s3 ( v_tabname, '', 'DELIMITER AS ''#''', aws_commons.create_s3_uri ( v_bucket, v_filewithpath, v_region ) ); exception when others then raise notice 'Error Message : %',sqlerrm; raise; end; END IF; END IF; /* INSERT INTO TEMP TABLE */ v_ins_sql := concat_ws('','insert into ',v_tabname,' values(''',p_line,''')'); execute v_ins_sql; RETURN TRUE; exception when others then raise notice 'Error Message : %',sqlerrm; raise; END; $BODY$;
fclose
CREATE OR REPLACE FUNCTION utl_file_utility.fclose( p_file_name character varying, p_path character varying) RETURNS boolean LANGUAGE 'plpgsql' COST 100 VOLATILE AS $BODY$ DECLARE v_filewithpath character varying; v_bucket character varying; v_region character varying; v_tabname character varying; v_sql character varying; BEGIN PERFORM utl_file_utility.init(); v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) ); v_bucket := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) ); v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end ); v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ; raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region ; /* exporting to s3 */ perform aws_s3.query_export_to_s3 (concat_ws('','select * from ',v_tabname,' order by ctid asc'), aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region) ); v_sql := concat_ws('','drop table ', v_tabname); execute v_sql; RETURN TRUE; EXCEPTION when others then raise notice 'error fclose %',sqlerrm; RAISE; END; $BODY$;
测试您的设置与包装器功能
使用下面的代码示例测试设置。
测试写入模式
以下代码在 S3 存储桶中写入名为 s3inttest
的文件。
do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'W'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$
测试追加模式
以下代码将行追加到上一个测试中创建的 s3inttest
文件上。
do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'A'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose : append 1', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket : for test purpose : append 2', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$
Amazon SNS 通知
或者,您可以在 S3 存储桶上设置亚马逊 CloudWatch 监控和 Amazon SNS 通知。有关更多信息,请参阅监控 Amazon S3 和设置 Amazon SNS 通知。