在 Aurora PostgreSQL-Compatible 上设置 Oracle UTL_FILE 功能 - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Aurora PostgreSQL-Compatible 上设置 Oracle UTL_FILE 功能

由 Rakesh Raghav (AWS) 和 anuradha chintha (AWS) 编写

摘要

在从 Oracle 迁移到 Amazon Web Services (AWS) 云上的 Amazon Aurora PostgreSQL 兼容版的过程中,您可能会遇到多种挑战。例如,迁移依赖于 Oracle UTL_FILE 实用程序的代码始终是一项挑战。在 Oracle PL/SQL 中,UTL_FILE 软件包与底层操作系统一起用于文件操作,例如读取和写入。该 UTL_FILE 实用程序适用于服务器和客户机系统。 

Amazon Aurora PostgreSQL-Compatible 的是一款托管式数据库产品。因此,无法访问 Database Server 上的文件。此模式将引导您完成整合 Amazon Simple Storage Service (Amazon S3) 和 Amazon Aurora PostgreSQL 兼容版,以实现 UTL_FILE 功能子网。使用此集成,我们可以创建和使用文件,而无需使用第三方提取、转换、加载(ETL)工具或服务。

或者,您可以设置亚马逊 CloudWatch 监控和亚马逊 SNS 通知。

建议在生产环境中实施该解决方案之前,对方案进行全面测试。

先决条件和限制

先决条件

  • 一个有效的 Amazon Web Services account

  • AWS Database Migration Service (AWS DMS) 专业知识

  • PL/pgSQL 编码专长

  • Amazon Aurora PostgreSQL-Compatible 集群

  • 一个 S3 存储桶

限制

此模式不提供替代 Oracle UTL_FILE 实用程序的功能。但是,可以进一步增强步骤和示例代码,以实现数据库现代化目标。

产品版本

  • Amazon Aurora PostgreSQL-Compatible 版本 11.9。

架构

目标技术堆栈

  • Amazon Aurora(兼容 PostgreSQL)

  • Amazon CloudWatch

  • Amazon Simple Notification Service (Amazon SNS)

  • Amazon S3

目标架构

下图高度概括此解决方案。

数据文件上传至 S3 存储桶,使用 aws_s3 扩展进行处理,然后发送到 Aurora 实例。
  1. 文件将从应用程序上传至 S3 存储桶。

  2. aws_s3 扩展使用 PL/pgSQL 访问数据,并将数据上传到兼容 Aurora PostgreSQL 的版本。

工具

  • Amazon Aurora PostgreSQL-Compatible – Amazon Aurora PostgreSQL-Compatible Edition 是一个完全托管式、兼容 PostgreSQL 和 ACID 的关系数据库引擎。您已了解了 MySQL 和 PostgreSQL 不仅具有高端商用数据库的速度和可靠性,同时还具有开源数据库的简单性和成本效益。

  • AWS CLI - AWS 命令行界面(AWS CLI)是用于管理 Amazon Web Services 的统一工具。只通过一个工具进行下载和配置,您就可以使用命令行控制多个 Amazon Web Services 并利用脚本来自动执行这些服务。

  • 亚马逊 CloudWatch — 亚马逊 CloudWatch 监控亚马逊 S3 的资源和使用情况。

  • Amazon S3 – Amazon Simple Storage Service (Amazon S3) 是一项面向互联网的存储服务。Amazon S3 提供了一个存储层,用于接收和存储文件,以便在兼容 Aurora PostgreSQL 的集群之间使用和传输这些文件。

  • aws_s3 – 该 aws_s3 扩展程序集成了 Amazon S3 和 Aurora PostgreSQL-Compatible。

  • Amazon SNS – Amazon Simple Notification Service(Amazon SNS)可协调和管理发布者和客户端之间消息的传送或发送。在这种模式中,Amazon SNS 用于发送通知。

  • pgAdmin – pgAdmin 是 Postgres 的开源管理工具。pgadmin 4 提供了一个用于创建、维护和使用数据库对象的图形界面。

代码

为了实现所需功能,该模式创建了多个与 UTL_FILE 命名类似的函数。其他信息 部分包含这些函数的代码库。

在代码中,将 testaurorabucket 替换为您的 S3 存储桶名称。将 us-east-1 替换为您测试 S3 存储桶所在的 AWS 区域。

操作说明

Task描述所需技能
设置 IAM policy。

用于访问 S3 存储桶的 AWS Identity and Access Management (IAM) 角色。关于此代码,请参阅更多信息章节。

AWS 管理员,数据库管理员
将 Amazon S3 访问角色添加至 Aurora PostgreSQL。

创建两个 IAM 角色:一个角色用于对 Amazon S3 进行读取访问,一个角色用于对 Amazon S3 进行写入访问。将角色附加到兼容 Aurora PostgreSQL 的集群。 

  • S3Export 功能只有一个角色

  • S3Import 功能只有一个角色

有关更多信息,请参阅 Aurora PostgreSQL-Compatible 文档,介绍如何将数据导入导出到 Amazon S3。

AWS 管理员,数据库管理员
Task描述所需技能
创建 aws_commons 扩展。

aws_commons 扩展是 aws_s3 扩展的依赖项。

数据库管理员、开发人员
创建 aws_s3 扩展。

aws_s3 扩展程序与 Amazon S3 交互。

数据库管理员、开发人员
Task描述所需技能
将 Amazon S3 中的数据导入到 Aurora PostgreSQL

要测试将文件导入到 Aurora PostgreSQL 兼容版中,请创建一个示例 CSV 文件并将其上传到 S3 存储桶中。基于 CSV 文件创建表定义,然后使用 aws_s3.table_import_from_s3 函数将文件加载到表中。

数据库管理员、开发人员
测试将文件从 Aurora PostgreSQL 导出至 Amazon S3。

要测试从兼容 Aurora PostgreSQL 的导出文件,请创建一个测试表,在其中填充数据,然后使用 aws_s3.query_export_to_s3 函数导出数据。

数据库管理员、开发人员
Task描述所需技能
创建 utl_file_utility 架构。

该架构将包装器函数汇聚在一起。运行以下命令以创建 EIP。

CREATE SCHEMA utl_file_utility;
数据库管理员、开发人员
创建 file_type 类型。

使用以下代码创建 file_type 类型。

CREATE TYPE utl_file_utility.file_type AS (     p_path character varying(30),     p_file_name character varying );
数据库管理员/开发人员
创建 init 函数

init 函数初始化公共变量,例如 bucketregion。关于此代码,请参阅更多信息章节。

数据库管理员/开发人员
创建包装器函数。

创建包装函数 fopenput_linefclose。关于代码,请参阅更多信息章节。

数据库管理员、开发人员
Task描述所需技能
在写入模式测试包装器函数。

要在写入模式下测试包装器函数,请使用其他信息部分中提供的代码。

数据库管理员、开发人员
在追加模式下测试包装器函数。

要在追加模式下测试包装器函数,请使用其他信息部分中提供的代码。

数据库管理员、开发人员

相关资源

其他信息

设置 IAM policy

创建以下策略。

策略名称

JSON

S3 IntRead

{     "Version": "2012-10-17",     "Statement": [         {             "Sid": "S3integrationtest",             "Effect": "Allow",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Resource": [          "arn:aws:s3:::testaurorabucket/*",          "arn:aws:s3:::testaurorabucket"             ]         }     ] }

S3 IntWrite

{     "Version": "2012-10-17",     "Statement": [         {             "Sid": "S3integrationtest",             "Effect": "Allow",             "Action": [                 "s3:PutObject",                                 "s3:ListBucket"             ],             "Resource": [                "arn:aws:s3:::testaurorabucket/*",                "arn:aws:s3:::testaurorabucket"             ]         }     ] }

创建 init 函数

若要初始化常用变量,例如 bucketregion,请使用以下代码创建 init 函数。

CREATE OR REPLACE FUNCTION utl_file_utility.init(     )     RETURNS void     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ BEGIN       perform set_config       ( format( '%s.%s','UTL_FILE_UTILITY', 'region' )       , 'us-east-1'::text       , false );       perform set_config       ( format( '%s.%s','UTL_FILE_UTILITY', 's3bucket' )       , 'testaurorabucket'::text       , false ); END; $BODY$;

创建包装器函数

创建 fopenput_linefclose 包装器函数。

fopen

CREATE OR REPLACE FUNCTION utl_file_utility.fopen(     p_file_name character varying,     p_path character varying,     p_mode character DEFAULT 'W'::bpchar,     OUT p_file_type utl_file_utility.file_type)     RETURNS utl_file_utility.file_type     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ declare     v_sql character varying;     v_cnt_stat integer;     v_cnt integer;     v_tabname character varying;     v_filewithpath character varying;     v_region character varying;     v_bucket character varying; BEGIN     /*initialize common variable */     PERFORM utl_file_utility.init();     v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );     v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );         /* set tabname*/     v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );     v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;     raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region;         /* APPEND MODE HANDLING; RETURN EXISTING FILE DETAILS IF PRESENT ELSE CREATE AN EMPTY FILE */     IF p_mode = 'A' THEN         v_sql := concat_ws('','create temp table if not exists ', v_tabname,' (col1 text)');         execute v_sql;         begin         PERFORM aws_s3.table_import_from_s3             ( v_tabname,             '',               'DELIMITER AS ''#''',             aws_commons.create_s3_uri             (     v_bucket,                 v_filewithpath ,                 v_region)             );         exception             when others then              raise notice 'File load issue ,%',sqlerrm;              raise;         end;         execute concat_ws('','select count(*) from ',v_tabname) into v_cnt;         IF v_cnt > 0         then             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;         else                     PERFORM aws_s3.query_export_to_s3('select ''''',                             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)                                           );             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;                 end if;         v_sql := concat_ws('','drop table ', v_tabname);                 execute v_sql;                 ELSEIF p_mode = 'W' THEN             PERFORM aws_s3.query_export_to_s3('select ''''',                             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)                                           );             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;     END IF;         EXCEPTION         when others then             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;             raise notice 'fopenerror,%',sqlerrm;             raise; END; $BODY$;

put_line

CREATE OR REPLACE FUNCTION utl_file_utility.put_line(     p_file_name character varying,     p_path character varying,     p_line text,     p_flag character DEFAULT 'W'::bpchar)     RETURNS boolean     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ /************************************************************************** * Write line, p_line in windows format to file, p_fp - with carriage return * added before new line. **************************************************************************/ declare     v_sql varchar;     v_ins_sql varchar;     v_cnt INTEGER;     v_filewithpath character varying;     v_tabname  character varying;     v_bucket character varying;     v_region character varying;     BEGIN  PERFORM utl_file_utility.init(); /* check if temp table already exist */  v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );  v_sql := concat_ws('','select count(1) FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace where n.nspname like ''pg_temp_%'''                          ,' AND pg_catalog.pg_table_is_visible(c.oid) AND Upper(relname) = Upper( '''                          ,  v_tabname ,''' ) ');    execute v_sql into v_cnt;     IF v_cnt = 0 THEN          v_sql := concat_ws('','create temp table ',v_tabname,' (col text)');         execute v_sql;         /* CHECK IF APPEND MODE */         IF upper(p_flag) = 'A' THEN             PERFORM utl_file_utility.init();                                     v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );             v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );                         /* set tabname*/                         v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;                                     begin                PERFORM aws_s3.table_import_from_s3                      ( v_tabname,                           '',                          'DELIMITER AS ''#''',                         aws_commons.create_s3_uri                            ( v_bucket,                                v_filewithpath,                                v_region    )                     );             exception                 when others then                     raise notice  'Error Message : %',sqlerrm;                     raise;             end;             END IF;         END IF;     /* INSERT INTO TEMP TABLE */                   v_ins_sql := concat_ws('','insert into ',v_tabname,' values(''',p_line,''')');     execute v_ins_sql;     RETURN TRUE;     exception             when others then                 raise notice  'Error Message : %',sqlerrm;                 raise; END; $BODY$;

fclose

CREATE OR REPLACE FUNCTION utl_file_utility.fclose(     p_file_name character varying,     p_path character varying)     RETURNS boolean     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ DECLARE     v_filewithpath character varying;     v_bucket character varying;     v_region character varying;     v_tabname character varying; v_sql character varying; BEGIN       PERFORM utl_file_utility.init();       v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );     v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );     v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );     v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;     raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region ;         /* exporting to s3 */     perform aws_s3.query_export_to_s3         (concat_ws('','select * from ',v_tabname,'  order by ctid asc'),             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)         );    v_sql := concat_ws('','drop table ', v_tabname);     execute v_sql;        RETURN TRUE; EXCEPTION        when others then      raise notice 'error fclose %',sqlerrm;      RAISE; END; $BODY$;

测试您的设置与包装器功能

使用下面的代码示例测试设置。

测试写入模式

以下代码在 S3 存储桶中写入名为 s3inttest 的文件。

do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'W'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$

测试追加模式

以下代码将行追加到上一个测试中创建的 s3inttest 文件上。

do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'A'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose : append 1', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket : for test purpose : append 2', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$

Amazon SNS 通知

或者,您可以在 S3 存储桶上设置亚马逊 CloudWatch 监控和 Amazon SNS 通知。有关更多信息,请参阅监控 Amazon S3设置 Amazon SNS 通知