翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon Redshift (Lambda、Amazon Redshift データ API) を使用して ETL/ELT ワークフローを実行する
このサンプルプロジェクトでは、Step Functions と Amazon Redshift データ API を使用して、Amazon Redshift データウェアハウスにデータをロードする ETL/ELT ワークフローを実行する方法を示します。
このプロジェクトでは、Step Functions AWS Lambda は関数と Amazon Redshift Data API を使用して必要なデータベースオブジェクトを作成し、一連のサンプルデータを生成します。次に、ディメンションテーブルの読み込みとそれに続くファクトテーブルの読み込みを行う 2 つのジョブをparallel 実行します。両方のディメンションロードジョブが正常に終了すると、Step Functions はファクトテーブルのロードジョブを実行し、検証ジョブを実行し、Amazon Redshift クラスターを一時停止します。
注記
ETL ロジックを変更して、Amazon S3 などの他のソースからデータを受信できます。[COPY] (コピー) コマンドを使用して、Amazon S3 から Amazon Redshift テーブルにデータをコピーします。
Amazon Redshift とStep Functions サービスの統合の詳細については、以下を参照してください。
注記
このサンプルプロジェクトでは、料金が発生する場合があります。
AWS 新規ユーザーには、無料利用枠が用意されています。この枠では、サービスを利用しても一定のレベル以下であれば無料です。 AWS 費用と無料利用枠について詳しくは、AWS Step Functions 料金をご覧ください
ステップ 1: ステートマシンを作成してリソースをプロビジョニングする
-
Step Functions コンソール
を開き、[ステートマシンの作成] を選択します。 -
検索ボックスに
ETL job in Amazon Redshift
と入力し、返された検索結果から [Amazon Redshift の ETL ジョブ] を選択します。 -
[次へ] を選択して続行します。
-
Step Functions には、 AWS のサービス 選択したサンプルプロジェクトで使用されているものが一覧表示されます。サンプルプロジェクトのワークフローグラフも表示されます。 AWS アカウント このプロジェクトをデプロイするか、独自のプロジェクトを構築するための出発点として使用してください。進める方法に応じて、[デモの実行] または [その上に構築する] を選択します。
このサンプルプロジェクトは、以下のリソースをデプロイします。
-
Amazon Redshift クラスター
-
2 つの Lambda 関数
-
Amazon Redshift スキーマ
-
5 つの Amazon Redshift テーブル
-
AWS Step Functions ステートマシン。
-
関連 AWS Identity and Access Management (IAM) ロール。
以下のイメージは、[Amazon Redshift の ETL ジョブ] サンプルプロジェクトのワークフローグラフを示しています。
-
-
[テンプレートの使用] を選択して選択を続行します。
-
次のいずれかを行います。
-
[その上に構築する] を選択した場合、Step Functions は選択したサンプルプロジェクトのワークフロープロトタイプを作成します。Step Functions は、ワークフロー定義にリストされているリソースをデプロイしません。
Workflow Studio の デザインモード では、[State browser] (状態ブラウザ) から状態をドラッグアンドドロップして、ワークフロープロトタイプの構築を続行できます。または、VS Code と同様の統合コードエディタを提供する コードモード に切り替えて、Step Functions コンソール内のステートマシンの Amazon ステートメント言語 (ASL) 定義を更新してください。Workflow Studio を使用してステートマシンを構築する方法の詳細については、「Workflow Studio を使用する」を参照してください。
重要
ワークフローを実行する前に、サンプルプロジェクトで使用されているリソースのプレースホルダー Amazon リソースネーム (ARN) を必ず更新してください。
-
[デモの実行] を選択した場合、Step Functions AWS CloudFormation AWS はテンプレートを使用してそのテンプレートにリストされているリソースをにデプロイする読み取り専用のサンプルプロジェクトを作成します。 AWS アカウント
ヒント
サンプルプロジェクトのステートマシン定義を表示するには、[コード] を選択します。
準備できたら、[デプロイと実行] を選択してサンプルプロジェクトをデプロイし、リソースを作成します。
これらのリソースおよび関連する IAM 許可が作成されるまで、最大 10 分かかることがあります。リソースのデプロイ中に CloudFormation Stack ID リンクを開いて、どのリソースがプロビジョニングされているかを確認できます。
サンプルプロジェクトのすべてのリソースが作成されると、新しいサンプルプロジェクトが [ステートマシン] ページに表示されます。
重要
CloudFormation テンプレートで使用される各サービスには標準料金が適用される場合があります。
-
ステップ 2: ステートマシンを実行する
-
[ステートマシン] ページで、サンプルプロジェクトを選択します。
-
サンプルプロジェクトページで、[実行を開始] を選択します。
-
[実行を開始] ダイアログボックスで、以下の操作を行います。
-
(オプション) 実行を識別するには、[名前] ボックスに名前を指定します。デフォルトでは、Step Functions は自動的に一意の実行名を生成します。
注記
Step Functions では、ステートマシン、実行、アクティビティの名前と、非 ASCII 文字を含むラベルを作成できます。これらの非ASCII名はAmazonでは機能しません。 CloudWatch CloudWatch メトリクスを追跡できるようにするには、ASCII 文字のみを使用する名前を選択してください。
-
(オプション) [入力] ボックスに、JSON 形式の入力値を入力してワークフローを実行します。
[デモの実行] を選択した場合、実行入力を入力する必要はありません。
注記
デプロイしたデモプロジェクトに事前入力された実行入力データが含まれている場合は、その入力を使用してステートマシンを実行します。
-
[実行のスタート] を選択します。
-
Step Functions コンソールから実行 ID のタイトルが付いたページが表示されます。このページは、[実行の詳細] ページと呼ばれます。このページでは、実行の進行中または完了後に実行結果を確認できます。
実行結果を確認するには、[グラフビュー] で個々の状態を選択し、ステップの詳細 ペインの個々のタブを選択すると、入力、出力、定義などの各状態の詳細がそれぞれ表示されます。[実行の詳細] ページに表示できる実行情報の詳細については、「[実行の詳細] ページ - インターフェイスの概要」を参照してください。
-
ステートマシンのコード例
このサンプルプロジェクトのステートマシンは、ETL InputPath ロジックをそれらのリソースに直接渡し、Amazon Redshift Data API AWS Lambda を使用して非同期で実行することでと統合されます。
このステートマシンの例を参照して、Step Functions が Amazon Redshift データ API AWS Lambda をどのように制御するかを確認してください。
AWS Step Functions AWS 他のサービスを制御する方法の詳細については、を参照してくださいAWS Step Functions 他のサービスとの併用。
{
"Comment": "A simple ETL workflow for loading dimension and fact tables",
"StartAt": "InitializeCheckCluster",
"States": {
"InitializeCheckCluster": {
"Type": "Pass",
"Next": "GetStateOfCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterAvailable",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterAvailable": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializeBuildDB"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "InitializeResumeCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "ClusterWait"
}
]
},
"ClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckCluster"
},
"InitializeResumeCluster": {
"Type": "Pass",
"Next": "ResumeCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "resume"
}
}
},
"ResumeCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "ClusterWait",
"InputPath": "$",
"ResultPath": "$"
},
"InitializeBuildDB": {
"Type": "Pass",
"Next": "BuildDB",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "build_database",
"sql_statement": [
"create schema if not exists {0} authorization {1};",
"create table if not exists {0}.customer",
"(c_customer_sk int4 not null encode az64",
",c_customer_id char(16) not null encode zstd",
",c_current_addr_sk int4 encode az64",
",c_first_name char(20) encode zstd",
",c_last_name char(30) encode zstd",
",primary key (c_customer_sk)",
") distkey(c_customer_sk);",
"--",
"create table if not exists {0}.customer_address",
"(ca_address_sk int4 not null encode az64",
",ca_address_id char(16) not null encode zstd",
",ca_state char(2) encode zstd",
",ca_zip char(10) encode zstd",
",ca_country varchar(20) encode zstd",
",primary key (ca_address_sk)",
") distkey(ca_address_sk);",
"--",
"create table if not exists {0}.date_dim",
"(d_date_sk integer not null encode az64",
",d_date_id char(16) not null encode zstd",
",d_date date encode az64",
",d_day_name char(9) encode zstd",
",primary key (d_date_sk)",
") diststyle all;",
"--",
"create table if not exists {0}.item",
"(i_item_sk int4 not null encode az64",
",i_item_id char(16) not null encode zstd",
",i_rec_start_date date encode az64",
",i_rec_end_date date encode az64",
",i_current_price numeric(7,2) encode az64",
",i_category char(50) encode zstd",
",i_product_name char(50) encode zstd",
",primary key (i_item_sk)",
") distkey(i_item_sk) sortkey(i_category);",
"--",
"create table if not exists {0}.store_sales",
"(ss_sold_date_sk int4",
",ss_item_sk int4 not null encode az64",
",ss_customer_sk int4 encode az64",
",ss_addr_sk int4 encode az64",
",ss_store_sk int4 encode az64",
",ss_ticket_number int8 not null encode az64",
",ss_quantity int4 encode az64",
",ss_net_paid numeric(7,2) encode az64",
",ss_net_profit numeric(7,2) encode az64",
",primary key (ss_item_sk, ss_ticket_number)",
") distkey(ss_item_sk) sortkey(ss_sold_date_sk);"
]
}
}
},
"BuildDB": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBuildDBStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetBuildDBStatus": {
"Type": "Task",
"Next": "CheckBuildDBStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBuildDBStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailBuildDB"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "InitializeBaselineData"
}
],
"Default": "BuildDBWait"
},
"BuildDBWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "GetBuildDBStatus"
},
"FailBuildDB": {
"Type": "Fail",
"Cause": "Database Build Failed",
"Error": "Error"
},
"InitializeBaselineData": {
"Type": "Pass",
"Next": "LoadBaselineData",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_baseline_data",
"sql_statement": [
"begin transaction;",
"truncate table {0}.customer;",
"insert into {0}.customer (c_customer_sk,c_customer_id,c_current_addr_sk,c_first_name,c_last_name)",
"values",
"(7550,'AAAAAAAAOHNBAAAA',9264662,'Michelle','Deaton'),",
"(37079,'AAAAAAAAHNAJAAAA',13971208,'Michael','Simms'),",
"(40626,'AAAAAAAACLOJAAAA',1959255,'Susan','Ryder'),",
"(2142876,'AAAAAAAAMJCLACAA',7644556,'Justin','Brown');",
"analyze {0}.customer;",
"--",
"truncate table {0}.customer_address;",
"insert into {0}.customer_address (ca_address_sk,ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"(13971208,'AAAAAAAAIAPCFNAA','NE','63451','United States'),",
"(7644556,'AAAAAAAAMIFKEHAA','SD','58883','United States'),",
"(9264662,'AAAAAAAAGBOFNIAA','CA','99310','United States');",
"analyze {0}.customer_address;",
"--",
"truncate table {0}.item;",
"insert into {0}.item (i_item_sk,i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"(3417,'AAAAAAAAIFNAAAAA','1997-10-27',NULL,14.29,'Electronics','ationoughtesepri '),",
"(9615,'AAAAAAAAOIFCAAAA','1997-10-27',NULL,9.68,'Home','antioughtcallyn st'),",
"(3693,'AAAAAAAAMGOAAAAA','2001-03-12',NULL,2.10,'Men','prin stcallypri'),",
"(3630,'AAAAAAAAMCOAAAAA','2001-10-27',NULL,2.95,'Electronics','barpricallypri'),",
"(16506,'AAAAAAAAIHAEAAAA','2001-10-27',NULL,3.85,'Home','callybaranticallyought'),",
"(7866,'AAAAAAAAILOBAAAA','2001-10-27',NULL,12.60,'Jewelry','callycallyeingation');",
"--",
"analyze {0}.item;",
"truncate table {0}.date_dim;",
"insert into {0}.date_dim (d_date_sk,d_date_id,d_date,d_day_name)",
"values",
"(2450521,'AAAAAAAAJFEGFCAA','1997-03-13','Thursday'),",
"(2450749,'AAAAAAAANDFGFCAA','1997-10-27','Monday'),",
"(2451251,'AAAAAAAADDHGFCAA','1999-03-13','Saturday'),",
"(2451252,'AAAAAAAAEDHGFCAA','1999-03-14','Sunday'),",
"(2451981,'AAAAAAAANAKGFCAA','2001-03-12','Monday'),",
"(2451982,'AAAAAAAAOAKGFCAA','2001-03-13','Tuesday'),",
"(2452210,'AAAAAAAACPKGFCAA','2001-10-27','Saturday'),",
"(2452641,'AAAAAAAABKMGFCAA','2003-01-01','Wednesday'),",
"(2452642,'AAAAAAAACKMGFCAA','2003-01-02','Thursday');",
"--",
"analyze {0}.date_dim;",
"-- commit and End transaction",
"commit;",
"end transaction;"
]
}
}
},
"LoadBaselineData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetBaselineData",
"InputPath": "$",
"ResultPath": "$"
},
"GetBaselineData": {
"Type": "Task",
"Next": "CheckBaselineData",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckBaselineData": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailLoadBaselineData"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "ParallelizeDimensionLoadJob"
}
],
"Default": "BaselineDataWait"
},
"BaselineDataWait": {
"Type": "Wait",
"Seconds": 20,
"Next": "GetBaselineData"
},
"FailLoadBaselineData": {
"Type": "Fail",
"Cause": "Load Baseline Data Failed",
"Error": "Error"
},
"ParallelizeDimensionLoadJob": {
"Type": "Parallel",
"Next": "InitializeSalesFactLoadJob",
"ResultPath": "$.status",
"Branches": [
{
"StartAt": "InitializeCustomerAddressDimensionLoadJob",
"States": {
"InitializeCustomerAddressDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteCustomerAddressDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_customer_address",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_customer_address;",
"create table if not exists {0}.stg_customer_address",
"(ca_address_id varchar(16) encode zstd",
",ca_state varchar(2) encode zstd",
",ca_zip varchar(10) encode zstd",
",ca_country varchar(20) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_customer_address (ca_address_id,ca_state,ca_zip,ca_country)",
"values",
"('AAAAAAAACFBBAAAA','NE','','United States'),",
"('AAAAAAAAGAEFAAAA','NE','61749','United States'),",
"('AAAAAAAAPJKKAAAA','OK','','United States'),",
"('AAAAAAAAMIHGAAAA','AL','','United States');",
"/* Perform UPDATE for existing data with refreshed attribute values */",
"update {0}.customer_address",
" set ca_state = stg_customer_address.ca_state,",
" ca_zip = stg_customer_address.ca_zip,",
" ca_country = stg_customer_address.ca_country",
" from {0}.stg_customer_address",
" where customer_address.ca_address_id = stg_customer_address.ca_address_id;",
"/* Perform insert for new rows */",
"insert into {0}.customer_address",
"(ca_address_sk",
",ca_address_id",
",ca_state",
",ca_zip",
",ca_country",
")",
"with max_customer_address_sk as",
"(select max(ca_address_sk) max_ca_address_sk",
"from {0}.customer_address)",
"select row_number() over (order by stg_customer_address.ca_address_id) + max_customer_address_sk.max_ca_address_sk as ca_address_sk",
",stg_customer_address.ca_address_id",
",stg_customer_address.ca_state",
",stg_customer_address.ca_zip",
",stg_customer_address.ca_country",
"from {0}.stg_customer_address,",
"max_customer_address_sk",
"where stg_customer_address.ca_address_id not in (select customer_address.ca_address_id from {0}.customer_address);",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteCustomerAddressDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetCustomerAddressDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetCustomerAddressDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckCustomerAddressDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckCustomerAddressDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailCustomerAddressDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteCustomerAddressDimensionLoad"
}
],
"Default": "CustomerAddressWait"
},
"CustomerAddressWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetCustomerAddressDimensionLoadStatus"
},
"CompleteCustomerAddressDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailCustomerAddressDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
},
{
"StartAt": "InitializeItemDimensionLoadJob",
"States": {
"InitializeItemDimensionLoadJob": {
"Type": "Pass",
"Next": "ExecuteItemDimensionLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"action": "load_item",
"sql_statement": [
"begin transaction;",
"/* Create a staging table to hold the input data. Staging table is created with BACKUP NO option for faster inserts and also data temporary */",
"drop table if exists {0}.stg_item;",
"create table if not exists {0}.stg_item",
"(i_item_id varchar(16) encode zstd",
",i_rec_start_date date encode zstd",
",i_rec_end_date date encode zstd",
",i_current_price numeric(7,2) encode zstd",
",i_category varchar(50) encode zstd",
",i_product_name varchar(50) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_item",
"(i_item_id,i_rec_start_date,i_rec_end_date,i_current_price,i_category,i_product_name)",
"values",
"('AAAAAAAAABJBAAAA','2000-10-27',NULL,4.10,'Books','ationoughtesecally'),",
"('AAAAAAAAOPKBAAAA','2001-10-27',NULL,4.22,'Books','ableoughtn stcally'),",
"('AAAAAAAAHGPAAAAA','1997-10-27',NULL,29.30,'Books','priesen stpri'),",
"('AAAAAAAAICMAAAAA','2001-10-27',NULL,1.93,'Books','eseoughtoughtpri'),",
"('AAAAAAAAGPGBAAAA','2001-10-27',NULL,9.96,'Books','bareingeinganti'),",
"('AAAAAAAANBEBAAAA','1997-10-27',NULL,2.25,'Music','n steseoughtanti'),",
"('AAAAAAAACLAAAAAA','2001-10-27',NULL,1.71,'Home','bareingought'),",
"('AAAAAAAAOBBDAAAA','2001-10-27',NULL,5.55,'Books','callyationantiableought');",
"/************************************************************************************************************************",
"** Type 2 is maintained for i_current_price column.",
"** Update all attributes for the item when the price is not changed",
"** Sunset existing active item record with current i_rec_end_date and insert a new record when the price does not match",
"*************************************************************************************************************************/",
"update {0}.item",
" set i_category = stg_item.i_category,",
" i_product_name = stg_item.i_product_name",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price = stg_item.i_current_price;",
"insert into {0}.item",
"(i_item_sk",
",i_item_id",
",i_rec_start_date",
",i_rec_end_date",
",i_current_price",
",i_category",
",i_product_name",
")",
"with max_item_sk as",
"(select max(i_item_sk) max_item_sk",
" from {0}.item)",
"select row_number() over (order by stg_item.i_item_id) + max_item_sk as i_item_sk",
" ,stg_item.i_item_id",
" ,trunc(sysdate) as i_rec_start_date",
" ,null as i_rec_end_date",
" ,stg_item.i_current_price",
" ,stg_item.i_category",
" ,stg_item.i_product_name",
" from {0}.stg_item, {0}.item, max_item_sk",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Sunset penultimate records that were inserted as type 2 */",
"update {0}.item",
" set i_rec_end_date = trunc(sysdate)",
" from {0}.stg_item",
" where item.i_item_id = stg_item.i_item_id",
" and item.i_rec_end_date is null",
" and item.i_current_price <> stg_item.i_current_price;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteItemDimensionLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetItemDimensionLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetItemDimensionLoadStatus": {
"Type": "Task",
"Next": "CheckItemDimensionLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckItemDimensionLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailItemDimensionLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "CompleteItemDimensionLoad"
}
],
"Default": "ItemWait"
},
"ItemWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetItemDimensionLoadStatus"
},
"CompleteItemDimensionLoad": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"End": true
},
"FailItemDimensionLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
}
}
}
]
},
"InitializeSalesFactLoadJob": {
"Type": "Pass",
"Next": "ExecuteSalesFactLoadJob",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "load_sales_fact",
"sql_statement": [
"begin transaction;",
"/* Create a stg_store_sales staging table */",
"drop table if exists {0}.stg_store_sales;",
"create table {0}.stg_store_sales",
"(sold_date date encode zstd",
",i_item_id varchar(16) encode zstd",
",c_customer_id varchar(16) encode zstd",
",ca_address_id varchar(16) encode zstd",
",ss_ticket_number integer encode zstd",
",ss_quantity integer encode zstd",
",ss_net_paid numeric(7,2) encode zstd",
",ss_net_profit numeric(7,2) encode zstd",
")",
"backup no",
"diststyle even;",
"/* Ingest data from source */",
"insert into {0}.stg_store_sales",
"(sold_date,i_item_id,c_customer_id,ca_address_id,ss_ticket_number,ss_quantity,ss_net_paid,ss_net_profit)",
"values",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,5046.37,150.97),",
"('2003-01-02','AAAAAAAAIFNAAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,2103.72,-124.08),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAOHNBAAAA','AAAAAAAAGBOFNIAA',1403191,13,959.10,-1304.70),",
"('2003-01-02','AAAAAAAAILOBAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1403191,13,962.65,-475.80),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,111.60,-241.65),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAHNAJAAAA','AAAAAAAAIAPCFNAA',1201746,17,4013.02,-1111.48),",
"('2003-01-02','AAAAAAAAMCOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',1201746,17,2689.12,-5572.28),",
"('2003-01-02','AAAAAAAAMGOAAAAA','AAAAAAAAMJCLACAA','AAAAAAAAMIFKEHAA',193971,18,1876.89,-556.35);",
"/* Delete any rows from target store_sales for the input date for idempotency */",
"delete from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}');",
"/* Insert data from staging table to the target table */",
"insert into {0}.store_sales",
"(ss_sold_date_sk",
",ss_item_sk",
",ss_customer_sk",
",ss_addr_sk",
",ss_ticket_number",
",ss_quantity",
",ss_net_paid",
",ss_net_profit",
")",
"select date_dim.d_date_sk ss_sold_date_sk",
" ,item.i_item_sk ss_item_sk",
" ,customer.c_customer_sk ss_customer_sk",
" ,customer_address.ca_address_sk ss_addr_sk",
" ,ss_ticket_number",
" ,ss_quantity",
" ,ss_net_paid",
" ,ss_net_profit",
" from {0}.stg_store_sales as store_sales",
" inner join {0}.date_dim on store_sales.sold_date = date_dim.d_date",
" left join {0}.item on store_sales.i_item_id = item.i_item_id and item.i_rec_end_date is null",
" left join {0}.customer on store_sales.c_customer_id = customer.c_customer_id",
" left join {0}.customer_address on store_sales.ca_address_id = customer_address.ca_address_id;",
"/* Drop staging table */",
"drop table if exists {0}.stg_store_sales;",
"/* Commit and End transaction */",
"commit;",
"end transaction;"
]
}
}
},
"ExecuteSalesFactLoadJob": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetSalesFactLoadStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetSalesFactLoadStatus": {
"Type": "Task",
"Next": "CheckSalesFactLoadStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckSalesFactLoadStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesFactLoad"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "SalesETLPipelineComplete"
}
],
"Default": "SalesWait"
},
"SalesWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetSalesFactLoadStatus"
},
"FailSalesFactLoad": {
"Type": "Fail",
"Cause": "ETL Workflow Failed",
"Error": "Error"
},
"ClusterUnavailable": {
"Type": "Fail",
"Cause": "Redshift cluster is not available",
"Error": "Error"
},
"SalesETLPipelineComplete": {
"Type": "Pass",
"Next": "ValidateSalesMetric",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"redshift_database": "dev",
"redshift_user": "awsuser",
"redshift_schema": "tpcds",
"snapshot_date": "2003-01-02",
"action": "validate_sales_metric",
"sql_statement": [
"select 1/count(1) from {0}.store_sales where ss_sold_date_sk in (select d_date_sk from {0}.date_dim where d_date='{1}')"
]
}
}
},
"ValidateSalesMetric": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "GetValidateSalesMetricStatus",
"InputPath": "$",
"ResultPath": "$"
},
"GetValidateSalesMetricStatus": {
"Type": "Task",
"Next": "CheckValidateSalesMetricStatus",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"InputPath": "$",
"ResultPath": "$.status"
},
"CheckValidateSalesMetricStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "FAILED",
"Next": "FailSalesMetricValidation"
},
{
"Variable": "$.status",
"StringEquals": "FINISHED",
"Next": "DataValidationComplete"
}
],
"Default": "SalesValidationWait"
},
"SalesValidationWait": {
"Type": "Wait",
"Seconds": 5,
"Next": "GetValidateSalesMetricStatus"
},
"FailSalesMetricValidation": {
"Type": "Fail",
"Cause": "Data Validation Failed",
"Error": "Error"
},
"DataValidationComplete": {
"Type": "Pass",
"Next": "InitializePauseCluster"
},
"InitializePauseCluster": {
"Type": "Pass",
"Next": "PauseCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "pause"
}
}
},
"PauseCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "PauseClusterWait",
"InputPath": "$",
"ResultPath": "$.clusterStatus",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ClusterPausedComplete"
}
]
},
"InitializeCheckPauseCluster": {
"Type": "Pass",
"Next": "GetStateOfPausedCluster",
"Result": {
"input": {
"redshift_cluster_id": "cfn36-redshiftcluster-AKIAI44QH8DHBEXAMPLE",
"operation": "status"
}
}
},
"GetStateOfPausedCluster": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE",
"TimeoutSeconds": 180,
"HeartbeatSeconds": 60,
"Next": "IsClusterPaused",
"InputPath": "$",
"ResultPath": "$.clusterStatus"
},
"IsClusterPaused": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.clusterStatus",
"StringEquals": "available",
"Next": "InitializePauseCluster"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "paused",
"Next": "ClusterPausedComplete"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "unavailable",
"Next": "ClusterUnavailable"
},
{
"Variable": "$.clusterStatus",
"StringEquals": "resuming",
"Next": "PauseClusterWait"
}
]
},
"PauseClusterWait": {
"Type": "Wait",
"Seconds": 720,
"Next": "InitializeCheckPauseCluster"
},
"ClusterPausedComplete": {
"Type": "Pass",
"End": true
}
}
}
Step Functions AWS を他のサービスで使用する場合の IAM の設定方法については、を参照してください統合サービスの IAM ポリシー。
IAM の例
サンプルプロジェクトで生成されたこれらのサンプル AWS Identity and Access Management (IAM) ポリシーには、ステートマシンと関連リソースを実行するのに必要な最小限の権限が含まれています。IAM ポリシーで必要なアクセス許可のみを含めることをお勧めします。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftDataApi-AIDACKCEVSQ6C2EXAMPLE",
"arn:aws:lambda:us-east-1:111122223333:function:CFN36-RedshiftOperations-AKIAIOSFODNN7EXAMPLE"
],
"Effect": "Allow"
}
]
}
Step Functions AWS を他のサービスで使用する場合の IAM の設定方法については、を参照してください統合サービスの IAM ポリシー。