Exécutez des flux de travail ETL/ELT à l'aide d'Amazon Redshift (Lambda, API de données Amazon Redshift) - AWS Step Functions

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exécutez des flux de travail ETL/ELT à l'aide d'Amazon Redshift (Lambda, API de données Amazon Redshift)

Cet exemple de projet montre comment utiliser Step Functions et l'API Amazon Redshift Data pour exécuter un flux de travail ETL/ELT qui charge des données dans l'entrepôt de données Amazon Redshift.

Dans ce projet, Step Functions utilise une AWS Lambda fonction et l'API Amazon Redshift Data pour créer les objets de base de données requis et générer un ensemble de données d'exemple, puis exécute deux tâches en parallèle qui consistent à charger des tables de dimensions, suivies d'une table de faits. Une fois les deux tâches de chargement des dimensions terminées avec succès, Step Functions exécute la tâche de chargement pour la table d'information, exécute la tâche de validation, puis met en pause le cluster Amazon Redshift.

Note

Vous pouvez modifier la logique ETL pour recevoir des données provenant d'autres sources telles qu'Amazon S3, qui peut utiliser la commande COPY pour copier des données d'Amazon S3 vers une table Amazon Redshift.

Pour plus d'informations sur les intégrations des services Amazon Redshift et Step Functions, consultez les pages suivantes :

Note

Cet exemple de projet peut entraîner des frais.

Pour AWS les nouveaux utilisateurs, un niveau d'utilisation gratuit est disponible. Dans cette offre, les services sont gratuits en-dessous d'un certain niveau d'utilisation. Pour plus d'informations sur AWS les coûts et le niveau gratuit, consultez AWS Step Functions les tarifs.

Étape 1 : créer la machine à états et provisionner les ressources

  1. Ouvrez la console Step Functions et choisissez Create state machine.

  2. Tapez ETL job in Amazon Redshift dans le champ de recherche, puis choisissez ETL job in dans les résultats Amazon Redshift de recherche renvoyés.

  3. Choisissez Next (Suivant) pour continuer.

  4. Step Functions répertorie les AWS services éléments utilisés dans l'exemple de projet que vous avez sélectionné. Il montre également un graphique de flux de travail pour l'exemple de projet. Déployez ce projet sur votre site Compte AWS ou utilisez-le comme point de départ pour créer vos propres projets. Selon la façon dont vous souhaitez procéder, choisissez Exécuter une démo ou Construire à partir de celle-ci.

    Cet exemple de projet déploie les ressources suivantes :

    • Un Amazon Redshift cluster

    • Deux fonctions Lambda

    • Un Amazon Redshift schéma

    • Cinq Amazon Redshift tables

    • Une machine AWS Step Functions étatique

    • Rôles associés AWS Identity and Access Management (IAM).

    L'image suivante montre le graphique du flux de travail pour la tâche ETL dans Amazon Redshift un exemple de projet :

    Graphique du flux de travail de la tâche ETL dans Amazon Redshift un exemple de projet.
  5. Choisissez Utiliser le modèle pour poursuivre votre sélection.

  6. Effectuez l’une des actions suivantes :

    • Si vous avez sélectionné Build on it, Step Functions crée le prototype de flux de travail pour l'exemple de projet que vous avez sélectionné. Step Functions ne déploie pas les ressources répertoriées dans la définition du flux de travail.

      Dans Workflow StudioMode de conception, glissez-déposez les états depuis le Navigateur d'états pour continuer à créer votre prototype de flux de travail. Vous pouvez également passer à un éditeur de code intégré similaire à VS Code pour mettre à jour la définition Amazon States Language (ASL) de votre machine à états dans la console Step Functions. Mode code Pour plus d'informations sur l'utilisation de Workflow Studio pour créer vos machines d'état, consultezUtilisation de Workflow Studio.

      Important

      N'oubliez pas de mettre à jour l'espace réservé Amazon Resource Name (ARN) pour les ressources utilisées dans l'exemple de projet avant d'exécuter votre flux de travail.

    • Si vous avez sélectionné Run a demo, Step Functions crée un exemple de projet en lecture seule qui utilise un AWS CloudFormation modèle pour déployer les AWS ressources répertoriées dans ce modèle sur votre. Compte AWS

      Astuce

      Pour afficher la définition de la machine à états de l'exemple de projet, choisissez Code.

      Lorsque vous êtes prêt, choisissez Déployer et exécuter pour déployer l'exemple de projet et créer les ressources.

      La création de ces ressources et des autorisations IAM associées peut prendre jusqu'à 10 minutes. Pendant le déploiement de vos ressources, vous pouvez ouvrir le lien CloudFormation Stack ID pour voir quelles ressources sont mises en service.

      Une fois que toutes les ressources de l'exemple de projet ont été créées, vous pouvez voir le nouvel exemple de projet répertorié sur la page State machines.

      Important

      Des frais standard peuvent s'appliquer pour chaque service utilisé dans le CloudFormation modèle.

Étape 2 : Exécuter la machine à états

  1. Sur la page State machines, choisissez votre exemple de projet.

  2. Sur la page d'exemple de projet, choisissez Démarrer l'exécution.

  3. Dans la boîte de dialogue Démarrer l'exécution, procédez comme suit :

    1. (Facultatif) Pour identifier votre exécution, vous pouvez lui donner un nom dans le champ Nom. Par défaut, Step Functions génère automatiquement un nom d'exécution unique.

      Note

      Step Functions vous permet de créer des noms pour les machines d'état, les exécutions et les activités, ainsi que des étiquettes contenant des caractères non ASCII. Ces noms non ASCII ne fonctionnent pas avec Amazon. CloudWatch Pour être sûr de pouvoir suivre CloudWatch les métriques, choisissez un nom qui utilise uniquement des caractères ASCII.

    2. (Facultatif) Dans la zone de saisie, entrez les valeurs d'entrée au format JSON pour exécuter votre flux de travail.

      Si vous avez choisi d'exécuter une démo, vous n'avez pas besoin de fournir d'entrée d'exécution.

      Note

      Si le projet de démonstration que vous avez déployé contient des données d'entrée d'exécution préremplies, utilisez ces entrées pour exécuter la machine à états.

    3. Choisissez Start execution (Démarrer l'exécution).

    4. La console Step Functions vous dirige vers une page intitulée avec votre ID d'exécution. Cette page est connue sous le nom de page Détails de l'exécution. Sur cette page, vous pouvez consulter les résultats de l'exécution au fur et à mesure que l'exécution progresse ou une fois celle-ci terminée.

      Pour consulter les résultats de l'exécution, choisissez des états individuels dans la vue graphique, puis choisissez les onglets individuels du Détails de l'étape volet pour afficher les détails de chaque état, y compris les entrées, les sorties et la définition respectivement. Pour plus de détails sur les informations d'exécution que vous pouvez consulter sur la page Détails de l'exécution, voirPage de détails d'exécution — Vue d'ensemble de l'interface.

Exemple de code de machine d'état

Dans cet exemple de projet, la machine à états s'intègre AWS Lambda en transmettant la logique ETL InputPath directement à ces ressources et en l'exécutant de manière asynchrone à l'aide de l'API Amazon Redshift Data.

Parcourez cet exemple de machine à états pour découvrir comment Step Functions contrôle AWS Lambda l'API Amazon Redshift Data.

Pour plus d'informations sur la manière de AWS Step Functions contrôler d'autres AWS services, consultezUtilisation AWS Step Functions avec d'autres services.

{ "Comment": "A simple ETL workflow for loading dimension and fact tables", "StartAt": "InitializeCheckCluster", "States": { "InitializeCheckCluster": { "Type": "Pass", "Next": "GetStateOfCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterAvailable", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterAvailable": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializeBuildDB" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "InitializeResumeCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "ClusterWait" } ] }, "ClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckCluster" }, "InitializeResumeCluster": { "Type": "Pass", "Next": "ResumeCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "resume" } } }, "ResumeCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "ClusterWait", "InputPath": "$", "ResultPath": "$" }, "InitializeBuildDB": { "Type": "Pass", "Next": "BuildDB", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "build_database", "sql_statement": [ "create schema if not exists {0} authorization {1};", "create table if not exists {0}.customer", "(c_customer_sk int4 not null encode az64", ",c_customer_id char(16) not null encode zstd", ",c_current_addr_sk int4 encode az64", ",c_first_name char(20) encode zstd", ",c_last_name char(30) encode zstd", ",primary key (c_customer_sk)", ") distkey(c_customer_sk);", "--", "create table if not exists {0}.customer_address", "(ca_address_sk int4 not null encode az64", ",ca_address_id char(16) not null encode zstd", ",ca_state char(2) encode zstd", ",ca_zip char(10) encode zstd", ",ca_country varchar(20) encode zstd", ",primary key (ca_address_sk)", ") distkey(ca_address_sk);", "--", "create table if not exists {0}.date_dim", "(d_date_sk integer not null encode az64", ",d_date_id char(16) not null encode zstd", ",d_date date encode az64", ",d_day_name char(9) encode zstd", ",primary key (d_date_sk)", ") diststyle all;", "--", "create table if not exists {0}.item", "(i_item_sk int4 not null encode az64", ",i_item_id char(16) not null encode zstd", ",i_rec_start_date date encode az64", ",i_rec_end_date date encode az64", ",i_current_price numeric(7,2) encode az64", ",i_category char(50) encode zstd", ",i_product_name char(50) encode zstd", ",primary key (i_item_sk)", ") distkey(i_item_sk) sortkey(i_category);", "--", "create table if not exists {0}.store_sales", "(ss_sold_date_sk int4", ",ss_item_sk int4 not null encode az64", ",ss_customer_sk int4 encode az64", ",ss_addr_sk int4 encode az64", ",ss_store_sk int4 encode az64", ",ss_ticket_number int8 not null encode az64", ",ss_quantity int4 encode az64", ",ss_net_paid numeric(7,2) encode az64", ",ss_net_profit numeric(7,2) encode az64", ",primary key (ss_item_sk, ss_ticket_number)", ") distkey(ss_item_sk) sortkey(ss_sold_date_sk);" ] } } }, "BuildDB": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBuildDBStatus", "InputPath": "$", "ResultPath": "$" }, "GetBuildDBStatus": { "Type": "Task", "Next": "CheckBuildDBStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBuildDBStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailBuildDB" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "InitializeBaselineData" } ], "Default": "BuildDBWait" }, "BuildDBWait": { "Type": "Wait", "Seconds": 15, "Next": "GetBuildDBStatus" }, "FailBuildDB": { "Type": "Fail", "Cause": "Database Build Failed", "Error": "Error" }, "InitializeBaselineData": { "Type": "Pass", "Next": "LoadBaselineData", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_baseline_data", "sql_statement": [ "begin transaction;", "truncate table {0}.customer;", "insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)", "values", "(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),", "(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),", "(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),", "(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');", "analyze {0}.customer;", "--", "truncate table {0}.customer_address;", "insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)", "values", "(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),", "(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),", "(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');", "analyze {0}.customer_address;", "--", "truncate table {0}.item;", "insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),", "(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),", "(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),", "(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),", "(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),", "(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');", "--", "analyze {0}.item;", "truncate table {0}.date_dim;", "insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)", "values", "(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),", "(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),", "(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),", "(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),", "(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),", "(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),", "(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),", "(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),", "(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');", "--", "analyze {0}.date_dim;", "-- commit and End transaction", "commit;", "end transaction;" ] } } }, "LoadBaselineData": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetBaselineData", "InputPath": "$", "ResultPath": "$" }, "GetBaselineData": { "Type": "Task", "Next": "CheckBaselineData", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckBaselineData": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailLoadBaselineData" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "ParallelizeDimensionLoadJob" } ], "Default": "BaselineDataWait" }, "BaselineDataWait": { "Type": "Wait", "Seconds": 20, "Next": "GetBaselineData" }, "FailLoadBaselineData": { "Type": "Fail", "Cause": "Load Baseline Data Failed", "Error": "Error" }, "ParallelizeDimensionLoadJob": { "Type": "Parallel", "Next": "InitializeSalesFactLoadJob", "ResultPath": "$.status", "Branches": [ { "StartAt": "InitializeCustomerAddressDimensionLoadJob", "States": { "InitializeCustomerAddressDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteCustomerAddressDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_customer_address", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_customer_address;", "create table if not exists {0}.stg_customer_address", "(ca_address_id varchar(16) encode zstd", ",ca_state varchar(2) encode zstd", ",ca_zip varchar(10) encode zstd", ",ca_country varchar(20) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)", "values", "('AAAAAAAACFBBAAAA','NE','','United States'),", "('AAAAAAAAGAEFAAAA','NE','61749','United States'),", "('AAAAAAAAPJKKAAAA','OK','','United States'),", "('AAAAAAAAMIHGAAAA','AL','','United States');", "/* Perform UPDATE for existing data with refreshed attribute values */", "update {0}.customer_address", " set ca_state = stg_customer_address.ca_state,", " ca_zip = stg_customer_address.ca_zip,", " ca_country = stg_customer_address.ca_country", " from {0}.stg_customer_address", " where customer_address.ca_address_id = stg_customer_address.ca_address_id;", "/* Perform insert for new rows */", "insert into {0}.customer_address", "(ca_address_sk", ",ca_address_id", ",ca_state", ",ca_zip", ",ca_country", ")", "with max_customer_address_sk as", "(select max(ca_address_sk) max_ca_address_sk", "from {0}.customer_address)", "select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk", ",stg_customer_address.ca_address_id", ",stg_customer_address.ca_state", ",stg_customer_address.ca_zip", ",stg_customer_address.ca_country", "from {0}.stg_customer_address,", "max_customer_address_sk", "where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteCustomerAddressDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetCustomerAddressDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetCustomerAddressDimensionLoadStatus": { "Type": "Task", "Next": "CheckCustomerAddressDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckCustomerAddressDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailCustomerAddressDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteCustomerAddressDimensionLoad" } ], "Default": "CustomerAddressWait" }, "CustomerAddressWait": { "Type": "Wait", "Seconds": 5, "Next": "GetCustomerAddressDimensionLoadStatus" }, "CompleteCustomerAddressDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailCustomerAddressDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } }, { "StartAt": "InitializeItemDimensionLoadJob", "States": { "InitializeItemDimensionLoadJob": { "Type": "Pass", "Next": "ExecuteItemDimensionLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "action": "load_item", "sql_statement": [ "begin transaction;", "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */", "drop table if exists {0}.stg_item;", "create table if not exists {0}.stg_item", "(i_item_id varchar(16) encode zstd", ",i_rec_start_date date encode zstd", ",i_rec_end_date date encode zstd", ",i_current_price numeric(7,2) encode zstd", ",i_category varchar(50) encode zstd", ",i_product_name varchar(50) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_item", "(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)", "values", "('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),", "('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),", "('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),", "('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),", "('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),", "('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),", "('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),", "('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');", "/************************************************************************************************************************", "** Type 2 is maintained for i_current_price column.", "** Update all attributes for the item when the price is not changed", "** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match", "*************************************************************************************************************************/", "update {0}.item", " set i_category = stg_item.i_category,", " i_product_name = stg_item.i_product_name", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price = stg_item.i_current_price;", "insert into {0}.item", "(i_item_sk", ",i_item_id", ",i_rec_start_date", ",i_rec_end_date", ",i_current_price", ",i_category", ",i_product_name", ")", "with max_item_sk as", "(select max(i_item_sk) max_item_sk", " from {0}.item)", "select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk", " ,stg_item.i_item_id", " ,trunc(sysdate) as i_rec_start_date", " ,null as i_rec_end_date", " ,stg_item.i_current_price", " ,stg_item.i_category", " ,stg_item.i_product_name", " from {0}.stg_item, {0}.item, max_item_sk", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Sunset penultimate records that were inserted as type 2 */", "update {0}.item", " set i_rec_end_date = trunc(sysdate)", " from {0}.stg_item", " where item.i_item_id = stg_item.i_item_id", " and item.i_rec_end_date is null", " and item.i_current_price <> stg_item.i_current_price;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteItemDimensionLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetItemDimensionLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetItemDimensionLoadStatus": { "Type": "Task", "Next": "CheckItemDimensionLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckItemDimensionLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailItemDimensionLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteItemDimensionLoad" } ], "Default": "ItemWait" }, "ItemWait": { "Type": "Wait", "Seconds": 5, "Next": "GetItemDimensionLoadStatus" }, "CompleteItemDimensionLoad": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "End": true }, "FailItemDimensionLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" } } } ] }, "InitializeSalesFactLoadJob": { "Type": "Pass", "Next": "ExecuteSalesFactLoadJob", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "load_sales_fact", "sql_statement": [ "begin transaction;", "/* Create a stg_store_sales staging table */", "drop table if exists {0}.stg_store_sales;", "create table {0}.stg_store_sales", "(sold_date date encode zstd", ",i_item_id varchar(16) encode zstd", ",c_customer_id varchar(16) encode zstd", ",ca_address_id varchar(16) encode zstd", ",ss_ticket_number integer encode zstd", ",ss_quantity integer encode zstd", ",ss_net_paid numeric(7,2) encode zstd", ",ss_net_profit numeric(7,2) encode zstd", ")", "backup no", "diststyle even;", "/* Ingest data from source */", "insert into {0}.stg_store_sales", "(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)", "values", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),", "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),", "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),", "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),", "('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);", "/* Delete any rows from target store_sales for the input date for idempotency */", "delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');", "/* Insert data from staging table to the target table */", "insert into {0}.store_sales", "(ss_sold_date_sk", ",ss_item_sk", ",ss_customer_sk", ",ss_addr_sk", ",ss_ticket_number", ",ss_quantity", ",ss_net_paid", ",ss_net_profit", ")", "select date_dim.d_date_sk ss_sold_date_sk", " ,item.i_item_sk ss_item_sk", " ,customer.c_customer_sk ss_customer_sk", " ,customer_address.ca_address_sk ss_addr_sk", " ,ss_ticket_number", " ,ss_quantity", " ,ss_net_paid", " ,ss_net_profit", " from {0}.stg_store_sales as store_sales", " inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date", " left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null", " left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id", " left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;", "/* Drop staging table */", "drop table if exists {0}.stg_store_sales;", "/* Commit and End transaction */", "commit;", "end transaction;" ] } } }, "ExecuteSalesFactLoadJob": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetSalesFactLoadStatus", "InputPath": "$", "ResultPath": "$" }, "GetSalesFactLoadStatus": { "Type": "Task", "Next": "CheckSalesFactLoadStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckSalesFactLoadStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesFactLoad" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "SalesETLPipelineComplete" } ], "Default": "SalesWait" }, "SalesWait": { "Type": "Wait", "Seconds": 5, "Next": "GetSalesFactLoadStatus" }, "FailSalesFactLoad": { "Type": "Fail", "Cause": "ETL Workflow Failed", "Error": "Error" }, "ClusterUnavailable": { "Type": "Fail", "Cause": "Redshift cluster is not available", "Error": "Error" }, "SalesETLPipelineComplete": { "Type": "Pass", "Next": "ValidateSalesMetric", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "redshift_database": "dev", "redshift_user": "awsuser", "redshift_schema": "tpcds", "snapshot_date": "2003-01-02", "action": "validate_sales_metric", "sql_statement": [ "select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')" ] } } }, "ValidateSalesMetric": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "GetValidateSalesMetricStatus", "InputPath": "$", "ResultPath": "$" }, "GetValidateSalesMetricStatus": { "Type": "Task", "Next": "CheckValidateSalesMetricStatus", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "InputPath": "$", "ResultPath": "$.status" }, "CheckValidateSalesMetricStatus": { "Type": "Choice", "Choices": [ { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesMetricValidation" }, { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "DataValidationComplete" } ], "Default": "SalesValidationWait" }, "SalesValidationWait": { "Type": "Wait", "Seconds": 5, "Next": "GetValidateSalesMetricStatus" }, "FailSalesMetricValidation": { "Type": "Fail", "Cause": "Data Validation Failed", "Error": "Error" }, "DataValidationComplete": { "Type": "Pass", "Next": "InitializePauseCluster" }, "InitializePauseCluster": { "Type": "Pass", "Next": "PauseCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "pause" } } }, "PauseCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "PauseClusterWait", "InputPath": "$", "ResultPath": "$.clusterStatus", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "ClusterPausedComplete" } ] }, "InitializeCheckPauseCluster": { "Type": "Pass", "Next": "GetStateOfPausedCluster", "Result": { "input": { "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE", "operation": "status" } } }, "GetStateOfPausedCluster": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE", "TimeoutSeconds": 180, "HeartbeatSeconds": 60, "Next": "IsClusterPaused", "InputPath": "$", "ResultPath": "$.clusterStatus" }, "IsClusterPaused": { "Type": "Choice", "Choices": [ { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializePauseCluster" }, { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "ClusterPausedComplete" }, { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" }, { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "PauseClusterWait" } ] }, "PauseClusterWait": { "Type": "Wait", "Seconds": 720, "Next": "InitializeCheckPauseCluster" }, "ClusterPausedComplete": { "Type": "Pass", "End": true } } }

Pour plus d'informations sur la configuration de l'IAM lors de l'utilisation de Step Functions avec d'autres AWS services, consultezPolitiques IAM pour les services intégrés.

Exemple IAM

Ces exemples de politiques AWS Identity and Access Management (IAM) générés par l'exemple de projet incluent le moindre privilège nécessaire pour exécuter la machine à états et les ressources associées. Nous vous recommandons de n'inclure que les autorisations nécessaires dans vos politiques IAM.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE", "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE" ], "Effect": "Allow" } ] }

Pour plus d'informations sur la configuration de l'IAM lors de l'utilisation de Step Functions avec d'autres AWS services, consultezPolitiques IAM pour les services intégrés.