在 Aurora Postgre 上設定 Oracle UTL_FILE 功能 SQL- 相容 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Aurora Postgre 上設定 Oracle UTL_FILE 功能 SQL- 相容

由 Rakesh Raghav (AWS) 和 anuradha chintha (AWS) 建立

環境:PoC 或試行

來源:Oracle

目標:Aurora PostgreSQL

R 類型:重新架構

工作負載:Oracle

技術:遷移;基礎設施;資料庫

AWS 服務:Amazon S3;Amazon Aurora

Summary

作為從 Oracle 遷移到 Amazon Web Services (AWS) Cloud 上 Amazon Aurora Postgre SQL-Compatible Edition 的一部分,您可能會遇到多個挑戰。例如,遷移依賴 Oracle UTL_FILE公用程式的程式碼始終是一項挑戰。在 Oracle PL/ 中SQL,UTL_FILE套件會搭配基礎作業系統用於檔案操作,例如讀取和寫入。此UTL_FILE公用程式適用於伺服器和用戶端機器系統。 

Amazon Aurora Postgre SQL-Compatible 是受管資料庫產品。因此,無法存取資料庫伺服器上的檔案。此模式會逐步引導您完成 Amazon Simple Storage Service (Amazon S3) 和 Amazon Aurora Postgre 的整合 SQL- 相容以實現功能子集UTL_FILE。使用此整合,我們可以建立和使用檔案,而無需使用第三方擷取、轉換和載入 (ETL) 工具或服務。

或者,您可以設定 Amazon CloudWatch 監控和 Amazon SNS通知。

我們建議在生產環境中實作此解決方案之前,先徹底測試此解決方案。

先決條件和限制

先決條件

  • 作用中AWS帳戶

  • AWS 資料庫遷移服務 (AWSDMS) 專業知識

  • PL/pgSQL 編碼的專業知識

  • Amazon Aurora Postgre SQL- 相容叢集

  • S3 儲存貯體

限制

此模式不提供可取代 Oracle UTL_FILE公用程式的功能。不過,您可以進一步增強步驟和範例程式碼,以實現資料庫現代化目標。

產品版本

  • Amazon Aurora Postgre SQL- 相容版本 11.9

架構

目標技術堆疊

  • Amazon Aurora Postgre SQL- 相容

  • Amazon CloudWatch

  • Amazon Simple Notification Service (Amazon SNS)

  • Amazon S3

目標架構

下圖顯示解決方案的高階表示法。

資料檔案會上傳到 S3 儲存貯體,使用 aws_s3 擴充功能處理,然後傳送至 Aurora 執行個體。
  1. 檔案會從應用程式上傳到 S3 儲存貯體。

  2. aws_s3 擴充功能會使用 PL/pg 存取資料SQL,並將資料上傳至 Aurora Postgre SQL-Compatible。

工具

  • Amazon Aurora Postgre SQL-Compatible – Amazon Aurora Postgre SQL-Compatible Edition 是完全受管、Postgre SQL相容且ACID合規的關聯式資料庫引擎。它結合了高階商業資料庫的速度和可靠性,以及開放原始碼資料庫的成本效益。

  • AWS CLI – AWS命令列介面 (AWS CLI) 是管理 AWS服務的統一工具。只需一個工具即可下載和設定,您可以從命令列控制多個AWS服務,並透過指令碼將其自動化。

  • Amazon CloudWatch – Amazon CloudWatch 會監控 Amazon S3 資源和使用。

  • Amazon S3 – Amazon Simple Storage Service (Amazon S3) 是網際網路的儲存體。在此模式中,Amazon S3 提供儲存層,用於接收和儲存檔案,以供使用和往返 Aurora Postgre SQL相容叢集傳輸。

  • aws_s3aws_s3延伸模組整合 Amazon S3 和 Aurora Postgre SQL-相容。

  • Amazon SNS – Amazon Simple Notification Service (Amazon SNS) 會協調和管理發佈者和用戶端之間的訊息傳遞或傳送。在此模式中,Amazon SNS用於傳送通知。

  • pgAdmin – pgAdmin 是 Postgres 的開放原始碼管理工具。 pgAdmin 4 提供圖形介面,用於建立、維護和使用資料庫物件。

Code

為了實現所需的功能,模式會建立多個命名類似於 的函數UTL_FILE其他資訊區段包含這些函數的程式碼基礎。

在程式碼中,將 取代testaurorabucket為測試 S3 儲存貯體的名稱。us-east-1 將 取代為測試 S3 儲存貯體所在的AWS區域。

史詩

任務描述所需的技能
設定IAM政策。

建立 AWS Identity and Access Management (IAM) 政策,以授予 S3 儲存貯體和物件的存取權。如需程式碼,請參閱其他資訊一節。

AWS 管理員、 DBA
將 Amazon S3 存取角色新增至 Aurora Postgre SQL。

建立兩個IAM角色:一個角色用於讀取,一個角色用於寫入存取 Amazon S3。將這兩個角色連接到 Aurora Postgre SQL- 相容叢集: 

  • S3Export 功能的一個角色

  • S3Import 功能的一個角色

如需詳細資訊,請參閱 Aurora Postgre SQL-Compatible 文件,關於將資料匯入匯出至 Amazon S3。

AWS 管理員、 DBA
任務描述所需的技能
建立 aws_commons 延伸模組。

aws_commons 延伸模組是aws_s3延伸模組的相依性。

DBA、開發人員
建立 aws_s3 延伸模組。

aws_s3 延伸模組會與 Amazon S3 互動。

DBA、開發人員
任務描述所需的技能
測試將檔案從 Amazon S3 匯入 Aurora PostgreSQL。

若要測試將檔案匯入 Aurora Postgre SQL-Compatible,請建立範例CSV檔案,並將其上傳至 S3 儲存貯體。根據 CSV 檔案建立資料表定義,並使用 aws_s3.table_import_from_s3函數將檔案載入資料表。

DBA、開發人員
測試將檔案從 Aurora PostgreSQL 匯出至 Amazon S3。

若要測試從 Aurora Postgre SQL-Compatible 匯出檔案,請建立測試資料表、填入資料,然後使用 aws_s3.query_export_to_s3函數匯出資料。

DBA、開發人員
任務描述所需的技能
建立 utl_file_utility 結構描述。

結構描述會將包裝函式放在一起。若要建立結構描述,請執行下列命令。

CREATE SCHEMA utl_file_utility;
DBA、開發人員
建立 file_type 類型。

若要建立 file_type 類型,請使用下列程式碼。

CREATE TYPE utl_file_utility.file_type AS (     p_path character varying(30),     p_file_name character varying );
DBA/開發人員
建立初始化函數。

init 函數會初始化常見的變數,例如 bucketregion。如需程式碼,請參閱其他資訊一節。

DBA/開發人員
建立包裝函式。

建立包裝函式 fopenput_linefclose。如需程式碼,請參閱其他資訊一節。

DBA、開發人員
任務描述所需的技能
在寫入模式下測試包裝函式。

若要在寫入模式下測試包裝函式,請使用其他資訊區段中提供的程式碼。

DBA、開發人員
在附加模式下測試包裝函式。

若要在附加模式下測試包裝函式,請使用其他資訊區段中提供的程式碼。

DBA、開發人員

相關資源

其他資訊

設定IAM政策

建立下列政策。

政策名稱

JSON

S3IntRead

{     "Version": "2012-10-17",     "Statement": [         {             "Sid": "S3integrationtest",             "Effect": "Allow",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Resource": [          "arn:aws:s3:::testaurorabucket/*",          "arn:aws:s3:::testaurorabucket"             ]         }     ] }

S3IntWrite

{     "Version": "2012-10-17",     "Statement": [         {             "Sid": "S3integrationtest",             "Effect": "Allow",             "Action": [                 "s3:PutObject",                                 "s3:ListBucket"             ],             "Resource": [                "arn:aws:s3:::testaurorabucket/*",                "arn:aws:s3:::testaurorabucket"             ]         }     ] }

建立初始化函數

若要初始化常見變數,例如 bucketregion,請使用下列程式碼建立init函數。

CREATE OR REPLACE FUNCTION utl_file_utility.init(     )     RETURNS void     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ BEGIN       perform set_config       ( format( '%s.%s','UTL_FILE_UTILITY', 'region' )       , 'us-east-1'::text       , false );       perform set_config       ( format( '%s.%s','UTL_FILE_UTILITY', 's3bucket' )       , 'testaurorabucket'::text       , false ); END; $BODY$;

建立包裝函式

建立 fopenput_linefclose 包裝函式函數。

fopen

CREATE OR REPLACE FUNCTION utl_file_utility.fopen(     p_file_name character varying,     p_path character varying,     p_mode character DEFAULT 'W'::bpchar,     OUT p_file_type utl_file_utility.file_type)     RETURNS utl_file_utility.file_type     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ declare     v_sql character varying;     v_cnt_stat integer;     v_cnt integer;     v_tabname character varying;     v_filewithpath character varying;     v_region character varying;     v_bucket character varying; BEGIN     /*initialize common variable */     PERFORM utl_file_utility.init();     v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );     v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );         /* set tabname*/     v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );     v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;     raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region;         /* APPEND MODE HANDLING; RETURN EXISTING FILE DETAILS IF PRESENT ELSE CREATE AN EMPTY FILE */     IF p_mode = 'A' THEN         v_sql := concat_ws('','create temp table if not exists ', v_tabname,' (col1 text)');         execute v_sql;         begin         PERFORM aws_s3.table_import_from_s3             ( v_tabname,             '',               'DELIMITER AS ''#''',             aws_commons.create_s3_uri             (     v_bucket,                 v_filewithpath ,                 v_region)             );         exception             when others then              raise notice 'File load issue ,%',sqlerrm;              raise;         end;         execute concat_ws('','select count(*) from ',v_tabname) into v_cnt;         IF v_cnt > 0         then             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;         else                     PERFORM aws_s3.query_export_to_s3('select ''''',                             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)                                           );             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;                 end if;         v_sql := concat_ws('','drop table ', v_tabname);                 execute v_sql;                 ELSEIF p_mode = 'W' THEN             PERFORM aws_s3.query_export_to_s3('select ''''',                             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)                                           );             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;     END IF;         EXCEPTION         when others then             p_file_type.p_path := p_path;             p_file_type.p_file_name := p_file_name;             raise notice 'fopenerror,%',sqlerrm;             raise; END; $BODY$;

put_line

CREATE OR REPLACE FUNCTION utl_file_utility.put_line(     p_file_name character varying,     p_path character varying,     p_line text,     p_flag character DEFAULT 'W'::bpchar)     RETURNS boolean     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ /************************************************************************** * Write line, p_line in windows format to file, p_fp - with carriage return * added before new line. **************************************************************************/ declare     v_sql varchar;     v_ins_sql varchar;     v_cnt INTEGER;     v_filewithpath character varying;     v_tabname  character varying;     v_bucket character varying;     v_region character varying;     BEGIN  PERFORM utl_file_utility.init(); /* check if temp table already exist */  v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );  v_sql := concat_ws('','select count(1) FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace where n.nspname like ''pg_temp_%'''                          ,' AND pg_catalog.pg_table_is_visible(c.oid) AND Upper(relname) = Upper( '''                          ,  v_tabname ,''' ) ');    execute v_sql into v_cnt;     IF v_cnt = 0 THEN          v_sql := concat_ws('','create temp table ',v_tabname,' (col text)');         execute v_sql;         /* CHECK IF APPEND MODE */         IF upper(p_flag) = 'A' THEN             PERFORM utl_file_utility.init();                                     v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );             v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );                         /* set tabname*/                         v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;                                     begin                PERFORM aws_s3.table_import_from_s3                      ( v_tabname,                           '',                          'DELIMITER AS ''#''',                         aws_commons.create_s3_uri                            ( v_bucket,                                v_filewithpath,                                v_region    )                     );             exception                 when others then                     raise notice  'Error Message : %',sqlerrm;                     raise;             end;             END IF;         END IF;     /* INSERT INTO TEMP TABLE */                   v_ins_sql := concat_ws('','insert into ',v_tabname,' values(''',p_line,''')');     execute v_ins_sql;     RETURN TRUE;     exception             when others then                 raise notice  'Error Message : %',sqlerrm;                 raise; END; $BODY$;

關閉

CREATE OR REPLACE FUNCTION utl_file_utility.fclose(     p_file_name character varying,     p_path character varying)     RETURNS boolean     LANGUAGE 'plpgsql'     COST 100     VOLATILE AS $BODY$ DECLARE     v_filewithpath character varying;     v_bucket character varying;     v_region character varying;     v_tabname character varying; v_sql character varying; BEGIN       PERFORM utl_file_utility.init();       v_region := current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 'region' ) );     v_bucket :=  current_setting( format( '%s.%s', 'UTL_FILE_UTILITY', 's3bucket' ) );     v_tabname := substring(p_file_name,1,case when strpos(p_file_name,'.') = 0 then length(p_file_name) else strpos(p_file_name,'.') - 1 end );     v_filewithpath := case when NULLif(p_path,'') is null then p_file_name else concat_ws('/',p_path,p_file_name) end ;     raise notice 'v_bucket %, v_filewithpath % , v_region %', v_bucket,v_filewithpath, v_region ;         /* exporting to s3 */     perform aws_s3.query_export_to_s3         (concat_ws('','select * from ',v_tabname,'  order by ctid asc'),             aws_commons.create_s3_uri(v_bucket, v_filewithpath, v_region)         );    v_sql := concat_ws('','drop table ', v_tabname);     execute v_sql;        RETURN TRUE; EXCEPTION        when others then      raise notice 'error fclose %',sqlerrm;      RAISE; END; $BODY$;

測試您的設定和包裝函式

使用下列匿名程式碼區塊來測試您的設定。

測試寫入模式

下列程式碼會在 S3 儲存貯s3inttest體中寫入名為 的檔案。

do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'W'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$

測試附加模式

下列程式碼會在先前測試中建立s3inttest的檔案上附加行。

do $$ declare l_file_name varchar := 's3inttest' ; l_path varchar := 'integration_test' ; l_mode char(1) := 'A'; l_fs utl_file_utility.file_type ; l_status boolean; begin select * from utl_file_utility.fopen( l_file_name, l_path , l_mode ) into l_fs ; raise notice 'fopen : l_fs : %', l_fs; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket: for test purpose : append 1', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.put_line( l_file_name, l_path ,'this is test file:in s3bucket : for test purpose : append 2', l_mode ) into l_status ; raise notice 'put_line : l_status %', l_status; select * from utl_file_utility.fclose( l_file_name , l_path ) into l_status ; raise notice 'fclose : l_status %', l_status; end; $$

Amazon SNS通知

或者,您可以在 S3 儲存貯體上設定 Amazon CloudWatch 監控和 Amazon SNS通知。如需詳細資訊,請參閱監控 Amazon S3設定 Amazon SNS Notifications。