Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esegui la migrazione di tabelle esterne Oracle su Amazon Aurora SQL Postgre -Compatible
Creato da anuradha chintha () e Rakesh Raghav () AWS AWS
Ambiente: PoC o pilota | Fonte: Oracle | Obiettivo: Aurora Postgre SQL |
Tipo R: Re-architect | Carico di lavoro: open source | Tecnologie: migrazione; database; modernizzazione |
AWSservizi: AWS Identity and Access Management; AWS Lambda; Amazon S3; Amazon; SNS Amazon Aurora |
Riepilogo
Le tabelle esterne offrono a Oracle la possibilità di interrogare i dati archiviati all'esterno del database in file flat. È possibile utilizzare il LOADER driver ORACLE _ per accedere a qualsiasi dato memorizzato in qualsiasi formato che può essere caricato dall'utilità SQL *Loader. Non è possibile utilizzare Data Manipulation Language (DML) su tabelle esterne, ma è possibile utilizzare tabelle esterne per operazioni di interrogazione, join e ordinamento.
Amazon Aurora Postgre SQL -Compatible Edition non fornisce funzionalità simili alle tabelle esterne di Oracle. È invece necessario utilizzare la modernizzazione per sviluppare una soluzione scalabile che soddisfi i requisiti funzionali e sia parsimoniosa.
Questo modello fornisce i passaggi per la migrazione di diversi tipi di tabelle esterne Oracle ad Aurora SQL Postgre -Compatible Edition su Amazon Web Services AWS () Cloud utilizzando l'estensione. aws_s3
Consigliamo di testare a fondo questa soluzione prima di implementarla in un ambiente di produzione.
Prerequisiti e limitazioni
Prerequisiti
Un AWS account attivo
AWSInterfaccia a riga di comando (AWSCLI)
Un'istanza di database compatibile con Aurora Postgre SQL disponibile.
Un database Oracle locale con una tabella esterna
PG.Client API
File di dati
Limitazioni
Questo modello non fornisce la funzionalità necessaria per sostituire le tabelle esterne Oracle. Tuttavia, i passaggi e il codice di esempio possono essere ulteriormente migliorati per raggiungere gli obiettivi di modernizzazione del database.
I file non devono contenere il carattere che viene utilizzato come delimitatore nelle funzioni di
aws_s3
esportazione e importazione.
Versioni del prodotto
Per importare da Amazon S3 a Postgre, SQL il database deve eseguire SQL Postgre versione 10.7 o successiva. RDS
Architettura
Stack tecnologico di origine
Oracle
Architettura di origine
Stack tecnologico Target
Compatibile con Amazon Aurora Postgres SQL
Amazon CloudWatch
AWSLambda
AWS Secrets Manager
Servizio di notifica semplice Amazon (AmazonSNS)
Amazon Simple Storage Service (Amazon S3)
Architettura Target
Il diagramma seguente mostra una rappresentazione di alto livello della soluzione.
I file vengono caricati nel bucket S3.
Viene avviata la funzione Lambda.
La funzione Lambda avvia la chiamata alla funzione DB.
Secrets Manager fornisce le credenziali per l'accesso al database.
A seconda della funzione DB, viene creato un SNS allarme.
Automazione e scalabilità
Qualsiasi aggiunta o modifica alle tabelle esterne può essere gestita con la manutenzione dei metadati.
Strumenti
Amazon Aurora Postgre -Compatible Edition SQL — Amazon Aurora SQL Postgre -Compatible Edition è un motore di database relazionale completamente gestito, compatibile con SQL Postgre e conforme che combina la velocità ACID e l'affidabilità dei database commerciali di fascia alta con l'economicità dei database open source.
AWSCLI— AWS CLI Command Line Interface () è uno strumento unificato per AWS gestire i tuoi servizi. AWS Con un solo strumento da scaricare e configurare, puoi controllare più AWS servizi dalla riga di comando e automatizzarli tramite script.
Amazon CloudWatch: Amazon CloudWatch monitora le risorse e l'utilizzo di Amazon S3.
AWSLambda: AWS Lambda è un servizio di elaborazione serverless che supporta l'esecuzione di codice senza effettuare il provisioning o la gestione di server, creare una logica di scalabilità del cluster in base al carico di lavoro, mantenere integrazioni di eventi o gestire i runtime. In questo modello, Lambda esegue la funzione di database ogni volta che un file viene caricato su Amazon S3.
AWSSecrets Manager — AWS Secrets Manager è un servizio per l'archiviazione e il recupero delle credenziali. Utilizzando Secrets Manager, è possibile sostituire le credenziali codificate nel codice, comprese le password, con una chiamata a API Secrets Manager per recuperare il segreto a livello di codice.
Amazon S3 — Amazon Simple Storage Service (Amazon S3) fornisce un livello di storage per ricevere e archiviare file per il consumo e la trasmissione da e verso il cluster compatibile con Aurora Postgre. SQL
aws_s3 — L'estensione
aws_s3
integra Amazon S3 e Aurora Postgre -Compatible. SQLAmazon SNS — Amazon Simple Notification Service (AmazonSNS) coordina e gestisce la consegna o l'invio di messaggi tra editori e clienti. In questo modello, Amazon SNS viene utilizzato per inviare notifiche.
Codice
Ogni volta che un file viene inserito nel bucket S3, è necessario creare e richiamare una funzione DB dall'applicazione di elaborazione o dalla funzione Lambda. Per i dettagli, consulta il codice (allegato).
Epiche
Attività | Descrizione | Competenze richieste |
---|---|---|
Aggiungere un file esterno al database di origine. | Crea un file esterno e spostalo nella | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un database Aurora SQL Postgre. | Crea un'istanza DB nel tuo cluster compatibile con Amazon Aurora SQL Postgre. | DBA |
Crea uno schema, un'estensione aws_s3 e tabelle. | Usa il codice | DBA, Sviluppatore |
Crea la funzione DB. | Per creare la funzione DB, utilizzate il codice sotto | DBA, Sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Creare un ruolo. | Crea un ruolo con autorizzazioni per accedere ad Amazon S3 e Amazon Relational Database Service (Amazon). RDS Questo ruolo verrà assegnato a Lambda per l'esecuzione del pattern. | DBA |
Creazione della funzione Lambda | Crea una funzione Lambda che legga il nome del file da Amazon S3 (ad esempio A seconda del risultato della chiamata alla funzione, verrà avviata una SNS notifica (ad esempio,). In base alle esigenze aziendali, è possibile creare una funzione Lambda con codice aggiuntivo, se necessario. Per ulteriori informazioni, consulta la documentazione di Lambda. | DBA |
Configura un trigger di evento del bucket S3. | Configura un meccanismo per chiamare la funzione Lambda per tutti gli eventi di creazione di oggetti nel bucket S3. | DBA |
Crea un segreto. | Crea un nome segreto per le credenziali del database utilizzando Secrets Manager. Passa il segreto nella funzione Lambda. | DBA |
Carica i file di supporto Lambda. | Carica un file.zip che contiene i pacchetti di supporto Lambda e lo script Python allegato per la connessione a Aurora Postgre -Compatible. SQL Il codice Python richiama la funzione che hai creato nel database. | DBA |
Creazione di un argomento SNS. | Crea un SNS argomento per inviare posta in caso di successo o fallimento del caricamento dei dati. | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un bucket S3. | Sulla console Amazon S3, crea un bucket S3 con un nome univoco che non contenga barre iniziali. Il nome di un bucket S3 è unico a livello globale e lo spazio dei nomi è condiviso da tutti gli account. AWS | DBA |
Crea politiche. IAM | Per creare le policy di AWS Identity and Access Management (IAM), utilizzate il codice | DBA |
Crea ruoli. | Crea due ruoli per Aurora Postgre SQL -Compatible, un ruolo per l'importazione e un ruolo per l'esportazione. Assegna le politiche corrispondenti ai ruoli. | DBA |
Associa i ruoli al cluster Aurora SQL Postgre -Compatible. | In Gestisci ruoli, collega i ruoli di importazione ed esportazione al cluster Aurora SQL Postgre. | DBA |
Crea oggetti di supporto per Aurora SQL Postgre -Compatible. | Per gli script delle tabelle, usa il codice riportato Per la funzione personalizzata, usa il codice riportato | DBA |
Attività | Descrizione | Competenze richieste |
---|---|---|
Carica un file nel bucket S3. | Per caricare un file di test nel bucket S3, usa la console o il seguente comando in. AWS CLI
Non appena il file viene caricato, un evento bucket avvia la funzione Lambda, che esegue la funzione Aurora Postgre -Compatible. SQL | DBA |
Controlla i dati e i file di registro e di errore. | La funzione Aurora Postgre SQL -Compatible carica i file nella tabella principale | DBA |
Monitora la soluzione. | Nella CloudWatch console Amazon, monitora la funzione Lambda. | DBA |
Risorse correlate
Informazioni aggiuntive
ext_table_scripts
CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE ( table_name_stg character varying(100) , table_name character varying(100) , col_list character varying(1000) , data_type character varying(100) , col_order numeric, start_pos numeric, end_pos numeric, no_position character varying(100) , date_mask character varying(100) , delimeter character(1) , directory character varying(100) , file_name character varying(100) , header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg ( col1 text ); CREATE TABLE IF NOT EXISTS error_table ( error_details text, file_name character varying(100), processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table ( file_name character varying(50) COLLATE pg_catalog."default", processed_date timestamp without time zone, tot_rec_count numeric, proc_rec_count numeric, error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
s3bucketpolicy_for import
---Import role policy --Create an IAM policy to allow, Get, and list actions on S3 bucket { "Version": "2012-10-17", "Statement": [ { "Sid": "s3import", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::s3importtest", "arn:aws:s3:::s3importtest/*" ] } ] } --Export Role policy --Create an IAM policy to allow, put, and list actions on S3 bucket { "Version": "2012-10-17", "Statement": [ { "Sid": "s3export", "Action": [ "S3:PutObject", "s3:ListBucket" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::s3importtest/*" ] } ] }
Esempio di funzione DB load_external_tables_latest
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text) RETURNS character varying LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE v_final_sql TEXT; pi_ext_table TEXT; r refCURSOR; v_sqlerrm text; v_chunk numeric; i integer; v_col_list TEXT; v_postion_list CHARACTER VARYING(1000); v_len integer; v_delim varchar; v_file_name CHARACTER VARYING(1000); v_directory CHARACTER VARYING(1000); v_table_name_stg CHARACTER VARYING(1000); v_sql_col TEXT; v_sql TEXT; v_sql1 TEXT; v_sql2 TEXT; v_sql3 TEXT; v_cnt integer; v_sql_dynamic TEXT; v_sql_ins TEXT; proc_rec_COUNT integer; error_rec_COUNT integer; tot_rec_COUNT integer; v_rec_val integer; rec record; v_col_cnt integer; kv record; v_val text; v_header text; j integer; ERCODE VARCHAR(5); v_region text; cr CURSOR FOR SELECT distinct DELIMETER, FILE_NAME, DIRECTORY FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table AND DELIMETER IS NOT NULL; cr1 CURSOR FOR SELECT col_list, data_type, start_pos, END_pos, concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG, no_position,date_mask FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table order by col_order asc; cr2 cursor FOR SELECT distinct table_name,table_name_stg FROM meta_EXTERNAL_TABLE WHERE upper(file_name) = upper(pi_filename); BEGIN -- PERFORM utl_file_utility.init(); v_region := 'us-east-1'; /* find tab details from file name */ --DELETE FROM ERROR_TABLE WHERE file_name= pi_filename; -- DELETE FROM log_table WHERE file_name= pi_filename; BEGIN SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg FROM meta_EXTERNAL_TABLE WHERE upper(file_name) = upper(pi_filename); EXCEPTION WHEN NO_DATA_FOUND THEN raise notice 'error 1,%',sqlerrm; pi_ext_table := null; v_table_name_stg := null; RAISE USING errcode = 'NTFIP' ; when others then raise notice 'error others,%',sqlerrm; END; j :=1 ; for rec in cr2 LOOP pi_ext_table := rec.table_name; v_table_name_stg := rec.table_name_stg; v_col_list := null; IF pi_ext_table IS NOT NULL THEN --EXECUTE concat_ws('','truncate table ' ,pi_ext_table) ; EXECUTE concat_ws('','truncate table ' ,v_table_name_stg) ; SELECT distinct DELIMETER INTO STRICT v_delim FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table; IF v_delim IS NOT NULL THEN SELECT distinct DELIMETER, FILE_NAME, DIRECTORY , concat_ws('',' ',table_name_stg), case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table AND DELIMETER IS NOT NULL; IF upper(v_delim) = 'CSV' THEN v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''', v_table_name_stg,''','''', ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''', v_directory,''',''',v_file_name,''', ''',v_region,'''))'); ELSE v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''', v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',',' aws_commons.create_s3_uri ( ''',v_directory, ''',''', v_file_name, ''',', '''',v_region,''') )'); raise notice 'v_sql , %',v_sql; begin EXECUTE v_sql; EXCEPTION WHEN OTHERS THEN raise notice 'error 1'; RAISE USING errcode = 'S3IMP' ; END; select count(col_list) INTO v_col_cnt from meta_EXTERNAL_TABLE where table_name = pi_ext_table; -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,''''); execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,''''); i :=1; FOR rec in cr1 loop v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,','); v_sql2 := concat_ws('',v_sql2,rec.col_list,','); -- v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,','); case WHEN upper(rec.data_type) = 'NUMERIC' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ; WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,','); WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'MM/DD/YYYY hh24:mi:ss' THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,','); ELSE v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0 THEN null ELSE coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ; END case; i :=i+1; end loop; -- raise notice 'v_sql 3, %',v_sql3; SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1; SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1; SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2; SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2; SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3; SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3; END IF; raise notice 'v_delim , %',v_delim; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt; raise notice 'stg cnt , %',v_cnt; /* if upper(v_delim) = 'CSV' then v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg ); else -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,' from (select col1 from ' ,v_table_name_stg , ')sub '); v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ')sub '); END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop BEGIN -- raise notice 'v_sql , %',v_sql; -- raise notice 'Chunk number , %',i; v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub '); v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins); -- raise notice 'select statement , %',v_sql_ins; -- v_sql := null; -- EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk ); --v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins ); -- raise notice 'insert statement , %',v_sql; raise NOTICE 'CHUNK START %',v_chunk*(i-1); raise NOTICE 'CHUNK END %',v_chunk; EXECUTE v_sql; EXCEPTION WHEN OTHERS THEN -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, ' from (select col1 from ' ,v_table_name_stg , ' )sub '); -- raise notice 'Chunk number for cursor , %',i; raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1); raise NOTICE 'Cursor - CHUNK END %',v_chunk; v_sql_ins := concat_ws('',' SELECT ',v_sql3, ' from (select col1 from ' ,v_table_name_stg , ' )sub '); v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text); -- raise notice 'v_final_sql %',v_final_sql; v_sql :=concat_ws('','do $a$ declare r refcursor;v_sql text; i numeric;v_conname text; v_typ ',pi_ext_table,'[]; v_rec ','record','; begin open r for execute ''select col1 from ',v_table_name_stg ,' offset ',v_chunk*(i-1), ' limit ',v_chunk,'''; loop begin fetch r into v_rec; EXIT WHEN NOT FOUND; v_sql := concat_ws('''',''insert into ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , ' from ( select '''''',v_rec.col1,'''''' as col1) v''); execute v_sql; exception when others then v_sql := ''INSERT INTO ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())''; execute v_sql; continue; end ; end loop; close r; exception when others then raise; end ; $a$'); -- raise notice ' inside excp v_sql %',v_sql; execute v_sql; -- raise notice 'v_sql %',v_sql; END; END LOOP; ELSE SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg), case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header FROM meta_EXTERNAL_TABLE WHERE table_name = pi_ext_table ; v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''', v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',',' aws_commons.create_s3_uri ( ''',v_directory, ''',''', v_file_name, ''',', '''',v_region,''') )'); EXECUTE v_sql; FOR rec in cr1 LOOP IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum' THEN v_rec_val := 1; ELSE case WHEN upper(rec.data_type) = 'NUMERIC' THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ; WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD' THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,','); WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS' THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,','); ELSE v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0 THEN null ELSE coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ; END case; END IF; v_col_list := concat_ws('',v_col_list ,v_sql1); END LOOP; SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list; SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list; v_sql_col := concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '); v_sql_dynamic := v_sql_col; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt; IF v_rec_val = 1 THEN v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ; ELSE v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ; END IF; BEGIN EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins); EXCEPTION WHEN OTHERS THEN IF v_rec_val = 1 THEN v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from '; ELSE v_final_sql := ' SELECT col1 from'; END IF; v_sql :=concat_ws('','do $a$ declare r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ ',pi_ext_table,'[]; v_rec ',pi_ext_table,'; begin open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ; loop begin if v_rec_val = 1 then fetch r into line_number,col1; else fetch r into col1; end if; EXIT WHEN NOT FOUND; if v_rec_val = 1 then select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec; else select ',trim(trailing ',' FROM v_col_list) ,' into v_rec; end if; insert into ',pi_ext_table,' select v_rec.*; exception when others then INSERT INTO ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now()); continue; end ; end loop; close r; exception when others then raise; end ; $a$'); execute v_sql; END; END IF; EXECUTE concat_ws('','SELECT COUNT(*) FROM ' ,pi_ext_table) INTO proc_rec_COUNT; EXECUTE concat_ws('','SELECT COUNT(*) FROM error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date') INTO error_rec_COUNT; EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO tot_rec_COUNT; INSERT INTO log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT); raise notice 'v_directory, %',v_directory; raise notice 'pi_filename, %',pi_filename; raise notice 'v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM error_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region), options :='FORmat csv, header, delimiter $$,$$' ); raise notice 'v_directory, %',v_directory; raise notice 'pi_filename, %',pi_filename; raise notice 'v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT * FROM log_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region), options :='FORmat csv, header, delimiter $$,$$' ); END IF; j := j+1; END LOOP; RETURN 'OK'; EXCEPTION WHEN OTHERS THEN raise notice 'error %',sqlerrm; ERCODE=SQLSTATE; IF ERCODE = 'NTFIP' THEN v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename'); ELSIF ERCODE = 'S3IMP' THEN v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3'); ELSE v_sqlerrm := sqlerrm; END IF; select distinct directory into v_directory from meta_EXTERNAL_TABLE; raise notice 'exc v_directory, %',v_directory; raise notice 'exc pi_filename, %',pi_filename; raise notice 'exc v_region, %',v_region; perform aws_s3.query_export_to_s3('SELECT * FROM error_table WHERE file_name = '''||pi_filename||'''', aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region), options :='FORmat csv, header, delimiter $$,$$' ); RETURN null; END; $function$