Führen Sie einen ETL ELT /-Workflow mit Step Functions und Amazon Redshift aus API - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Führen Sie einen ETL ELT /-Workflow mit Step Functions und Amazon Redshift aus API

Dieses Beispielprojekt zeigt, wie Step Functions und Amazon Redshift Data verwendet werden, API um einenETL/ELT-Workflow auszuführen, der Daten in das Amazon Redshift Data Warehouse lädt.

In diesem Projekt verwendet Step Functions eine AWS Lambda Funktion und die Amazon Redshift Redshift-DatenAPI, um die erforderlichen Datenbankobjekte zu erstellen und einen Satz von Beispieldaten zu generieren. Anschließend werden zwei Jobs parallel ausgeführt, die das Laden von Dimensionstabellen durchführen, gefolgt von einer Faktentabelle. Sobald beide Dimensions-Ladeaufträge erfolgreich beendet wurden, führt Step Functions den Ladejob für die Faktentabelle aus, führt den Validierungsjob aus und unterbricht dann den Amazon Redshift Redshift-Cluster.

Anmerkung

Sie können die ETL Logik ändern, um Daten aus anderen Quellen wie Amazon S3 zu empfangen, das den COPYBefehl verwenden kann, um Daten von Amazon S3 in eine Amazon Redshift Redshift-Tabelle zu kopieren.

Weitere Informationen zu den Serviceintegrationen von Amazon Redshift und Step Functions finden Sie im Folgenden:

Anmerkung

Für dieses Beispielprojekt können Gebühren anfallen.

Für neue AWS Für Benutzer ist ein kostenloses Nutzungskontingent verfügbar. Im Rahmen dieses Kontingents sind die Services bis zu einem bestimmten Nutzungsumfang kostenlos. Weitere Informationen zur AWS Kosten und das kostenlose Kontingent finden Sie unter AWS Step Functions Preisgestaltung.

Schritt 1: Erstellen Sie die Zustandsmaschine

  1. Öffnen Sie die Step Functions Functions-Konsole und wählen Sie Create State Machine.

  2. Geben Sie ETL job in Amazon Redshift in das Suchfeld ein und wählen Sie dann ETL Job in Amazon Redshiftaus den zurückgegebenen Suchergebnissen.

  3. Wählen Sie Next (Weiter), um fortzufahren.

  4. Wählen Sie „Demo ausführen“, um eine schreibgeschützte Version und einen ready-to-deploy Workflow zu erstellen, oder wählen Sie „Darauf aufbauen“, um eine editierbare Zustandsmaschinen-Definition zu erstellen, auf der Sie aufbauen und diese später bereitstellen können.

    Dieses Beispielprojekt stellt die folgenden Ressourcen bereit:

    • Importieren in &S3; Amazon Redshift Cluster

    • Zwei Lambda Funktionen

    • Importieren in &S3; Amazon Redshift schema

    • Five Amazon Redshift Tische

    • Importieren in &S3; AWS Step Functions Zustandsautomat

    • Verwandt AWS Identity and Access Management (IAM) Rollen.

    Sie können das Workflow-Diagramm für den ETL Job in untersuchen Amazon Redshiftin der Step Functions Functions-Konsole.

  5. Wählen Sie Vorlage verwenden, um mit Ihrer Auswahl fortzufahren.

Die nächsten Schritte hängen von Ihrer vorherigen Auswahl ab:

  1. Führen Sie eine Demo durch — Sie können den Status Machine überprüfen, bevor Sie ein schreibgeschütztes Projekt mit Ressourcen erstellen, die bereitgestellt werden von AWS CloudFormation zu Ihrem AWS-Konto.

    Sie können sich die State-Machine-Definition ansehen. Wenn Sie bereit sind, wählen Sie Deploy and run, um das Projekt bereitzustellen und die Ressourcen zu erstellen.

    Die Bereitstellung kann bis zu 10 Minuten dauern, bis Ressourcen und Berechtigungen erstellt sind. Sie können den Stack-ID-Link verwenden, um den Fortschritt zu überwachen AWS CloudFormation.

    Nach Abschluss der Bereitstellung sollte Ihre neue Zustandsmaschine in der Konsole angezeigt werden.

  2. Darauf aufbauen — Sie können die Workflow-Definition überprüfen und bearbeiten. Möglicherweise müssen Sie Werte für Platzhalter im Beispielprojekt festlegen, bevor Sie versuchen, Ihren benutzerdefinierten Workflow auszuführen.

Anmerkung

Für Dienste, die für Ihr Konto bereitgestellt werden, können Standardgebühren anfallen.

Schritt 2: Führen Sie die Zustandsmaschine aus

  1. Wählen Sie auf der Seite State Machines Ihr Beispielprojekt aus.

  2. Wählen Sie auf der Seite mit dem Beispielprojekt die Option Ausführung starten aus.

  3. Gehen Sie im Dialogfeld Ausführung starten wie folgt vor:

    1. (Optional) Geben Sie einen benutzerdefinierten Ausführungsnamen ein, um den generierten Standard zu überschreiben.

      ASCIINichtnamen und Protokollierung

      Step Functions akzeptiert Namen für Zustandsmaschinen, Ausführungen, Aktivitäten und Beschriftungen, die ASCII Nichtzeichen enthalten. Da solche Zeichen nicht mit Amazon funktionieren, empfehlen wir CloudWatch, nur ASCII Zeichen zu verwenden, damit Sie die Messwerte verfolgen können CloudWatch.

    2. (Optional) Geben Sie im Eingabefeld die Eingabewerte als einJSON. Sie können diesen Schritt überspringen, wenn Sie eine Demo ausführen.

    3. Wählen Sie Start execution (Ausführung starten) aus.

    Die Step Functions Functions-Konsole leitet Sie zu einer Seite mit den Ausführungsdetails weiter, auf der Sie Status in der Diagrammansicht auswählen können, um die zugehörigen Informationen im Einzelheiten zu den Schritten Bereich zu untersuchen.

Beispiel für einen State-Machine-Code

Die Zustandsmaschine in diesem Beispielprojekt lässt sich integrieren in AWS Lambda indem die ETL Logik InputPath direkt an diese Ressourcen übergeben und mithilfe von Amazon Redshift Data asynchron ausgeführt wird. API

Sehen Sie sich die folgende Zustandsmaschine-Definition an, um zu sehen, wie Step Functions gesteuert wird AWS Lambda und die Amazon Redshift Redshift-DatenAPI.

Für Informationen darüber, wie Step Functions andere steuern kann AWS Dienste finden Sie unterIntegration von Diensten mit Step Functions.

{ "Comment": "An ETL workflow for loading dimension and fact tables", "StartAt": "InitializeCheckCluster", "States": { "InitializeCheckCluster": { "Type": "Pass", "Next": "GetStateOfCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterAvailable", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterAvailable": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializeBuildDB" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "InitializeResumeCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "ClusterWait" } ] }, "ClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckCluster" }, "InitializeResumeCluster": { "Type": "Pass", "Next": "ResumeCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "resume" } } }, "ResumeCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "ClusterWait", "InputPath": "$", "ResultPath": "$" }, "InitializeBuildDB": { "Type": "Pass", "Next": "BuildDB", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "build_database", "sql_statement": [ "create schema if not exists {0} authorization {1};", "create table if not exists {0}.customer", "(c_customer_sk int4 not null encode az64", ",c_customer_id char(16) not null encode zstd", ",c_current_addr_sk int4 encode az64", ",c_first_name char(20) encode zstd", ",c_last_name char(30) encode zstd", ",primary key (c_customer_sk)", ") distkey(c_customer_sk);", "--", "create table if not exists {0}.customer_address", "(ca_address_sk int4 not null encode az64", ",ca_address_id char(16) not null encode zstd", ",ca_state char(2) encode zstd", ",ca_zip char(10) encode zstd", ",ca_country varchar(20) encode zstd", ",primary key (ca_address_sk)", ") distkey(ca_address_sk);", "--", "create table if not exists {0}.date_dim", "(d_date_sk integer not null encode az64", ",d_date_id char(16) not null encode zstd", ",d_date date encode az64", ",d_day_name char(9) encode zstd", ",primary key (d_date_sk)", ") diststyle all;", "--", "create table if not exists {0}.item", "(i_item_sk int4 not null encode az64", ",i_item_id char(16) not null encode zstd", ",i_rec_start_date date encode az64", ",i_rec_end_date date encode az64", ",i_current_price numeric(7,2) encode az64", ",i_category char(50) encode zstd", ",i_product_name char(50) encode zstd", ",primary key (i_item_sk)", ") distkey(i_item_sk) sortkey(i_category);", "--", "create table if not exists {0}.store_sales", "(ss_sold_date_sk int4", ",ss_item_sk int4 not null encode az64", ",ss_customer_sk int4 encode az64", ",ss_addr_sk int4 encode az64", ",ss_store_sk int4 encode az64", ",ss_ticket_number int8 not null encode az64", ",ss_quantity int4 encode az64", ",ss_net_paid numeric(7,2) encode az64", ",ss_net_profit numeric(7,2) encode az64", ",primary key (ss_item_sk, ss_ticket_number)", ") distkey(ss_item_sk) sortkey(ss_sold_date_sk);" ] } } }, "BuildDB": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBuildDBStatus", "InputPath": "$", "ResultPath": "$" }, "GetBuildDBStatus": { "Type": "Task", "Next": "CheckBuildDBStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBuildDBStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailBuildDB" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "InitializeBaselineData" } ], "Default": "BuildDBWait" }, "BuildDBWait": { "Type": "Wait", "Seconds": 15, "Next": "GetBuildDBStatus" }, "FailBuildDB": { "Type": "Fail", "Cause": "Database Build Failed", "Error": "Error" }, "InitializeBaselineData": { "Type": "Pass", "Next": "LoadBaselineData", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_baseline_data", "sql_statement": [ "begin transaction;", "truncate table {0}.customer;", "insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)", "values", "(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),", "(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),", "(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),", "(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');", "analyze {0}.customer;", "--", "truncate table {0}.customer_address;", "insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)", "values", "(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),", "(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),", "(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');", "analyze {0}.customer_address;", "--", "truncate table {0}.item;", "insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),", "(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),", "(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),", "(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),", "(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),", "(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');", "--", "analyze {0}.item;", "truncate table {0}.date_dim;", "insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)", "values", "(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),", "(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),", "(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),", "(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),", "(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),", "(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),", "(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),", "(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),", "(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');", "--", "analyze {0}.date_dim;", "-- commit and End transaction", "commit;", "end transaction;" ] } } }, "LoadBaselineData": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBaselineData", "InputPath": "$", "ResultPath": "$" }, "GetBaselineData": { "Type": "Task", "Next": "CheckBaselineData", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBaselineData": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailLoadBaselineData" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "ParallelizeDimensionLoadJob" } ], "Default": "BaselineDataWait" }, "BaselineDataWait": { "Type": "Wait", "Seconds": 20, "Next": "GetBaselineData" }, "FailLoadBaselineData": { "Type": "Fail", "Cause": "Load Baseline Data Failed", "Error": "Error" }, "ParallelizeDimensionLoadJob": { "Type": "Parallel", "Next": "InitializeSalesFactLoadJob", "ResultPath": "$.status", "Branches": [ { "StartAt": "InitializeCustomerAddressDimensionLoadJob", "States": { "InitializeCustomerAddressDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteCustomerAddressDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_customer_address", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_customer_address;", "create table if not exists {0}.stg_customer_address", "(ca_address_id varchar(16) encode zstd", ",ca_state varchar(2) encode zstd", ",ca_zip varchar(10) encode zstd", ",ca_country varchar(20) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)", "values", "('AAAAAAAACFBBAAAA','NE','','United States'),", "('AAAAAAAAGAEFAAAA','NE','61749','United States'),", "('AAAAAAAAPJKKAAAA','OK','','United States'),", "('AAAAAAAAMIHGAAAA','AL','','United States');", "/* Perform UPDATE for existing data with refreshed attribute values */", "update {0}.customer_address", " set ca_state = stg_customer_address.ca_state,", " ca_zip = stg_customer_address.ca_zip,", " ca_country = stg_customer_address.ca_country", " from {0}.stg_customer_address", " where customer_address.ca_address_id = stg_customer_address.ca_address_id;", "/* Perform insert for new rows */", "insert into {0}.customer_address", "(ca_address_sk", ",ca_address_id", ",ca_state", ",ca_zip", ",ca_country", ")", "with max_customer_address_sk as", "(select max(ca_address_sk) max_ca_address_sk", "from {0}.customer_address)", "select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk", ",stg_customer_address.ca_address_id", ",stg_customer_address.ca_state", ",stg_customer_address.ca_zip", ",stg_customer_address.ca_country", "from {0}.stg_customer_address,", "max_customer_address_sk", "where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteCustomerAddressDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetCustomerAddressDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetCustomerAddressDimensionLoadStatus": { "Type": "Task", "Next": "CheckCustomerAddressDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckCustomerAddressDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailCustomerAddressDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteCustomerAddressDimensionLoad" } ], "Default": "CustomerAddressWait" }, "CustomerAddressWait": { "Type": "Wait", "Seconds": 5, "Next": "GetCustomerAddressDimensionLoadStatus" }, "CompleteCustomerAddressDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailCustomerAddressDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } }, { "StartAt": "InitializeItemDimensionLoadJob", "States": { "InitializeItemDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteItemDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_item", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_item;", "create table if not exists {0}.stg_item", "(i_item_id varchar(16) encode zstd", ",i_rec_start_date date encode zstd", ",i_rec_end_date date encode zstd", ",i_current_price numeric(7,2) encode zstd", ",i_category varchar(50) encode zstd", ",i_product_name varchar(50) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_item", "(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),", "('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),", "('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),", "('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),", "('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),", "('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),", "('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),", "('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');", "/************************************************************************************************************************", "** Type 2 is maintained for i_current_price column.", "** Update all attributes for the item when the price is not changed", "** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match", "*************************************************************************************************************************/", "update {0}.item", " set i_category = stg_item.i_category,", " i_product_name = stg_item.i_product_name", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price = stg_item.i_current_price;", "insert into {0}.item", "(i_item_sk", ",i_item_id", ",i_rec_start_date", ",i_rec_end_date", ",i_current_price", ",i_category", ",i_product_name", ")", "with max_item_sk as", "(select max(i_item_sk) max_item_sk", " from {0}.item)", "select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk", " ,stg_item.i_item_id", " ,trunc(sysdate) as i_rec_start_date", " ,null as i_rec_end_date", " ,stg_item.i_current_price", " ,stg_item.i_category", " ,stg_item.i_product_name", " from {0}.stg_item, {0}.item, max_item_sk", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Sunset penultimate records that were inserted as type 2 */", "update {0}.item", " set i_rec_end_date = trunc(sysdate)", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteItemDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetItemDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetItemDimensionLoadStatus": { "Type": "Task", "Next": "CheckItemDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckItemDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailItemDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteItemDimensionLoad" } ], "Default": "ItemWait" }, "ItemWait": { "Type": "Wait", "Seconds": 5, "Next": "GetItemDimensionLoadStatus" }, "CompleteItemDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailItemDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } } ] }, "InitializeSalesFactLoadJob": { "Type": "Pass", "Next": "ExecuteSalesFactLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "load_sales_fact", "sql_statement": [ "begin transaction;", "/* Create a stg_store_sales staging table */", "drop table if exists {0}.stg_store_sales;", "create table {0}.stg_store_sales", "(sold_date date encode zstd", ",i_item_id varchar(16) encode zstd", ",c_customer_id varchar(16) encode zstd", ",ca_address_id varchar(16) encode zstd", ",ss_ticket_number integer encode zstd", ",ss_quantity integer encode zstd", ",ss_net_paid numeric(7,2) encode zstd", ",ss_net_profit numeric(7,2) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_store_sales", "(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)", "values", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),", "('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);", "/* Delete any rows from target store_sales for the input date for idempotency */", "delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');", "/* Insert data from staging table to the target table */", "insert into {0}.store_sales", "(ss_sold_date_sk", ",ss_item_sk", ",ss_customer_sk", ",ss_addr_sk", ",ss_ticket_number", ",ss_quantity", ",ss_net_paid", ",ss_net_profit", ")", "select date_dim.d_date_sk ss_sold_date_sk", " ,item.i_item_sk ss_item_sk", " ,customer.c_customer_sk ss_customer_sk", " ,customer_address.ca_address_sk ss_addr_sk", " ,ss_ticket_number", " ,ss_quantity", " ,ss_net_paid", " ,ss_net_profit", " from {0}.stg_store_sales as store_sales", " inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date", " left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null", " left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id", " left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;", "/* Drop staging table */", "drop table if exists {0}.stg_store_sales;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteSalesFactLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetSalesFactLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetSalesFactLoadStatus": { "Type": "Task", "Next": "CheckSalesFactLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckSalesFactLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesFactLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "SalesETLPipelineComplete" } ], "Default": "SalesWait" }, "SalesWait": { "Type": "Wait", "Seconds": 5, "Next": "GetSalesFactLoadStatus" }, "FailSalesFactLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" }, "ClusterUnavailable": { "Type": "Fail", "Cause": "Redshift cluster is not available", "Error": "Error" }, "SalesETLPipelineComplete": { "Type": "Pass", "Next": "ValidateSalesMetric", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "validate_sales_metric", "sql_statement": [ "select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')" ] } } }, "ValidateSalesMetric": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetValidateSalesMetricStatus", "InputPath": "$", "ResultPath": "$" }, "GetValidateSalesMetricStatus": { "Type": "Task", "Next": "CheckValidateSalesMetricStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckValidateSalesMetricStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesMetricValidation" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "DataValidationComplete" } ], "Default": "SalesValidationWait" }, "SalesValidationWait": { "Type": "Wait", "Seconds": 5, "Next": "GetValidateSalesMetricStatus" }, "FailSalesMetricValidation": { "Type": "Fail", "Cause": "Data Validation Failed", "Error": "Error" }, "DataValidationComplete": { "Type": "Pass", "Next": "InitializePauseCluster" }, "InitializePauseCluster": { "Type": "Pass", "Next": "PauseCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "pause" } } }, "PauseCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "PauseClusterWait", "InputPath": "$", "ResultPath": "$.clusterStatus", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "ClusterPausedComplete" } ] }, "InitializeCheckPauseCluster": { "Type": "Pass", "Next": "GetStateOfPausedCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfPausedCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterPaused", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterPaused": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializePauseCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "ClusterPausedComplete" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "PauseClusterWait" } ] }, "PauseClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckPauseCluster" }, "ClusterPausedComplete": { "Type": "Pass", "End": true } } }

Beispiel für IAM eine Richtlinie

Das folgende Beispiel AWS Identity and Access Management (IAM) Die durch das Beispielprojekt generierte Richtlinie beinhaltet die geringsten Rechte, die für die Ausführung der Zustandsmaschine und der zugehörigen Ressourcen erforderlich sind. Wir empfehlen, dass Sie nur die Berechtigungen in Ihre IAM Richtlinien aufnehmen, die erforderlich sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE" ], "Effect": "Allow" } ] }

Informationen zur Konfiguration IAM bei der Verwendung von Step Functions mit anderen AWS Dienste finden Sie unterSo generiert Step Functions IAM Richtlinien für integrierte Dienste.