Run ETL/ELT workflows using Amazon Redshift (Lambda, Amazon Redshift Data API) - AWS Step Functions


Run ETL/ELT workflows using Amazon Redshift (Lambda, Amazon Redshift Data API)

This sample project demonstrates how to use Step Functions and the Amazon Redshift Data API to run an ETL/ELT workflow that loads data into an Amazon Redshift data warehouse.

In this project, Step Functions uses AWS Lambda functions and the Amazon Redshift Data API to create the required database objects and generate a set of example data. It then runs two jobs in parallel that load the dimension tables. After both dimension load jobs finish successfully, Step Functions runs the load job for the fact table, runs a validation job, and then pauses the Amazon Redshift cluster.
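The ordering described above can be sketched in plain Python. This is only an illustration of the sequencing, not how Step Functions runs the jobs; `run_workflow` and `run_job` are hypothetical names, and the job names are taken from the `action` values in the state machine definition, except `pause_cluster`, which is made up for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def run_workflow(run_job):
    """Run the jobs in the documented order; run_job is any callable taking a job name."""
    completed = []

    def tracked(name):
        run_job(name)
        completed.append(name)
        return name

    # Setup steps run one after another.
    tracked("build_database")
    tracked("load_baseline_data")

    # The two dimension loads run in parallel; both must succeed before continuing.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(tracked, name)
            for name in ("load_customer_address", "load_item")
        ]
        for future in futures:
            future.result()  # re-raises the exception of a failed job

    # The fact load, validation, and pause only start after both loads finish.
    tracked("load_sales_fact")
    tracked("validate_sales_metric")
    tracked("pause_cluster")
    return completed
```

Calling `run_workflow(print)` prints the job names in an order where the two dimension loads may swap places but always precede the fact load.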

Note

You can modify the ETL logic to receive data from other sources such as Amazon S3. In that case, the workflow can use the COPY command to copy data from Amazon S3 into an Amazon Redshift table.
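As a rough sketch of what such a change might look like, the helper below builds a COPY statement that could replace the inline `insert ... values` statements used by the sample. The function name, the bucket, and the IAM role ARN are hypothetical, and the sketch assumes CSV input files.

```python
def build_copy_statement(table, s3_uri, iam_role_arn):
    """Return a Redshift COPY statement that loads CSV files from Amazon S3."""
    # Values are interpolated directly, so they should come from trusted
    # configuration and never from user input.
    return (
        f"copy {table} "
        f"from '{s3_uri}' "
        f"iam_role '{iam_role_arn}' "
        "format as csv;"
    )


# Hypothetical bucket and role, for illustration only.
statement = build_copy_statement(
    "tpcds.stg_customer_address",
    "s3://amzn-s3-demo-bucket/customer_address/",
    "arn:aws:iam::111122223333:role/ExampleRedshiftCopyRole",
)
```

The IAM role named in the statement would need read access to the bucket and would have to be associated with the cluster.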

For more information about Amazon Redshift and Step Functions service integrations, see the following:

Note

This sample project may incur charges.

For new AWS users, a free usage tier is available. On this tier, services are free below a certain level of usage. For more information about AWS costs and the Free Tier, see AWS Step Functions pricing.

Step 1: Create the state machine and provision resources

  1. Open the Step Functions console and choose Create state machine.

  2. Type ETL job in Amazon Redshift in the search box, and then choose ETL job in Amazon Redshift from the search results.

  3. Choose Next to continue.

  4. Step Functions lists the AWS services used in the sample project you selected. It also shows a workflow graph for the sample project. You can deploy this project to your AWS account or use it as a starting point for building your own projects. Depending on how you want to proceed, choose Run a demo or Build on it.

    This sample project deploys the following resources:

    • An Amazon Redshift cluster

    • Two Lambda functions

    • An Amazon Redshift schema

    • Five Amazon Redshift tables

    • An AWS Step Functions state machine

    • Related AWS Identity and Access Management (IAM) roles

    The following image shows the workflow graph for the ETL job in Amazon Redshift sample project:

    [Image: Workflow graph of the ETL job in Amazon Redshift sample project.]

  5. Choose Use template to continue with your selection.

  6. Do one of the following:

    • If you selected Build on it, Step Functions creates the workflow prototype for the sample project you selected. Step Functions doesn't deploy the resources listed in the workflow definition.

      In Workflow Studio's Design mode, drag and drop states from the States browser to continue building your workflow prototype. Alternatively, switch to Code mode, which provides an integrated code editor similar to VS Code for updating the Amazon States Language (ASL) definition of your state machine within the Step Functions console. For more information about using Workflow Studio to build your state machines, see Using Workflow Studio.

    • If you selected Run a demo, Step Functions creates a read-only sample project that uses an AWS CloudFormation template to deploy the AWS resources listed in that template to your AWS account.

      Tip

      To view the state machine definition of the sample project, choose Code.

      When you're ready, choose Deploy and run to deploy the sample project and create the resources.

      It can take up to 10 minutes for these resources and the related IAM permissions to be created. While your resources are being deployed, you can open the CloudFormation Stack ID link to see which resources are being provisioned.

      After all the resources in the sample project are created, you can see the new sample project listed on the State machines page.

      Important

      Standard charges may apply for each service used in the CloudFormation template.

Step 2: Run the state machine

  1. On the State machines page, choose your sample project.

  2. On the sample project page, choose Start execution.

  3. In the Start execution dialog box, do the following:

    1. (Optional) To identify your execution, you can specify a name for it in the Name box. By default, Step Functions generates a unique execution name automatically.

      Note

      Step Functions allows you to create names for state machines, executions, activities, and labels that contain non-ASCII characters. These non-ASCII names don't work with Amazon CloudWatch. To make sure that you can track CloudWatch metrics, choose a name that uses only ASCII characters.

    2. (Optional) In the Input box, enter input values in JSON format to run your workflow.

      If you chose Run a demo, you don't need to provide any execution input.

      Note

      If the demo project you're using contains prepopulated execution input data, use that input to run the state machine.

    3. Choose Start execution.

    4. The Step Functions console directs you to a page that's titled with your execution ID. This page is known as the Execution Details page. On this page, you can review the execution results as the execution progresses or after it's complete.

      To review the execution results, choose individual states on the Graph view, and then choose the individual tabs on the Step details pane to view each state's details, including its input, output, and definition. For details about the execution information you can view on the Execution Details page, see Execution Details page - Interface overview.
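The console steps above also have a programmatic counterpart. The following is a minimal sketch, assuming `sfn_client` behaves like `boto3.client("stepfunctions")` and exposes its `start_execution` call; the helper name `start_etl_execution` is hypothetical. It rejects non-ASCII execution names up front, following the note about CloudWatch metrics.

```python
import json


def start_etl_execution(sfn_client, state_machine_arn, name=None, payload=None):
    """Start an execution after checking that the optional name is ASCII-only."""
    if name is not None and not name.isascii():
        raise ValueError(
            "use an ASCII-only execution name so CloudWatch metrics can be tracked"
        )
    kwargs = {
        "stateMachineArn": state_machine_arn,
        # The execution input must be a JSON document; the demo needs none,
        # so an empty object is sent by default.
        "input": json.dumps(payload or {}),
    }
    if name is not None:
        # When no name is given, Step Functions generates a unique one.
        kwargs["name"] = name
    return sfn_client.start_execution(**kwargs)
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub object before pointing it at a real account.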

Example State Machine Code

The state machine in this sample project integrates with AWS Lambda by passing the ETL logic as InputPath directly to the Lambda functions, which run it asynchronously using the Amazon Redshift Data API.
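The Lambda function code itself is not shown on this page. The sketch below is a hypothetical illustration of how such a function might handle the input it receives, assuming `data_client` behaves like `boto3.client("redshift-data")` with its `execute_statement` and `describe_statement` calls. The helper names are made up, and the meaning of the `{0}` and `{1}` placeholders (schema first, then snapshot date or user) is an assumption inferred from the SQL in the state machine definition.

```python
def render_sql(event_input):
    """Join the sql_statement fragments and fill in the {0}/{1} placeholders."""
    # Assumption: {0} is the schema; {1} is the snapshot date when one is
    # supplied, otherwise the database user.
    second = event_input.get("snapshot_date", event_input["redshift_user"])
    # Join with newlines so that "--" comment lines do not swallow later lines.
    sql = "\n".join(event_input["sql_statement"])
    return sql.format(event_input["redshift_schema"], second)


def submit_sql(data_client, event):
    """Submit the SQL without waiting for it to finish; return the statement Id."""
    params = event["input"]
    response = data_client.execute_statement(
        ClusterIdentifier=params["redshift_cluster_id"],
        Database=params["redshift_database"],
        DbUser=params["redshift_user"],
        Sql=render_sql(params),
    )
    return response["Id"]


def get_status(data_client, statement_id):
    """Return the statement status, for example STARTED, FINISHED, or FAILED."""
    return data_client.describe_statement(Id=statement_id)["Status"]
```

Because `execute_statement` returns as soon as the statement is submitted, the state machine can check progress in a separate step instead of keeping a Lambda invocation open. For scripts made of several statements, the Data API also provides `batch_execute_statement`, which accepts a list of statements.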

Browse through this example state machine to see how Step Functions controls AWS Lambda and the Amazon Redshift Data API.

For more information about how AWS Step Functions can control other AWS services, see Using AWS Step Functions with other services.

{
  "Comment": "A simple ETL workflow for loading dimension and fact tables",
  "StartAt": "InitializeCheckCluster",
  "States": {
    "InitializeCheckCluster": {
      "Type": "Pass",
      "Next": "GetStateOfCluster",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "operation": "status"
        }
      }
    },
    "GetStateOfCluster": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "IsClusterAvailable",
      "InputPath": "$",
      "ResultPath": "$.clusterStatus"
    },
    "IsClusterAvailable": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializeBuildDB" },
        { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "InitializeResumeCluster" },
        { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" },
        { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "ClusterWait" }
      ]
    },
    "ClusterWait": {
      "Type": "Wait",
      "Seconds": 720,
      "Next": "InitializeCheckCluster"
    },
    "InitializeResumeCluster": {
      "Type": "Pass",
      "Next": "ResumeCluster",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "operation": "resume"
        }
      }
    },
    "ResumeCluster": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "ClusterWait",
      "InputPath": "$",
      "ResultPath": "$"
    },
    "InitializeBuildDB": {
      "Type": "Pass",
      "Next": "BuildDB",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "redshift_database": "dev",
          "redshift_user": "awsuser",
          "redshift_schema": "tpcds",
          "action": "build_database",
          "sql_statement": [
            "create schema if not exists {0} authorization {1};",
            "create table if not exists {0}.customer",
            "(c_customer_sk int4 not null encode az64",
            ",c_customer_id char(16) not null encode zstd",
            ",c_current_addr_sk int4 encode az64",
            ",c_first_name char(20) encode zstd",
            ",c_last_name char(30) encode zstd",
            ",primary key (c_customer_sk)",
            ") distkey(c_customer_sk);",
            "--",
            "create table if not exists {0}.customer_address",
            "(ca_address_sk int4 not null encode az64",
            ",ca_address_id char(16) not null encode zstd",
            ",ca_state char(2) encode zstd",
            ",ca_zip char(10) encode zstd",
            ",ca_country varchar(20) encode zstd",
            ",primary key (ca_address_sk)",
            ") distkey(ca_address_sk);",
            "--",
            "create table if not exists {0}.date_dim",
            "(d_date_sk integer not null encode az64",
            ",d_date_id char(16) not null encode zstd",
            ",d_date date encode az64",
            ",d_day_name char(9) encode zstd",
            ",primary key (d_date_sk)",
            ") diststyle all;",
            "--",
            "create table if not exists {0}.item",
            "(i_item_sk int4 not null encode az64",
            ",i_item_id char(16) not null encode zstd",
            ",i_rec_start_date date encode az64",
            ",i_rec_end_date date encode az64",
            ",i_current_price numeric(7,2) encode az64",
            ",i_category char(50) encode zstd",
            ",i_product_name char(50) encode zstd",
            ",primary key (i_item_sk)",
            ") distkey(i_item_sk) sortkey(i_category);",
            "--",
            "create table if not exists {0}.store_sales",
            "(ss_sold_date_sk int4",
            ",ss_item_sk int4 not null encode az64",
            ",ss_customer_sk int4 encode az64",
            ",ss_addr_sk int4 encode az64",
            ",ss_store_sk int4 encode az64",
            ",ss_ticket_number int8 not null encode az64",
            ",ss_quantity int4 encode az64",
            ",ss_net_paid numeric(7,2) encode az64",
            ",ss_net_profit numeric(7,2) encode az64",
            ",primary key (ss_item_sk, ss_ticket_number)",
            ") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
          ]
        }
      }
    },
    "BuildDB": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "GetBuildDBStatus",
      "InputPath": "$",
      "ResultPath": "$"
    },
    "GetBuildDBStatus": {
      "Type": "Task",
      "Next": "CheckBuildDBStatus",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckBuildDBStatus": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailBuildDB" },
        { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "InitializeBaselineData" }
      ],
      "Default": "BuildDBWait"
    },
    "BuildDBWait": {
      "Type": "Wait",
      "Seconds": 15,
      "Next": "GetBuildDBStatus"
    },
    "FailBuildDB": {
      "Type": "Fail",
      "Cause": "Database Build Failed",
      "Error": "Error"
    },
    "InitializeBaselineData": {
      "Type": "Pass",
      "Next": "LoadBaselineData",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "redshift_database": "dev",
          "redshift_user": "awsuser",
          "redshift_schema": "tpcds",
          "action": "load_baseline_data",
          "sql_statement": [
            "begin transaction;",
            "truncate table {0}.customer;",
            "insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
            "values",
            "(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
            "(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
            "(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
            "(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
            "analyze {0}.customer;",
            "--",
            "truncate table {0}.customer_address;",
            "insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
            "values",
            "(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
            "(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
            "(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
            "analyze {0}.customer_address;",
            "--",
            "truncate table {0}.item;",
            "insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
            "values",
            "(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
            "(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
            "(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
            "(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
            "(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
            "(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
            "--",
            "analyze {0}.item;",
            "truncate table {0}.date_dim;",
            "insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
            "values",
            "(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
            "(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
            "(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
            "(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
            "(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
            "(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
            "(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
            "(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
            "(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
            "--",
            "analyze {0}.date_dim;",
            "-- commit and End transaction",
            "commit;",
            "end transaction;"
          ]
        }
      }
    },
    "LoadBaselineData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "GetBaselineData",
      "InputPath": "$",
      "ResultPath": "$"
    },
    "GetBaselineData": {
      "Type": "Task",
      "Next": "CheckBaselineData",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckBaselineData": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailLoadBaselineData" },
        { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "ParallelizeDimensionLoadJob" }
      ],
      "Default": "BaselineDataWait"
    },
    "BaselineDataWait": {
      "Type": "Wait",
      "Seconds": 20,
      "Next": "GetBaselineData"
    },
    "FailLoadBaselineData": {
      "Type": "Fail",
      "Cause": "Load Baseline Data Failed",
      "Error": "Error"
    },
    "ParallelizeDimensionLoadJob": {
      "Type": "Parallel",
      "Next": "InitializeSalesFactLoadJob",
      "ResultPath": "$.status",
      "Branches": [
        {
          "StartAt": "InitializeCustomerAddressDimensionLoadJob",
          "States": {
            "InitializeCustomerAddressDimensionLoadJob": {
              "Type": "Pass",
              "Next": "ExecuteCustomerAddressDimensionLoadJob",
              "Result": {
                "input": {
                  "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
                  "redshift_database": "dev",
                  "redshift_user": "awsuser",
                  "redshift_schema": "tpcds",
                  "action": "load_customer_address",
                  "sql_statement": [
                    "begin transaction;",
                    "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
                    "drop table if exists {0}.stg_customer_address;",
                    "create table if not exists {0}.stg_customer_address",
                    "(ca_address_id varchar(16) encode zstd",
                    ",ca_state varchar(2) encode zstd",
                    ",ca_zip varchar(10) encode zstd",
                    ",ca_country varchar(20) encode zstd",
                    ")",
                    "backup no",
                    "diststyle even;",
                    "/* Ingest data from source */",
                    "insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
                    "values",
                    "('AAAAAAAACFBBAAAA','NE','','United States'),",
                    "('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
                    "('AAAAAAAAPJKKAAAA','OK','','United States'),",
                    "('AAAAAAAAMIHGAAAA','AL','','United States');",
                    "/* Perform UPDATE for existing data with refreshed attribute values */",
                    "update {0}.customer_address",
                    " set ca_state = stg_customer_address.ca_state,",
                    " ca_zip = stg_customer_address.ca_zip,",
                    " ca_country = stg_customer_address.ca_country",
                    " from {0}.stg_customer_address",
                    " where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
                    "/* Perform insert for new rows */",
                    "insert into {0}.customer_address",
                    "(ca_address_sk",
                    ",ca_address_id",
                    ",ca_state",
                    ",ca_zip",
                    ",ca_country",
                    ")",
                    "with max_customer_address_sk as",
                    "(select max(ca_address_sk) max_ca_address_sk",
                    "from {0}.customer_address)",
                    "select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
                    ",stg_customer_address.ca_address_id",
                    ",stg_customer_address.ca_state",
                    ",stg_customer_address.ca_zip",
                    ",stg_customer_address.ca_country",
                    "from {0}.stg_customer_address,",
                    "max_customer_address_sk",
                    "where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
                    "/* Commit and End transaction */",
                    "commit;",
                    "end transaction;"
                  ]
                }
              }
            },
            "ExecuteCustomerAddressDimensionLoadJob": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "Next": "GetCustomerAddressDimensionLoadStatus",
              "InputPath": "$",
              "ResultPath": "$"
            },
            "GetCustomerAddressDimensionLoadStatus": {
              "Type": "Task",
              "Next": "CheckCustomerAddressDimensionLoadStatus",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "InputPath": "$",
              "ResultPath": "$.status"
            },
            "CheckCustomerAddressDimensionLoadStatus": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailCustomerAddressDimensionLoad" },
                { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteCustomerAddressDimensionLoad" }
              ],
              "Default": "CustomerAddressWait"
            },
            "CustomerAddressWait": {
              "Type": "Wait",
              "Seconds": 5,
              "Next": "GetCustomerAddressDimensionLoadStatus"
            },
            "CompleteCustomerAddressDimensionLoad": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "End": true
            },
            "FailCustomerAddressDimensionLoad": {
              "Type": "Fail",
              "Cause": "ETL Workflow Failed",
              "Error": "Error"
            }
          }
        },
        {
          "StartAt": "InitializeItemDimensionLoadJob",
          "States": {
            "InitializeItemDimensionLoadJob": {
              "Type": "Pass",
              "Next": "ExecuteItemDimensionLoadJob",
              "Result": {
                "input": {
                  "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
                  "redshift_database": "dev",
                  "redshift_user": "awsuser",
                  "redshift_schema": "tpcds",
                  "action": "load_item",
                  "sql_statement": [
                    "begin transaction;",
                    "/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
                    "drop table if exists {0}.stg_item;",
                    "create table if not exists {0}.stg_item",
                    "(i_item_id varchar(16) encode zstd",
                    ",i_rec_start_date date encode zstd",
                    ",i_rec_end_date date encode zstd",
                    ",i_current_price numeric(7,2) encode zstd",
                    ",i_category varchar(50) encode zstd",
                    ",i_product_name varchar(50) encode zstd",
                    ")",
                    "backup no",
                    "diststyle even;",
                    "/* Ingest data from source */",
                    "insert into {0}.stg_item",
                    "(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
                    "values",
                    "('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
                    "('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
                    "('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
                    "('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
                    "('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
                    "('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
                    "('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
                    "('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
                    "/************************************************************************************************************************",
                    "** Type 2 is maintained for i_current_price column.",
                    "** Update all attributes for the item when the price is not changed",
                    "** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
                    "*************************************************************************************************************************/",
                    "update {0}.item",
                    " set i_category = stg_item.i_category,",
                    " i_product_name = stg_item.i_product_name",
                    " from {0}.stg_item",
                    " where item.i_item_id = stg_item.i_item_id",
                    " and item.i_rec_end_date is null",
                    " and item.i_current_price = stg_item.i_current_price;",
                    "insert into {0}.item",
                    "(i_item_sk",
                    ",i_item_id",
                    ",i_rec_start_date",
                    ",i_rec_end_date",
                    ",i_current_price",
                    ",i_category",
                    ",i_product_name",
                    ")",
                    "with max_item_sk as",
                    "(select max(i_item_sk) max_item_sk",
                    " from {0}.item)",
                    "select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
                    " ,stg_item.i_item_id",
                    " ,trunc(sysdate) as i_rec_start_date",
                    " ,null as i_rec_end_date",
                    " ,stg_item.i_current_price",
                    " ,stg_item.i_category",
                    " ,stg_item.i_product_name",
                    " from {0}.stg_item, {0}.item, max_item_sk",
                    " where item.i_item_id = stg_item.i_item_id",
                    " and item.i_rec_end_date is null",
                    " and item.i_current_price <> stg_item.i_current_price;",
                    "/* Sunset penultimate records that were inserted as type 2 */",
                    "update {0}.item",
                    " set i_rec_end_date = trunc(sysdate)",
                    " from {0}.stg_item",
                    " where item.i_item_id = stg_item.i_item_id",
                    " and item.i_rec_end_date is null",
                    " and item.i_current_price <> stg_item.i_current_price;",
                    "/* Commit and End transaction */",
                    "commit;",
                    "end transaction;"
                  ]
                }
              }
            },
            "ExecuteItemDimensionLoadJob": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "Next": "GetItemDimensionLoadStatus",
              "InputPath": "$",
              "ResultPath": "$"
            },
            "GetItemDimensionLoadStatus": {
              "Type": "Task",
              "Next": "CheckItemDimensionLoadStatus",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "InputPath": "$",
              "ResultPath": "$.status"
            },
            "CheckItemDimensionLoadStatus": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailItemDimensionLoad" },
                { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "CompleteItemDimensionLoad" }
              ],
              "Default": "ItemWait"
            },
            "ItemWait": {
              "Type": "Wait",
              "Seconds": 5,
              "Next": "GetItemDimensionLoadStatus"
            },
            "CompleteItemDimensionLoad": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
              "TimeoutSeconds": 180,
              "HeartbeatSeconds": 60,
              "End": true
            },
            "FailItemDimensionLoad": {
              "Type": "Fail",
              "Cause": "ETL Workflow Failed",
              "Error": "Error"
            }
          }
        }
      ]
    },
    "InitializeSalesFactLoadJob": {
      "Type": "Pass",
      "Next": "ExecuteSalesFactLoadJob",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "redshift_database": "dev",
          "redshift_user": "awsuser",
          "redshift_schema": "tpcds",
          "snapshot_date": "2003-01-02",
          "action": "load_sales_fact",
          "sql_statement": [
            "begin transaction;",
            "/* Create a stg_store_sales staging table */",
            "drop table if exists {0}.stg_store_sales;",
            "create table {0}.stg_store_sales",
            "(sold_date date encode zstd",
            ",i_item_id varchar(16) encode zstd",
            ",c_customer_id varchar(16) encode zstd",
            ",ca_address_id varchar(16) encode zstd",
            ",ss_ticket_number integer encode zstd",
            ",ss_quantity integer encode zstd",
            ",ss_net_paid numeric(7,2) encode zstd",
            ",ss_net_profit numeric(7,2) encode zstd",
            ")",
            "backup no",
            "diststyle even;",
            "/* Ingest data from source */",
            "insert into {0}.stg_store_sales",
            "(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
            "values",
            "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
            "('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
            "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
            "('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
            "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
            "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
            "('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
            "('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
            "/* Delete any rows from target store_sales for the input date for idempotency */",
            "delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
            "/* Insert data from staging table to the target table */",
            "insert into {0}.store_sales",
            "(ss_sold_date_sk",
            ",ss_item_sk",
            ",ss_customer_sk",
            ",ss_addr_sk",
            ",ss_ticket_number",
            ",ss_quantity",
            ",ss_net_paid",
            ",ss_net_profit",
            ")",
            "select date_dim.d_date_sk ss_sold_date_sk",
            " ,item.i_item_sk ss_item_sk",
            " ,customer.c_customer_sk ss_customer_sk",
            " ,customer_address.ca_address_sk ss_addr_sk",
            " ,ss_ticket_number",
            " ,ss_quantity",
            " ,ss_net_paid",
            " ,ss_net_profit",
            " from {0}.stg_store_sales as store_sales",
            " inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
            " left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
            " left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
            " left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
            "/* Drop staging table */",
            "drop table if exists {0}.stg_store_sales;",
            "/* Commit and End transaction */",
            "commit;",
            "end transaction;"
          ]
        }
      }
    },
    "ExecuteSalesFactLoadJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "GetSalesFactLoadStatus",
      "InputPath": "$",
      "ResultPath": "$"
    },
    "GetSalesFactLoadStatus": {
      "Type": "Task",
      "Next": "CheckSalesFactLoadStatus",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckSalesFactLoadStatus": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesFactLoad" },
        { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "SalesETLPipelineComplete" }
      ],
      "Default": "SalesWait"
    },
    "SalesWait": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "GetSalesFactLoadStatus"
    },
    "FailSalesFactLoad": {
      "Type": "Fail",
      "Cause": "ETL Workflow Failed",
      "Error": "Error"
    },
    "ClusterUnavailable": {
      "Type": "Fail",
      "Cause": "Redshift cluster is not available",
      "Error": "Error"
    },
    "SalesETLPipelineComplete": {
      "Type": "Pass",
      "Next": "ValidateSalesMetric",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "redshift_database": "dev",
          "redshift_user": "awsuser",
          "redshift_schema": "tpcds",
          "snapshot_date": "2003-01-02",
          "action": "validate_sales_metric",
          "sql_statement": [
            "select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
          ]
        }
      }
    },
    "ValidateSalesMetric": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "GetValidateSalesMetricStatus",
      "InputPath": "$",
      "ResultPath": "$"
    },
    "GetValidateSalesMetricStatus": {
      "Type": "Task",
      "Next": "CheckValidateSalesMetricStatus",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckValidateSalesMetricStatus": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.status", "StringEquals": "FAILED", "Next": "FailSalesMetricValidation" },
        { "Variable": "$.status", "StringEquals": "FINISHED", "Next": "DataValidationComplete" }
      ],
      "Default": "SalesValidationWait"
    },
    "SalesValidationWait": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "GetValidateSalesMetricStatus"
    },
    "FailSalesMetricValidation": {
      "Type": "Fail",
      "Cause": "Data Validation Failed",
      "Error": "Error"
    },
    "DataValidationComplete": {
      "Type": "Pass",
      "Next": "InitializePauseCluster"
    },
    "InitializePauseCluster": {
      "Type": "Pass",
      "Next": "PauseCluster",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "operation": "pause"
        }
      }
    },
    "PauseCluster": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "PauseClusterWait",
      "InputPath": "$",
      "ResultPath": "$.clusterStatus",
      "Catch": [
        {
          "ErrorEquals": [ "States.ALL" ],
          "Next": "ClusterPausedComplete"
        }
      ]
    },
    "InitializeCheckPauseCluster": {
      "Type": "Pass",
      "Next": "GetStateOfPausedCluster",
      "Result": {
        "input": {
          "redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
          "operation": "status"
        }
      }
    },
    "GetStateOfPausedCluster": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
      "TimeoutSeconds": 180,
      "HeartbeatSeconds": 60,
      "Next": "IsClusterPaused",
      "InputPath": "$",
      "ResultPath": "$.clusterStatus"
    },
    "IsClusterPaused": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.clusterStatus", "StringEquals": "available", "Next": "InitializePauseCluster" },
        { "Variable": "$.clusterStatus", "StringEquals": "paused", "Next": "ClusterPausedComplete" },
        { "Variable": "$.clusterStatus", "StringEquals": "unavailable", "Next": "ClusterUnavailable" },
        { "Variable": "$.clusterStatus", "StringEquals": "resuming", "Next": "PauseClusterWait" }
      ]
    },
    "PauseClusterWait": {
      "Type": "Wait",
      "Seconds": 720,
      "Next": "InitializeCheckPauseCluster"
    },
    "ClusterPausedComplete": {
      "Type": "Pass",
      "End": true
    }
  }
}
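Each job in the definition follows the same pattern: a Task submits the SQL, a second Task reads the status, a Choice state routes on FAILED or FINISHED, and the Default branch leads to a Wait state that loops back. The same control flow can be sketched in Python as follows. The function name is hypothetical, and `max_polls` is an added safety limit that the state machine itself does not have.

```python
import time


def wait_for_statement(get_status, wait_seconds=5, sleep=time.sleep, max_polls=100):
    """Poll get_status() until it reports FINISHED or FAILED, pausing between polls."""
    for _ in range(max_polls):
        status = get_status()
        if status == "FINISHED":
            return status
        if status == "FAILED":
            raise RuntimeError("ETL Workflow Failed")
        # Any other status takes the "Default" branch: wait, then check again.
        sleep(wait_seconds)
    raise TimeoutError("statement did not finish within max_polls polls")
```

Keeping the wait in a Wait state rather than inside a Lambda function means no function is running while the statement executes.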

For information about how to configure IAM when using Step Functions with other AWS services, see IAM Policies for integrated services.

IAM Example

The example AWS Identity and Access Management (IAM) policy generated by the sample project includes the least privilege necessary to run the state machine and related resources. We recommend that you include only the permissions that are necessary in your IAM policies.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
        "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
      ],
      "Effect": "Allow"
    }
  ]
}
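One simple way to check a policy against the least-privilege recommendation is to look for wildcards. The helper below is a rough sketch with a hypothetical name. It only flags Allow statements whose Action or Resource contains `*` and is not a substitute for a full policy review.

```python
def overly_broad_statements(policy):
    """Return the Allow statements that use a wildcard in Action or Resource."""

    def as_list(value):
        # IAM allows a single string or object where a list is also accepted.
        return value if isinstance(value, list) else [value]

    flagged = []
    for statement in as_list(policy["Statement"]):
        if statement.get("Effect") != "Allow":
            continue
        values = as_list(statement.get("Action", [])) + as_list(
            statement.get("Resource", [])
        )
        if any("*" in value for value in values):
            flagged.append(statement)
    return flagged
```

For the policy shown above, the helper returns an empty list, because the policy names one action and two specific function ARNs.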
