Esegui la migrazione di tabelle esterne Oracle verso Amazon Aurora, compatibile con PostgreSQL - Prontuario AWS

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esegui la migrazione di tabelle esterne Oracle verso Amazon Aurora, compatibile con PostgreSQL

Creato da anuradha chintha (AWS) e Rakesh Raghav (AWS)

Ambiente: PoC o pilota

Fonte: Oracle

Destinazione: Aurora PostgreSQL

Tipo R: Re-architect

Carico di lavoro: open source

Tecnologie: migrazione; database; modernizzazione

Servizi AWS: AWS Identity and Access Management; AWS Lambda; Amazon S3; Amazon SNS; Amazon Aurora

Riepilogo

Le tabelle esterne offrono a Oracle la possibilità di interrogare i dati archiviati all'esterno del database in file flat. È possibile utilizzare il driver ORACLE_LOADER per accedere a qualsiasi dato memorizzato in qualsiasi formato che possa essere caricato dall'utilità SQL*Loader. Non è possibile utilizzare Data Manipulation Language (DML) su tabelle esterne, ma è possibile utilizzare tabelle esterne per operazioni di interrogazione, join e ordinamento.

Amazon Aurora PostgreSQL Compatible Edition non offre funzionalità simili alle tabelle esterne di Oracle. È invece necessario utilizzare la modernizzazione per sviluppare una soluzione scalabile che soddisfi i requisiti funzionali e sia parsimoniosa.

Questo modello fornisce i passaggi per la migrazione di diversi tipi di tabelle esterne Oracle all'edizione compatibile con Aurora PostgreSQL sul cloud Amazon Web Services (AWS) utilizzando l'estensione. aws_s3

Consigliamo di testare a fondo questa soluzione prima di implementarla in un ambiente di produzione.

Prerequisiti e limitazioni

Prerequisiti

  • Un account AWS attivo

  • Interfaccia a riga di comando di AWS (CLI AWS)

  • Un'istanza di database compatibile con Aurora PostgreSQL disponibile.

  • Un database Oracle locale con una tabella esterna

  • API PG.Client

  • File di dati 

Limitazioni

  • Questo modello non fornisce la funzionalità necessaria per sostituire le tabelle esterne Oracle. Tuttavia, i passaggi e il codice di esempio possono essere ulteriormente migliorati per raggiungere gli obiettivi di modernizzazione del database.

  • I file non devono contenere il carattere che viene utilizzato come delimitatore nelle funzioni di aws_s3 esportazione e importazione.

Versioni del prodotto

  • Per importare da Amazon S3 in RDS per PostgreSQL, il database deve eseguire PostgreSQL versione 10.7 o successiva.

Architettura

Stack tecnologico di origine

  • Oracle

Architettura di origine

Diagramma dei file di dati inseriti in una directory e in una tabella nel database Oracle locale.

Stack tecnologico Target

  • Compatibile con Amazon Aurora PostgreSQL

  • Amazon CloudWatch

  • AWS Lambda

  • AWS Secrets Manager

  • Servizio di notifica semplice Amazon (Amazon Simple Notification Service (Amazon SNS))

  • Amazon Simple Storage Service (Amazon S3)

Architettura Target

Il diagramma seguente mostra una rappresentazione di alto livello della soluzione.

La descrizione è dopo il diagramma.
  1. I file vengono caricati nel bucket S3.

  2. Viene avviata la funzione Lambda.

  3. La funzione Lambda avvia la chiamata alla funzione DB.

  4. Secrets Manager fornisce le credenziali per l'accesso al database.

  5. A seconda della funzione DB, viene creato un allarme SNS.

Automazione e scalabilità

Qualsiasi aggiunta o modifica alle tabelle esterne può essere gestita con la manutenzione dei metadati.

Strumenti

  • Compatibile con Amazon Aurora PostgreSQL — Amazon Aurora PostgreSQL Compatible Edition è un motore di database relazionale completamente gestito, compatibile con PostgreSQL e conforme ad ACID che combina la velocità e l'affidabilità dei database commerciali di fascia alta con l'economicità dei database open source.

  • AWS CLI — AWS Command Line Interface (AWS CLI) è uno strumento unificato per gestire i servizi AWS. Con un solo strumento da scaricare e configurare, puoi controllare più servizi AWS dalla riga di comando e automatizzarli tramite script.

  • Amazon CloudWatch: Amazon CloudWatch monitora le risorse e l'utilizzo di Amazon S3.

  • AWS Lambda: AWS Lambda è un servizio di elaborazione serverless che supporta l'esecuzione di codice senza effettuare il provisioning o la gestione di server, creare una logica di scalabilità del cluster in base al carico di lavoro, mantenere integrazioni di eventi o gestire i runtime. In questo modello, Lambda esegue la funzione di database ogni volta che un file viene caricato su Amazon S3.

  • AWS Secrets Manager — AWS Secrets Manager è un servizio per l'archiviazione e il recupero delle credenziali. Utilizzando Secrets Manager, puoi sostituire le credenziali codificate nel codice, comprese le password, con una chiamata API a Secrets Manager per recuperare il segreto a livello di codice.

  • Amazon S3 — Amazon Simple Storage Service (Amazon S3) fornisce un livello di storage per ricevere e archiviare file per il consumo e la trasmissione da e verso il cluster Aurora compatibile con PostgreSQL.

  • aws_s3 — L'estensione aws_s3 integra la compatibilità con Amazon S3 e Aurora PostgreSQL.

  • Amazon SNS — Amazon Simple Notification Service (Amazon SNS) coordina e gestisce la consegna o l'invio di messaggi tra editori e clienti. In questo modello, Amazon SNS viene utilizzato per inviare notifiche.

Codice

Ogni volta che un file viene inserito nel bucket S3, è necessario creare e richiamare una funzione DB dall'applicazione di elaborazione o dalla funzione Lambda. Per i dettagli, consulta il codice (allegato).

Epiche

AttivitàDescrizioneCompetenze richieste

Aggiungere un file esterno al database di origine.

Crea un file esterno e spostalo nella oracle directory.

DBA
AttivitàDescrizioneCompetenze richieste

Crea un database Aurora PostgreSQL.

Crea un'istanza DB nel tuo cluster compatibile con Amazon Aurora PostgreSQL.

DBA

Crea uno schema, un'estensione aws_s3 e tabelle.

Usa il codice ext_tbl_scripts nella sezione Informazioni aggiuntive. Le tabelle includono tabelle effettive, tabelle intermedie, tabelle di errore e di registro e una metatabella.

DBA, Sviluppatore

Crea la funzione DB.

Per creare la funzione DB, utilizzate il codice sotto load_external_table_latest la funzione nella sezione Informazioni aggiuntive.

DBA, Sviluppatore
AttivitàDescrizioneCompetenze richieste

Creare un ruolo.

Crea un ruolo con autorizzazioni per accedere ad Amazon S3 e Amazon Relational Database Service (Amazon RDS). Questo ruolo verrà assegnato a Lambda per l'esecuzione del pattern.

DBA

Creazione della funzione Lambda

Crea una funzione Lambda che legga il nome del file da Amazon S3 (ad esempiofile_key = info.get('object', {}).get('key')) e chiami la funzione DB (ad esempiocurs.callproc("load_external_tables", [file_key])) con il nome del file come parametro di input.

A seconda del risultato della chiamata alla funzione, verrà avviata una notifica SNS (ad esempio,). client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')

In base alle esigenze aziendali, è possibile creare una funzione Lambda con codice aggiuntivo, se necessario. Per ulteriori informazioni, consulta la documentazione di Lambda.

DBA

Configura un trigger di evento del bucket S3.

Configura un meccanismo per chiamare la funzione Lambda per tutti gli eventi di creazione di oggetti nel bucket S3.

DBA

Crea un segreto.

Crea un nome segreto per le credenziali del database utilizzando Secrets Manager. Passa il segreto nella funzione Lambda.

DBA

Carica i file di supporto Lambda.

Carica un file.zip che contiene i pacchetti di supporto Lambda e lo script Python allegato per la connessione a Aurora PostgreSQL compatibile. Il codice Python richiama la funzione che hai creato nel database.

DBA

Creare un argomento SNS.

Crea un argomento SNS per inviare posta in caso di successo o fallimento del caricamento dei dati.

DBA
AttivitàDescrizioneCompetenze richieste

Crea un bucket S3.

Sulla console Amazon S3, crea un bucket S3 con un nome univoco che non contenga barre iniziali. Il nome di un bucket S3 è unico a livello globale e lo spazio dei nomi è condiviso da tutti gli account AWS.

DBA

Crea politiche IAM.

Per creare le policy di AWS Identity and Access Management (IAM), usa il codice s3bucketpolicy_for_import nella sezione Informazioni aggiuntive.

DBA

Crea ruoli.

Crea due ruoli per la compatibilità con Aurora PostgreSQL, un ruolo per l'importazione e un ruolo per l'esportazione. Assegna le politiche corrispondenti ai ruoli.

DBA

Collega i ruoli al cluster compatibile con Aurora PostgreSQL.

In Gestisci ruoli, collega i ruoli di importazione ed esportazione al cluster Aurora PostgreSQL.

DBA

Crea oggetti di supporto compatibili con Aurora PostgreSQL.

Per gli script delle tabelle, usa il codice riportato nella sezione Informazioni aggiuntive. ext_tbl_scripts

Per la funzione personalizzata, usa il codice riportato load_external_Table_latest nella sezione Informazioni aggiuntive.

DBA
AttivitàDescrizioneCompetenze richieste

Carica un file nel bucket S3.

Per caricare un file di test nel bucket S3, usa la console o il seguente comando nella CLI di AWS. 

aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps

Non appena il file viene caricato, un evento bucket avvia la funzione Lambda, che esegue la funzione compatibile con Aurora PostgreSQL.

DBA

Controlla i dati e i file di registro e di errore.

La funzione compatibile con Aurora PostgreSQL carica i file nella tabella principale e crea .log file nel bucket S3. .bad

DBA

Monitora la soluzione.

Nella CloudWatch console Amazon, monitora la funzione Lambda.

DBA

Risorse correlate

Informazioni aggiuntive

ext_table_scripts

CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE (     table_name_stg character varying(100) ,     table_name character varying(100)  ,     col_list character varying(1000)  ,     data_type character varying(100)  ,     col_order numeric,     start_pos numeric,     end_pos numeric,     no_position character varying(100)  ,     date_mask character varying(100)  ,     delimeter character(1)  ,     directory character varying(100)  ,     file_name character varying(100)  ,     header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg (     col1 text ); CREATE TABLE IF NOT EXISTS error_table (     error_details text,     file_name character varying(100),     processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table (     file_name character varying(50) COLLATE pg_catalog."default",     processed_date timestamp without time zone,     tot_rec_count numeric,     proc_rec_count numeric,     error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');

s3bucketpolicy_for import

---Import role policy --Create an IAM policy to allow, Get,  and list actions on S3 bucket  {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3import",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest",                 "arn:aws:s3:::s3importtest/*"             ]         }     ] } --Export Role policy --Create an IAM policy to allow, put,  and list actions on S3 bucket {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3export",             "Action": [                 "S3:PutObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest/*"             ]         }     ] }

Esempio di funzione DB load_external_tables_latest

CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)  RETURNS character varying  LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE  v_final_sql TEXT;  pi_ext_table TEXT;  r refCURSOR;  v_sqlerrm text;  v_chunk numeric;  i integer;  v_col_list TEXT;  v_postion_list CHARACTER VARYING(1000);  v_len  integer;  v_delim varchar;  v_file_name CHARACTER VARYING(1000);  v_directory CHARACTER VARYING(1000);  v_table_name_stg CHARACTER VARYING(1000);  v_sql_col TEXT;  v_sql TEXT;  v_sql1 TEXT;  v_sql2 TEXT;  v_sql3 TEXT;  v_cnt integer;  v_sql_dynamic TEXT;  v_sql_ins TEXT;  proc_rec_COUNT integer;  error_rec_COUNT integer;  tot_rec_COUNT integer;  v_rec_val integer;  rec record;  v_col_cnt integer;  kv record;  v_val text;  v_header text;  j integer;  ERCODE VARCHAR(5);  v_region text;  cr CURSOR FOR  SELECT distinct DELIMETER,    FILE_NAME,    DIRECTORY  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table    AND DELIMETER IS NOT NULL;  cr1 CURSOR FOR    SELECT   col_list,    data_type,    start_pos,    END_pos,    concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,    no_position,date_mask  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table  order by col_order asc; cr2 cursor FOR SELECT  distinct table_name,table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename); BEGIN  -- PERFORM utl_file_utility.init();    v_region := 'us-east-1';    /* find tab details from file name */    --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;   -- DELETE FROM  log_table WHERE file_name= pi_filename;  BEGIN    SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename);  EXCEPTION    WHEN NO_DATA_FOUND THEN     raise notice 'error 1,%',sqlerrm;     pi_ext_table := null;     v_table_name_stg := null;       RAISE USING errcode = 'NTFIP' ;     when others then         raise notice 'error others,%',sqlerrm;  END;  j :=1 ;    for rec in  cr2  LOOP   pi_ext_table     := rec.table_name;   v_table_name_stg := rec.table_name_stg;   v_col_list := null;  IF pi_ext_table IS NOT NULL   THEN     --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;    EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;        SELECT distinct DELIMETER INTO STRICT v_delim        FROM  meta_EXTERNAL_TABLE        WHERE table_name = pi_ext_table;        IF v_delim IS NOT NULL THEN      SELECT distinct DELIMETER,        FILE_NAME,        DIRECTORY ,        concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist      INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table        AND DELIMETER IS NOT NULL;      IF    upper(v_delim) = 'CSV'      THEN        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',        v_table_name_stg,''','''',        ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',        v_directory,''',''',v_file_name,''', ''',v_region,'''))');        ELSE        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',            v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','           aws_commons.create_s3_uri            ( ''',v_directory, ''',''',            v_file_name, ''',',             '''',v_region,''')           )');           raise notice 'v_sql , %',v_sql;        begin         EXECUTE  v_sql;        EXCEPTION          WHEN OTHERS THEN            raise notice 'error 1';          RAISE USING errcode = 'S3IMP' ;        END;        select count(col_list) INTO v_col_cnt        from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;         -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        i :=1;        FOR rec in cr1        loop        v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');        v_sql2 := concat_ws('',v_sql2,rec.col_list,',');    --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                   coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;        i :=i+1;        end loop;          -- raise notice 'v_sql 3, %',v_sql3;        SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;        SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;        END IF;       raise notice 'v_delim , %',v_delim;      EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;     raise notice 'stg cnt , %',v_cnt;     /* if upper(v_delim) = 'CSV' then        v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );      else       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop      BEGIN     -- raise notice 'v_sql , %',v_sql;        -- raise notice 'Chunk number , %',i;        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');      v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);      -- raise notice 'select statement , %',v_sql_ins;           -- v_sql := null;      -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );      --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );      -- raise notice 'insert statement , %',v_sql;     raise NOTICE 'CHUNK START %',v_chunk*(i-1);    raise NOTICE 'CHUNK END %',v_chunk;      EXECUTE v_sql;   EXCEPTION        WHEN OTHERS THEN        -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          -- raise notice 'Chunk number for cursor , %',i;     raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);    raise NOTICE 'Cursor -  CHUNK END %',v_chunk;          v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);         -- raise notice 'v_final_sql %',v_final_sql;          v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';            begin            open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';            loop            begin            fetch r into v_rec;            EXIT WHEN NOT FOUND;            v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');             execute v_sql;            exception             when others then           v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';                execute v_sql;              continue;            end ;            end loop;            close r;            exception            when others then          raise;            end ; $a$');       -- raise notice ' inside excp v_sql %',v_sql;           execute v_sql;       --  raise notice 'v_sql %',v_sql;        END;   END LOOP;      ELSE      SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist        INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table                  ;      v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',        v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','       aws_commons.create_s3_uri        ( ''',v_directory, ''',''',        v_file_name, ''',',         '''',v_region,''')       )');          EXECUTE  v_sql;      FOR rec in cr1      LOOP       IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'       THEN         v_rec_val := 1;       ELSE        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                   coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;       END IF;       v_col_list := concat_ws('',v_col_list ,v_sql1);      END LOOP;            SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;            SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;            v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');            v_sql_dynamic := v_sql_col;            EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;          IF v_rec_val = 1 THEN              v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;          ELSE                v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;            END IF;      BEGIN        EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);            EXCEPTION               WHEN OTHERS THEN           IF v_rec_val = 1 THEN                   v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';                 ELSE                  v_final_sql := ' SELECT col1 from';                END IF;        v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';              begin              open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;              loop              begin              if   v_rec_val = 1 then              fetch r into line_number,col1;              else              fetch r into col1;              end if;              EXIT WHEN NOT FOUND;               if v_rec_val = 1 then               select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;               else                 select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;               end if;              insert into  ',pi_ext_table,' select v_rec.*;               exception               when others then                INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());                continue;               end ;                end loop;              close r;               exception               when others then               raise;               end ; $a$');          execute v_sql;      END;          END IF;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;    INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);    raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    ); raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );    END IF;  j := j+1;  END LOOP;        RETURN 'OK'; EXCEPTION     WHEN  OTHERS THEN   raise notice 'error %',sqlerrm;    ERCODE=SQLSTATE;    IF ERCODE = 'NTFIP' THEN      v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');    ELSIF ERCODE = 'S3IMP' THEN     v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');    ELSE       v_sqlerrm := sqlerrm;    END IF;  select distinct directory into v_directory from  meta_EXTERNAL_TABLE;  raise notice 'exc v_directory, %',v_directory;    raise notice 'exc pi_filename, %',pi_filename;    raise notice 'exc v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );     RETURN null; END; $function$