Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esegui la migrazione di tabelle esterne Oracle verso Amazon Aurora, compatibile con PostgreSQL
Creato da anuradha chintha (AWS) e Rakesh Raghav (AWS)
Riepilogo
Le tabelle esterne offrono a Oracle la possibilità di interrogare i dati archiviati all'esterno del database in file flat. È possibile utilizzare il driver ORACLE_LOADER per accedere a qualsiasi dato memorizzato in qualsiasi formato che possa essere caricato dall'utilità SQL*Loader. Non è possibile utilizzare Data Manipulation Language (DML) su tabelle esterne, ma è possibile utilizzare tabelle esterne per operazioni di interrogazione, join e ordinamento.
Amazon Aurora PostgreSQL Compatible Edition non offre funzionalità simili alle tabelle esterne di Oracle. È invece necessario utilizzare la modernizzazione per sviluppare una soluzione scalabile che soddisfi i requisiti funzionali e sia parsimoniosa.
Questo modello fornisce i passaggi per la migrazione di diversi tipi di tabelle esterne Oracle all'edizione compatibile con Aurora PostgreSQL sul cloud Amazon Web Services (AWS) utilizzando l'estensione. aws_s3
Consigliamo di testare a fondo questa soluzione prima di implementarla in un ambiente di produzione.
Prerequisiti e limitazioni
Prerequisiti
Un account AWS attivo
Interfaccia a riga di comando di AWS (CLI AWS)
Un'istanza di database compatibile con Aurora PostgreSQL disponibile.
Un database Oracle locale con una tabella esterna
API PG.Client
File di dati
Limitazioni
Questo modello non fornisce la funzionalità necessaria per sostituire le tabelle esterne Oracle. Tuttavia, i passaggi e il codice di esempio possono essere ulteriormente migliorati per raggiungere gli obiettivi di modernizzazione del database.
I file non devono contenere il carattere che viene utilizzato come delimitatore nelle funzioni di
aws_s3
esportazione e importazione.
Versioni del prodotto
Per importare da Amazon S3 in RDS per PostgreSQL, il database deve eseguire PostgreSQL versione 10.7 o successiva.
Architettura
Stack tecnologico di origine
Oracle
Architettura di origine

Stack tecnologico Target
Compatibile con Amazon Aurora PostgreSQL
Amazon CloudWatch
AWS Lambda
AWS Secrets Manager
Servizio di notifica semplice Amazon (Amazon Simple Notification Service (Amazon SNS))
Amazon Simple Storage Service (Amazon S3)
Architettura Target
Il diagramma seguente mostra una rappresentazione di alto livello della soluzione.

I file vengono caricati nel bucket S3.
Viene avviata la funzione Lambda.
La funzione Lambda avvia la chiamata alla funzione DB.
Secrets Manager fornisce le credenziali per l'accesso al database.
A seconda della funzione DB, viene creato un allarme SNS.
Automazione e scalabilità
Qualsiasi aggiunta o modifica alle tabelle esterne può essere gestita con la manutenzione dei metadati.
Strumenti
Compatibile con Amazon Aurora PostgreSQL — Amazon Aurora PostgreSQL Compatible Edition è un motore di database relazionale completamente gestito, compatibile con PostgreSQL e conforme ad ACID che combina la velocità e l'affidabilità dei database commerciali di fascia alta con l'economicità dei database open source.
AWS CLI — AWS Command Line Interface (AWS CLI) è uno strumento unificato per gestire i servizi AWS. Con un solo strumento da scaricare e configurare, puoi controllare più servizi AWS dalla riga di comando e automatizzarli tramite script.
Amazon CloudWatch: Amazon CloudWatch monitora le risorse e l'utilizzo di Amazon S3.
AWS Lambda: AWS Lambda è un servizio di elaborazione serverless che supporta l'esecuzione di codice senza effettuare il provisioning o la gestione di server, creare una logica di scalabilità del cluster in base al carico di lavoro, mantenere integrazioni di eventi o gestire i runtime. In questo modello, Lambda esegue la funzione di database ogni volta che un file viene caricato su Amazon S3.
AWS Secrets Manager — AWS Secrets Manager è un servizio per l'archiviazione e il recupero delle credenziali. Utilizzando Secrets Manager, puoi sostituire le credenziali codificate nel codice, comprese le password, con una chiamata API a Secrets Manager per recuperare il segreto a livello di codice.
Amazon S3 — Amazon Simple Storage Service (Amazon S3) fornisce un livello di storage per ricevere e archiviare file per il consumo e la trasmissione da e verso il cluster Aurora compatibile con PostgreSQL.
aws_s3 — L'estensione
aws_s3
integra la compatibilità con Amazon S3 e Aurora PostgreSQL.Amazon SNS — Amazon Simple Notification Service (Amazon SNS) coordina e gestisce la consegna o l'invio di messaggi tra editori e clienti. In questo modello, Amazon SNS viene utilizzato per inviare notifiche.
Codice
Ogni volta che un file viene inserito nel bucket S3, è necessario creare e richiamare una funzione DB dall'applicazione di elaborazione o dalla funzione Lambda. Per i dettagli, consulta il codice (allegato).
Epiche
Attività | Descrizione | Competenze richieste |
---|---|---|
Aggiungere un file esterno al database di origine. | Crea un file esterno e spostalo nella | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un database Aurora PostgreSQL. | Crea un'istanza DB nel tuo cluster compatibile con Amazon Aurora PostgreSQL. | DBA |
Crea uno schema, un'estensione aws_s3 e tabelle. | Usa il codice riportato | DBA, Sviluppatore |
Crea la funzione DB. | Per creare la funzione DB, utilizzate il codice sotto | DBA, Sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Creare un ruolo. | Crea un ruolo con autorizzazioni per accedere ad Amazon S3 e Amazon Relational Database Service (Amazon RDS). Questo ruolo verrà assegnato a Lambda per l'esecuzione del pattern. | DBA |
Creazione della funzione Lambda | Crea una funzione Lambda che legga il nome del file da Amazon S3 (ad esempio A seconda del risultato della chiamata alla funzione, verrà avviata una notifica SNS (ad esempio,). In base alle esigenze aziendali, è possibile creare una funzione Lambda con codice aggiuntivo, se necessario. Per ulteriori informazioni, consulta la documentazione di Lambda. | DBA |
Configura un trigger di evento del bucket S3. | Configura un meccanismo per chiamare la funzione Lambda per tutti gli eventi di creazione di oggetti nel bucket S3. | DBA |
Crea un segreto. | Crea un nome segreto per le credenziali del database utilizzando Secrets Manager. Passa il segreto nella funzione Lambda. | DBA |
Carica i file di supporto Lambda. | Carica un file.zip che contiene i pacchetti di supporto Lambda e lo script Python allegato per la connessione a Aurora PostgreSQL Compatible. Il codice Python richiama la funzione che hai creato nel database. | DBA |
Creare un argomento SNS. | Crea un argomento SNS per inviare posta in caso di successo o fallimento del caricamento dei dati. | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un bucket S3. | Sulla console Amazon S3, crea un bucket S3 con un nome univoco che non contenga barre iniziali. Il nome di un bucket S3 è unico a livello globale e lo spazio dei nomi è condiviso da tutti gli account AWS. | DBA |
Crea politiche IAM. | Per creare le policy di AWS Identity and Access Management (IAM), usa il codice | DBA |
Crea ruoli. | Crea due ruoli per la compatibilità con Aurora PostgreSQL, un ruolo per l'importazione e un ruolo per l'esportazione. Assegna le politiche corrispondenti ai ruoli. | DBA |
Collega i ruoli al cluster compatibile con Aurora PostgreSQL. | In Gestisci ruoli, collega i ruoli di importazione ed esportazione al cluster Aurora PostgreSQL. | DBA |
Crea oggetti di supporto compatibili con Aurora PostgreSQL. | Per gli script delle tabelle, usa il codice riportato nella sezione Informazioni aggiuntive. Per la funzione personalizzata, usa il codice riportato | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Carica un file nel bucket S3. | Per caricare un file di test nel bucket S3, usa la console o il seguente comando nella CLI di AWS.
Non appena il file viene caricato, un evento bucket avvia la funzione Lambda, che esegue la funzione compatibile con Aurora PostgreSQL. | DBA |
Controlla i dati e i file di registro e di errore. | La funzione compatibile con Aurora PostgreSQL carica i file nella tabella principale e crea | DBA |
Monitora la soluzione. | Nella CloudWatch console Amazon, monitora la funzione Lambda. | DBA |
Risorse correlate
Informazioni aggiuntive
ext_table_scripts
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
table_name_stg character varying(100) ,
table_name character varying(100) ,
col_list character varying(1000) ,
data_type character varying(100) ,
col_order numeric,
start_pos numeric,
end_pos numeric,
no_position character varying(100) ,
date_mask character varying(100) ,
delimeter character(1) ,
directory character varying(100) ,
file_name character varying(100) ,
header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
error_details text,
file_name character varying(100),
processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
file_name character varying(50) COLLATE pg_catalog."default",
processed_date timestamp without time zone,
tot_rec_count numeric,
proc_rec_count numeric,
error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
s3bucketpolicy_for import
---Import role policy
--Create an IAM policy to allow, Get, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3import",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest",
"arn:aws:s3:::s3importtest/*"
]
}
]
}
--Export Role policy
--Create an IAM policy to allow, put, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3export",
"Action": [
"S3:PutObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest/*"
]
}
]
}
Esempio di funzione DB load_external_tables_latest
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
RETURNS character varying
LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
v_final_sql TEXT;
pi_ext_table TEXT;
r refCURSOR;
v_sqlerrm text;
v_chunk numeric;
i integer;
v_col_list TEXT;
v_postion_list CHARACTER VARYING(1000);
v_len integer;
v_delim varchar;
v_file_name CHARACTER VARYING(1000);
v_directory CHARACTER VARYING(1000);
v_table_name_stg CHARACTER VARYING(1000);
v_sql_col TEXT;
v_sql TEXT;
v_sql1 TEXT;
v_sql2 TEXT;
v_sql3 TEXT;
v_cnt integer;
v_sql_dynamic TEXT;
v_sql_ins TEXT;
proc_rec_COUNT integer;
error_rec_COUNT integer;
tot_rec_COUNT integer;
v_rec_val integer;
rec record;
v_col_cnt integer;
kv record;
v_val text;
v_header text;
j integer;
ERCODE VARCHAR(5);
v_region text;
cr CURSOR FOR
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
cr1 CURSOR FOR
SELECT col_list,
data_type,
start_pos,
END_pos,
concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
no_position,date_mask
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
order by col_order asc;
cr2 cursor FOR
SELECT distinct table_name,table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
BEGIN
-- PERFORM utl_file_utility.init();
v_region := 'us-east-1';
/* find tab details from file name */
--DELETE FROM ERROR_TABLE WHERE file_name= pi_filename;
-- DELETE FROM log_table WHERE file_name= pi_filename;
BEGIN
SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
EXCEPTION
WHEN NO_DATA_FOUND THEN
raise notice 'error 1,%',sqlerrm;
pi_ext_table := null;
v_table_name_stg := null;
RAISE USING errcode = 'NTFIP' ;
when others then
raise notice 'error others,%',sqlerrm;
END;
j :=1 ;
for rec in cr2
LOOP
pi_ext_table := rec.table_name;
v_table_name_stg := rec.table_name_stg;
v_col_list := null;
IF pi_ext_table IS NOT NULL
THEN
--EXECUTE concat_ws('','truncate table ' ,pi_ext_table) ;
EXECUTE concat_ws('','truncate table ' ,v_table_name_stg) ;
SELECT distinct DELIMETER INTO STRICT v_delim
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table;
IF v_delim IS NOT NULL THEN
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY ,
concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
IF upper(v_delim) = 'CSV'
THEN
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
v_table_name_stg,''','''',
''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
v_directory,''',''',v_file_name,''', ''',v_region,'''))');
ELSE
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
raise notice 'v_sql , %',v_sql;
begin
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
raise notice 'error 1';
RAISE USING errcode = 'S3IMP' ;
END;
select count(col_list) INTO v_col_cnt
from meta_EXTERNAL_TABLE where table_name = pi_ext_table;
-- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
i :=1;
FOR rec in cr1
loop
v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
-- v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'MM/DD/YYYY hh24:mi:ss'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
i :=i+1;
end loop;
-- raise notice 'v_sql 3, %',v_sql3;
SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;
END IF;
raise notice 'v_delim , %',v_delim;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
raise notice 'stg cnt , %',v_cnt;
/* if upper(v_delim) = 'CSV' then
v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
else
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1,' from (select col1 from ' ,v_table_name_stg , ')sub ');
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ')sub ');
END IF;*/
v_chunk := v_cnt/100;
for i in 1..101
loop
BEGIN
-- raise notice 'v_sql , %',v_sql;
-- raise notice 'Chunk number , %',i;
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');
v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
-- raise notice 'select statement , %',v_sql_ins;
-- v_sql := null;
-- EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
--v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins );
-- raise notice 'insert statement , %',v_sql;
raise NOTICE 'CHUNK START %',v_chunk*(i-1);
raise NOTICE 'CHUNK END %',v_chunk;
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
-- raise notice 'Chunk number for cursor , %',i;
raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
raise NOTICE 'Cursor - CHUNK END %',v_chunk;
v_sql_ins := concat_ws('',' SELECT ',v_sql3, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
-- raise notice 'v_final_sql %',v_final_sql;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_sql text; i numeric;v_conname text; v_typ ',pi_ext_table,'[]; v_rec ','record',';
begin
open r for execute ''select col1 from ',v_table_name_stg ,' offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
loop
begin
fetch r into v_rec;
EXIT WHEN NOT FOUND;
v_sql := concat_ws('''',''insert into ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , ' from ( select '''''',v_rec.col1,'''''' as col1) v'');
execute v_sql;
exception
when others then
v_sql := ''INSERT INTO ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';
execute v_sql;
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
-- raise notice ' inside excp v_sql %',v_sql;
execute v_sql;
-- raise notice 'v_sql %',v_sql;
END;
END LOOP;
ELSE
SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table ;
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
EXECUTE v_sql;
FOR rec in cr1
LOOP
IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
THEN
v_rec_val := 1;
ELSE
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
END IF;
v_col_list := concat_ws('',v_col_list ,v_sql1);
END LOOP;
SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;
v_sql_col := concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');
v_sql_dynamic := v_sql_col;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
IF v_rec_val = 1 THEN
v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;
ELSE
v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
END IF;
BEGIN
EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
EXCEPTION
WHEN OTHERS THEN
IF v_rec_val = 1 THEN
v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
ELSE
v_final_sql := ' SELECT col1 from';
END IF;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ ',pi_ext_table,'[]; v_rec ',pi_ext_table,';
begin
open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
loop
begin
if v_rec_val = 1 then
fetch r into line_number,col1;
else
fetch r into col1;
end if;
EXIT WHEN NOT FOUND;
if v_rec_val = 1 then
select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
else
select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
end if;
insert into ',pi_ext_table,' select v_rec.*;
exception
when others then
INSERT INTO ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
execute v_sql;
END;
END IF;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ' ,pi_ext_table) INTO proc_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date') INTO error_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO tot_rec_COUNT;
INSERT INTO log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM log_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
END IF;
j := j+1;
END LOOP;
RETURN 'OK';
EXCEPTION
WHEN OTHERS THEN
raise notice 'error %',sqlerrm;
ERCODE=SQLSTATE;
IF ERCODE = 'NTFIP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
ELSIF ERCODE = 'S3IMP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
ELSE
v_sqlerrm := sqlerrm;
END IF;
select distinct directory into v_directory from meta_EXTERNAL_TABLE;
raise notice 'exc v_directory, %',v_directory;
raise notice 'exc pi_filename, %',pi_filename;
raise notice 'exc v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
RETURN null;
END;
$function$