Migrieren Sie externe Oracle-Tabellen zu Amazon Aurora PostgreSQL-kompatibel - AWS Prescriptive Guidance

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migrieren Sie externe Oracle-Tabellen zu Amazon Aurora PostgreSQL-kompatibel

Erstellt von Anuradha Chintha (AWS) und Rakesh Raghav (AWS)

Umgebung: PoC oder Pilot

Quelle: Oracle

Ziel: Aurora PostgreSQL

R-Typ: Re-Architect

Arbeitsaufwand: Open Source

Technologien: Migration; Datenbanken; Modernisierung

AWS-Services: AWS Identity and Access Management; AWS Lambda; Amazon S3; Amazon SNS; Amazon Aurora

Übersicht

Externe Tabellen geben Oracle die Möglichkeit, Daten abzufragen, die außerhalb der Datenbank in Flatfiles gespeichert sind. Sie können den ORACLE_LOADER-Treiber verwenden, um auf alle Daten zuzugreifen, die in einem beliebigen Format gespeichert sind und vom SQL*Loader-Hilfsprogramm geladen werden können. Sie können Data Manipulation Language (DML) nicht für externe Tabellen verwenden, aber Sie können externe Tabellen für Abfrage-, Join- und Sortieroperationen verwenden.

Amazon Aurora PostgreSQL-Compatible Edition bietet keine ähnliche Funktionalität wie externe Tabellen in Oracle. Stattdessen müssen Sie die Modernisierung nutzen, um eine skalierbare Lösung zu entwickeln, die funktionale Anforderungen erfüllt und sparsam ist.

Dieses Muster enthält Schritte für die Migration verschiedener Typen von externen Oracle-Tabellen zur Aurora PostgreSQL-kompatiblen Edition in der Amazon Web Services (AWS) Cloud mithilfe der Erweiterung. aws_s3

Wir empfehlen, diese Lösung gründlich zu testen, bevor Sie sie in einer Produktionsumgebung implementieren.

Voraussetzungen und Einschränkungen

Voraussetzungen

  • Ein aktives AWS-Konto

  • AWS-Befehlszeilenschnittstelle (AWS Command Line Interface, AWS CLI)

  • Eine verfügbare Aurora PostgreSQL-kompatible Datenbankinstanz.

  • Eine lokale Oracle-Datenbank mit einer externen Tabelle

  • PG.Client-API

  • Datendateien 

Einschränkungen

  • Dieses Muster bietet nicht die Funktionalität, um externe Oracle-Tabellen zu ersetzen. Die Schritte und der Beispielcode können jedoch weiter verbessert werden, um Ihre Ziele bei der Datenbankmodernisierung zu erreichen.

  • Dateien sollten nicht das Zeichen enthalten, das in aws_s3 Export- und Importfunktionen als Trennzeichen übergeben wird.

Produktversionen

  • Um aus Amazon S3 in RDS for PostgreSQL zu importieren, muss auf der Datenbank PostgreSQL Version 10.7 oder höher ausgeführt werden.

Architektur

Quelltechnologie-Stack

  • Oracle

Quellarchitektur

Diagramm der Datendateien, die in ein Verzeichnis und eine Tabelle in der lokalen Oracle-Datenbank verschoben werden.

Zieltechnologie-Stack

  • Amazon Aurora PostgreSQL-kompatibel

  • Amazon CloudWatch

  • AWS Lambda

  • AWS Secrets Manager

  • Amazon-Simple-Notification-Service (Amazon-SNS)

  • Amazon-Simple-Storage-Service (Amazon-S3)

Zielarchitektur

Das folgende Diagramm zeigt eine allgemeine Darstellung der Lösung.

Die Beschreibung befindet sich hinter dem Diagramm.
  1. Dateien werden in den S3-Bucket hochgeladen.

  2. Die Lambda-Funktion wird initiiert.

  3. Die Lambda-Funktion initiiert den DB-Funktionsaufruf.

  4. Secrets Manager stellt die Anmeldeinformationen für den Datenbankzugriff bereit.

  5. Abhängig von der DB-Funktion wird ein SNS-Alarm ausgelöst.

Automatisierung und Skalierung

Alle Ergänzungen oder Änderungen an den externen Tabellen können mit der Metadatenpflege behandelt werden.

Tools

  • Amazon Aurora PostgreSQL-kompatibel — Amazon Aurora PostgreSQL-Compatible Edition ist eine vollständig verwaltete, PostgreSQL-kompatible und ACID-konforme relationale Datenbank-Engine, die die Geschwindigkeit und Zuverlässigkeit kommerzieller High-End-Datenbanken mit der Kosteneffizienz von Open-Source-Datenbanken kombiniert.

  • AWS CLI — AWS Command Line Interface (AWS CLI) ist ein einheitliches Tool zur Verwaltung Ihrer AWS-Services. Mit nur einem Tool zum Herunterladen und Konfigurieren können Sie mehrere AWS-Services von der Befehlszeile aus steuern und über Skripts automatisieren.

  • Amazon CloudWatch — Amazon CloudWatch überwacht die Ressourcen und die Nutzung von Amazon S3.

  • AWS Lambda — AWS Lambda ist ein serverloser Rechenservice, der die Ausführung von Code ohne Bereitstellung oder Verwaltung von Servern, die Erstellung einer auslastungsorientierten Cluster-Skalierungslogik, die Verwaltung von Eventintegrationen oder die Verwaltung von Laufzeiten unterstützt. In diesem Muster führt Lambda die Datenbankfunktion immer dann aus, wenn eine Datei auf Amazon S3 hochgeladen wird.

  • AWS Secrets Manager — AWS Secrets Manager ist ein Service zum Speichern und Abrufen von Anmeldeinformationen. Mit Secrets Manager können Sie hartcodierte Anmeldeinformationen in Ihrem Code, einschließlich Kennwörtern, durch einen API-Aufruf an Secrets Manager ersetzen, um das Geheimnis programmgesteuert abzurufen.

  • Amazon S3 — Amazon Simple Storage Service (Amazon S3) bietet eine Speicherebene zum Empfangen und Speichern von Dateien zur Nutzung und Übertragung zum und vom Aurora PostgreSQL-kompatiblen Cluster.

  • aws_s3 — Die aws_s3 Erweiterung integriert Amazon S3 und Aurora PostgreSQL-kompatibel.

  • Amazon SNS — Amazon Simple Notification Service (Amazon SNS) koordiniert und verwaltet die Zustellung oder den Versand von Nachrichten zwischen Herausgebern und Kunden. In diesem Muster wird Amazon SNS zum Senden von Benachrichtigungen verwendet.

Code

Immer wenn eine Datei im S3-Bucket platziert wird, muss eine DB-Funktion erstellt und von der verarbeitenden Anwendung oder der Lambda-Funktion aus aufgerufen werden. Einzelheiten finden Sie im Code (beigefügt).

Epen

AufgabeBeschreibungErforderliche Fähigkeiten

Fügen Sie der Quelldatenbank eine externe Datei hinzu.

Erstellen Sie eine externe Datei und verschieben Sie sie in das oracle Verzeichnis.

DBA
AufgabeBeschreibungErforderliche Fähigkeiten

Erstellen Sie eine Aurora PostgreSQL-Datenbank.

Erstellen Sie eine DB-Instance in Ihrem Amazon Aurora PostgreSQL-kompatiblen Cluster.

DBA

Erstellen Sie ein Schema, die Erweiterung aws_s3 und Tabellen.

Verwenden Sie den Code unter ext_tbl_scripts dem Abschnitt Zusätzliche Informationen. Die Tabellen enthalten tatsächliche Tabellen, Staging-Tabellen, Fehler- und Protokolltabellen sowie eine Metatabelle.

DBA, Entwickler

Erstellen Sie die DB-Funktion.

Um die DB-Funktion zu erstellen, verwenden Sie den Code unter load_external_table_latest Funktion im Abschnitt Zusätzliche Informationen.

DBA, Entwickler
AufgabeBeschreibungErforderliche Fähigkeiten

Erstellen Sie eine Rolle.

Erstellen Sie eine Rolle mit Zugriffsberechtigungen für Amazon S3 und Amazon Relational Database Service (Amazon RDS). Diese Rolle wird Lambda für die Ausführung des Musters zugewiesen.

DBA

So erstellen Sie die Lambda-Funktion:

Erstellen Sie eine Lambda-Funktion, die den Dateinamen aus Amazon S3 liest (z. B.file_key = info.get('object', {}).get('key')) und die DB-Funktion (z. B.curs.callproc("load_external_tables", [file_key])) mit dem Dateinamen als Eingabeparameter aufruft.

Abhängig vom Ergebnis des Funktionsaufrufs wird eine SNS-Benachrichtigung ausgelöst (z. B.client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')).

Basierend auf Ihren Geschäftsanforderungen können Sie bei Bedarf eine Lambda-Funktion mit zusätzlichem Code erstellen. Weitere Informationen finden Sie in der Lambda-Dokumentation.

DBA

Konfigurieren Sie einen S3-Bucket-Ereignisauslöser.

Konfigurieren Sie einen Mechanismus zum Aufrufen der Lambda-Funktion für alle Objekterstellungsereignisse im S3-Bucket.

DBA

Erstelle ein Geheimnis.

Erstellen Sie mit Secrets Manager einen geheimen Namen für die Datenbankanmeldedaten. Übergeben Sie das Geheimnis in der Lambda-Funktion.

DBA

Laden Sie die Lambda-Unterstützungsdateien hoch.

Laden Sie eine ZIP-Datei hoch, die die Lambda-Support-Pakete und das angehängte Python-Skript für die Verbindung mit Aurora PostgreSQL-kompatibel enthält. Der Python-Code ruft die Funktion auf, die Sie in der Datenbank erstellt haben.

DBA

Erstellen Sie ein SNS-Thema.

Erstellen Sie ein SNS-Thema, um E-Mails über den Erfolg oder Misserfolg des Datenladens zu senden.

DBA
AufgabeBeschreibungErforderliche Fähigkeiten

Erstellen Sie einen S3-Bucket.

Erstellen Sie auf der Amazon S3 S3-Konsole einen S3-Bucket mit einem eindeutigen Namen, der keine führenden Schrägstriche enthält. Ein S3-Bucket-Name ist weltweit eindeutig, und der Namespace wird von allen AWS-Konten gemeinsam genutzt.

DBA

Erstellen Sie IAM-Richtlinien.

Verwenden Sie den Code unter s3bucketpolicy_for_import dem Abschnitt Zusätzliche Informationen, um die AWS Identity and Access Management (IAM) -Richtlinien zu erstellen.

DBA

Rollen erstellen.

Erstellen Sie zwei Rollen für Aurora PostgreSQL-kompatibel, eine Rolle für Import und eine Rolle für Export. Weisen Sie den Rollen die entsprechenden Richtlinien zu.

DBA

Hängen Sie die Rollen an den Aurora PostgreSQL-kompatiblen Cluster an.

Fügen Sie unter Rollen verwalten die Import- und Exportrollen dem Aurora PostgreSQL-Cluster hinzu.

DBA

Erstellen Sie unterstützende Objekte für Aurora PostgreSQL-kompatibel.

Verwenden Sie für die Tabellenskripten den Code unter dem Abschnitt ext_tbl_scripts Zusätzliche Informationen.

Verwenden Sie für die benutzerdefinierte Funktion den Code load_external_Table_latest unter dem Abschnitt Zusätzliche Informationen.

DBA
AufgabeBeschreibungErforderliche Fähigkeiten

Laden Sie eine Datei in den S3-Bucket hoch.

Verwenden Sie die Konsole oder den folgenden Befehl in der AWS-CLI, um eine Testdatei in den S3-Bucket hochzuladen. 

aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps

Sobald die Datei hochgeladen wurde, initiiert ein Bucket-Ereignis die Lambda-Funktion, die die Aurora PostgreSQL-kompatible Funktion ausführt.

DBA

Überprüfen Sie die Daten sowie die Protokoll- und Fehlerdateien.

Die Aurora PostgreSQL-kompatible Funktion lädt die Dateien in die Haupttabelle .log und erstellt .bad Dateien im S3-Bucket.

DBA

Überwachen Sie die Lösung.

Überwachen Sie in der CloudWatch Amazon-Konsole die Lambda-Funktion.

DBA

Zugehörige Ressourcen

Zusätzliche Informationen

ext_table_scripts

CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE (     table_name_stg character varying(100) ,     table_name character varying(100)  ,     col_list character varying(1000)  ,     data_type character varying(100)  ,     col_order numeric,     start_pos numeric,     end_pos numeric,     no_position character varying(100)  ,     date_mask character varying(100)  ,     delimeter character(1)  ,     directory character varying(100)  ,     file_name character varying(100)  ,     header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg (     col1 text ); CREATE TABLE IF NOT EXISTS error_table (     error_details text,     file_name character varying(100),     processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table (     file_name character varying(50) COLLATE pg_catalog."default",     processed_date timestamp without time zone,     tot_rec_count numeric,     proc_rec_count numeric,     error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');

s3bucketpolicy_für den Import

---Import role policy --Create an IAM policy to allow, Get,  and list actions on S3 bucket  {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3import",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest",                 "arn:aws:s3:::s3importtest/*"             ]         }     ] } --Export Role policy --Create an IAM policy to allow, put,  and list actions on S3 bucket {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3export",             "Action": [                 "S3:PutObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest/*"             ]         }     ] }

Beispiel für eine DB-Funktion load_external_tables_latest

CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)  RETURNS character varying  LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE  v_final_sql TEXT;  pi_ext_table TEXT;  r refCURSOR;  v_sqlerrm text;  v_chunk numeric;  i integer;  v_col_list TEXT;  v_postion_list CHARACTER VARYING(1000);  v_len  integer;  v_delim varchar;  v_file_name CHARACTER VARYING(1000);  v_directory CHARACTER VARYING(1000);  v_table_name_stg CHARACTER VARYING(1000);  v_sql_col TEXT;  v_sql TEXT;  v_sql1 TEXT;  v_sql2 TEXT;  v_sql3 TEXT;  v_cnt integer;  v_sql_dynamic TEXT;  v_sql_ins TEXT;  proc_rec_COUNT integer;  error_rec_COUNT integer;  tot_rec_COUNT integer;  v_rec_val integer;  rec record;  v_col_cnt integer;  kv record;  v_val text;  v_header text;  j integer;  ERCODE VARCHAR(5);  v_region text;  cr CURSOR FOR  SELECT distinct DELIMETER,    FILE_NAME,    DIRECTORY  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table    AND DELIMETER IS NOT NULL;  cr1 CURSOR FOR    SELECT   col_list,    data_type,    start_pos,    END_pos,    concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,    no_position,date_mask  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table  order by col_order asc; cr2 cursor FOR SELECT  distinct table_name,table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename); BEGIN  -- PERFORM utl_file_utility.init();    v_region := 'us-east-1';    /* find tab details from file name */    --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;   -- DELETE FROM  log_table WHERE file_name= pi_filename;  BEGIN    SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename);  EXCEPTION    WHEN NO_DATA_FOUND THEN     raise notice 'error 1,%',sqlerrm;     pi_ext_table := null;     v_table_name_stg := null;       RAISE USING errcode = 'NTFIP' ;     when others then         raise notice 'error others,%',sqlerrm;  END;  j :=1 ;    for rec in  cr2  LOOP   pi_ext_table     := rec.table_name;   v_table_name_stg := rec.table_name_stg;   v_col_list := null;  IF pi_ext_table IS NOT NULL   THEN     --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;    EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;        SELECT distinct DELIMETER INTO STRICT v_delim        FROM  meta_EXTERNAL_TABLE        WHERE table_name = pi_ext_table;        IF v_delim IS NOT NULL THEN      SELECT distinct DELIMETER,        FILE_NAME,        DIRECTORY ,        concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist      INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table        AND DELIMETER IS NOT NULL;      IF    upper(v_delim) = 'CSV'      THEN        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',        v_table_name_stg,''','''',        ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',        v_directory,''',''',v_file_name,''', ''',v_region,'''))');        ELSE        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',            v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','           aws_commons.create_s3_uri            ( ''',v_directory, ''',''',            v_file_name, ''',',             '''',v_region,''')           )');           raise notice 'v_sql , %',v_sql;        begin         EXECUTE  v_sql;        EXCEPTION          WHEN OTHERS THEN            raise notice 'error 1';          RAISE USING errcode = 'S3IMP' ;        END;        select count(col_list) INTO v_col_cnt        from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;         -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        i :=1;        FOR rec in cr1        loop        v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');        v_sql2 := concat_ws('',v_sql2,rec.col_list,',');    --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                   coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;        i :=i+1;        end loop;          -- raise notice 'v_sql 3, %',v_sql3;        SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;        SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;        END IF;       raise notice 'v_delim , %',v_delim;      EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;     raise notice 'stg cnt , %',v_cnt;     /* if upper(v_delim) = 'CSV' then        v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );      else       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop      BEGIN     -- raise notice 'v_sql , %',v_sql;        -- raise notice 'Chunk number , %',i;        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');      v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);      -- raise notice 'select statement , %',v_sql_ins;           -- v_sql := null;      -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );      --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );      -- raise notice 'insert statement , %',v_sql;     raise NOTICE 'CHUNK START %',v_chunk*(i-1);    raise NOTICE 'CHUNK END %',v_chunk;      EXECUTE v_sql;   EXCEPTION        WHEN OTHERS THEN        -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          -- raise notice 'Chunk number for cursor , %',i;     raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);    raise NOTICE 'Cursor -  CHUNK END %',v_chunk;          v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);         -- raise notice 'v_final_sql %',v_final_sql;          v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';            begin            open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';            loop            begin            fetch r into v_rec;            EXIT WHEN NOT FOUND;            v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');             execute v_sql;            exception             when others then           v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';                execute v_sql;              continue;            end ;            end loop;            close r;            exception            when others then          raise;            end ; $a$');       -- raise notice ' inside excp v_sql %',v_sql;           execute v_sql;       --  raise notice 'v_sql %',v_sql;        END;   END LOOP;      ELSE      SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist        INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table                  ;      v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',        v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','       aws_commons.create_s3_uri        ( ''',v_directory, ''',''',        v_file_name, ''',',         '''',v_region,''')       )');          EXECUTE  v_sql;      FOR rec in cr1      LOOP       IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'       THEN         v_rec_val := 1;       ELSE        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                   coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;       END IF;       v_col_list := concat_ws('',v_col_list ,v_sql1);      END LOOP;            SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;            SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;            v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');            v_sql_dynamic := v_sql_col;            EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;          IF v_rec_val = 1 THEN              v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;          ELSE                v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;            END IF;      BEGIN        EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);            EXCEPTION               WHEN OTHERS THEN           IF v_rec_val = 1 THEN                   v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';                 ELSE                  v_final_sql := ' SELECT col1 from';                END IF;        v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';              begin              open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;              loop              begin              if   v_rec_val = 1 then              fetch r into line_number,col1;              else              fetch r into col1;              end if;              EXIT WHEN NOT FOUND;               if v_rec_val = 1 then               select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;               else                 select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;               end if;              insert into  ',pi_ext_table,' select v_rec.*;               exception               when others then                INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());                continue;               end ;                end loop;              close r;               exception               when others then               raise;               end ; $a$');          execute v_sql;      END;          END IF;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;    INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);    raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    ); raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );    END IF;  j := j+1;  END LOOP;        RETURN 'OK'; EXCEPTION     WHEN  OTHERS THEN   raise notice 'error %',sqlerrm;    ERCODE=SQLSTATE;    IF ERCODE = 'NTFIP' THEN      v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');    ELSIF ERCODE = 'S3IMP' THEN     v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');    ELSE       v_sqlerrm := sqlerrm;    END IF;  select distinct directory into v_directory from  meta_EXTERNAL_TABLE;  raise notice 'exc v_directory, %',v_directory;    raise notice 'exc pi_filename, %',pi_filename;    raise notice 'exc v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );     RETURN null; END; $function$