使用分散式地圖複製大規模 CSV 資料 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用分散式地圖複製大規模 CSV 資料

本教學課程可協助您開始在分散式模式下使用Map狀態。設置為分佈式的Map狀態稱為分佈式地圖狀態。您可以在工作流程中使用分散式地圖狀態來迭代大規模的 Amazon S3 資料來源。狀Map態會將每個版序當做子工作流程執行來執行,以達到高並行性。如需分散式模式的詳細資訊,請參閱分散式模式中的對應狀態

在本教學中,您會使用分散式地圖狀態在 Amazon S3 儲存貯體中迭代 CSV 檔案。然後,您將其內容與子工作流程執行的 ARN 一起傳回到另一個 Amazon S3 儲存貯體。您可以先在工作流程 Studio 中建立工作流程原型。接下來,您將Map狀態的處理模式設定為 [分散式]、將 CSV 檔案指定為資料集,並將其位置提供給Map狀態。您也可以指定子工作流程執行的工作流程類型,分散式對應狀態開始為 Express

除了這些設定之外,您還可以針對本教學課程中使用的範例工作流程指定其他組態,例如同時執行子項工作流程的最大數目,以及匯出Map結果的位置。

必要條件

  • 將 CSV 檔案上傳到 Amazon S3 儲存貯體。您必須在 CSV 檔案中定義標題列。如需有關 CSV 檔案的大小限制以及如何指定標頭列的資訊,請參閱Amazon S3 儲存貯體中的 CSV 檔案

  • 建立另一個 Amazon S3 儲存貯體和該儲存貯體內的資料夾,以將狀Map態結果匯出到。

重要

確保您的 Amazon S3 儲存貯體 AWS 帳戶 與狀態機器位於相 AWS 區域 同的狀態機器下。

步驟 1:建立工作流程原型

在此步驟中,您可以使用工作流程 Studio 建立工作流程的原型。工作流程 Studio 是 Step Functions 控制台中提供的可視化工作流程設計器。您可以分別從「流程」和「動作」 標籤中選擇所需的狀態和 API 動作。您將使用拖放工作流 Studio 的功能來創建工作流原型。

  1. 開啟 Step Functions 主控台,然後選擇建立狀態機器

  2. 在「選擇範本」對話方塊中,選取「空白」。

  3. 選擇選取。這會在設計模式中開啟工作流程工作室

  4. 從「程」索引標籤中,將地圖狀態拖放至標示為「在此處拖曳第一個狀態」的空白狀態

  5. 組態索引標籤中,對於狀態名稱,輸入Process data

  6. 從「動」索引標籤中,將「AWS Lambda 叫用 API」動作拖放到「處理程序」資料狀態中。

  7. AWS Lambda 呼叫狀態重新命名為Process CSV data

第 2 步:配置地圖狀態的必填字段

在此步驟中,您可以設定「分散式對應」狀態的下列必要欄位:

  • ItemReader— 指定資料集及其Map狀態可從中讀取輸入的位置。

  • ItemProcessor— 指定下列值:

    • ProcessorConfigExecutionTypeMode和設定為DISTRIBUTEDEXPRESS分別。這會設定Map狀態的處理模式和「分散式對映」狀態啟動之子工作流程執行的工作流程類型。

    • StartAt—「地圖」工作流程中的第一個狀態。

    • States— 定義 Map 工作流程,這是每個子工作流程執行中要重複的一組步驟。

  • ResultWriter— 指定 Step Functions 寫入分散式地圖狀態結果的 Amazon S3 位置。

    重要

    確保用於匯出 Map Run 結果的 Amazon S3 儲存貯體與您的狀態機器處於相 AWS 區域 同 AWS 帳戶 的狀態機器下。否則,您的狀態機執行將失敗並顯示States.ResultWriterFailed錯誤。

若要設定必要欄位:
  1. 選擇處理資料狀態,然後在組態索引標籤中執行下列動作:

    1. 對於「處理」模式,請選擇「分散

    2. 對於項目來源,請選擇 Amazon S3,然後從 S3 項目來源下拉式清單中選擇 S3 中的 CSV 檔案

    3. 執行下列動作以指定 CSV 檔案的 Amazon S3 位置:

      1. 對於 S3 物件,請從下拉式清單中選取 [輸入儲存貯體和金鑰]。

      2. 在儲存體中,輸入包含 CSV 檔案的 Amazon S3 儲存貯體的名稱。例如 sourceBucket

      3. 金鑰中,輸入您儲存 CSV 檔案的 Amazon S3 物件名稱。您也必須在此欄位中指定 CSV 檔案的名稱。例如 csvDataset/ratings.csv

    4. 對於 CSV 檔案,您還必須指定欄標題的位置。若要這麼做,請選擇 [其他組態],然後針對 CSV 標頭位置保留預設選取 [第一列] (如果 CSV 檔案的第一列是標頭)。否則,請選擇「定」以在狀態機定義中指定標頭。如需詳細資訊,請參閱 ReaderConfig

    5. 針對子項執行類型,選擇快速

  2. 匯出位置中,若要將地圖執行結果匯出到特定的 Amazon S3 位置,請選擇將地圖狀態的輸出匯出到 Amazon S3

  3. 請執行下列操作:

    1. 對於 S3 儲存貯體,請從下拉式清單中選擇 [輸入儲存貯體名稱和前綴]

    2. 在儲存體中,輸入您要將結果匯出到的 Amazon S3 儲存貯體的名稱。例如 mapOutputs

    3. 在「字首」中,輸入您要儲存結果的資料夾名稱。例如 resultData

步驟 3:設定其他選項

除了分散式貼圖狀態的必要設定之外,您還可以指定其他選項。這些可能包括同時執行子項工作流程的最大數目,以及將Map狀態結果匯出至的位置。

  1. 選擇處理資料狀態。然後,在項目來源中,選擇其他組態

  2. 請執行下列操作:

    1. 選擇 [修改項目],為每個子工作流程執行指定自訂 JSON 輸入。 ItemSelector

    2. 輸入下列 JSON 輸入:

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      若要取得有關如何建立自訂輸入的資訊,請參閱〈〉ItemSelector

  3. 在「執行階段」設定中,對於並行限制,指定「分散式對應」狀態可以啟動的並行子工作流程執行數目。例如,​輸入 100

  4. 在瀏覽器上開啟新視窗或索引標籤,並完成您將在此工作流程中使用的 Lambda 函數的設定,如中所述步驟 4:設定 Lambda 函數

步驟 4:設定 Lambda 函數

重要

確保您的 Lambda 函數與狀態機 AWS 區域 相同。

  1. 開啟 Lambda 主控台,然後選擇建立函數

  2. Create function (建立函數) 頁面上,選擇 Author from scratch (從頭開始撰寫)

  3. 在「基本資訊」區段中,設定 Lambda 函數:

    1. 針對 函數名稱 ,請輸入 distributedMapLambda

    2. 針對 執行時間,請選擇 Node.js 16.x

    3. 保留所有默認選擇並選擇創建功能

    4. 建立 Lambda 函數後,複製頁面右上角顯示的函數的 Amazon 資源名稱 (ARN)。您需要在工作流程原型中提供此功能。若要複製 ARN,請按一下 
                                    copy Amazon Resource Name
                                。以下是 ARN 的示例:

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. 複製 Lambda 函數的下列程式碼,並將其貼到distributedMapLambda頁面的程式碼來源區段中。

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. 選擇部署。部署函數後,請選擇「測試」以查看 Lambda 函數的輸出。

步驟 5:更新工作流程原型

在 Step Functions 數主控台中,您將更新工作流程以新增 Lambda 函數的 ARN。

  1. 返回建立工作流程原型的標籤或視窗。

  2. 選擇「處理 CSV 資料」步驟,然後在「組態」索引標籤中執行下列操作:

    1. 針對整合類型,選擇最佳化

    2. 函數名稱中,開始輸入 Lambda 函數的名稱。從出現的下拉式清單中選擇函數,或選擇 [輸入函數名稱] 並提供 Lambda 函數 ARN。

步驟 6:檢閱自動產生的 Amazon 州語言定義並儲存工作流程

當您將狀態從「動作」和「流程」索引標籤拖放到畫布上時,Work flow Studio 會自動即時編寫工作流程的 Amazon States 語言定義。您可以視需要編輯此定義。

  1. (選擇性) 在Inspector面板上選擇「定義」並檢視狀態機定義。

    提示

    您也可以在工作流程工作室中檢視 ASL 定義。程式碼編輯器在程式碼編輯器中,您也可以編輯工作流程的 ASL 定義。

    下列範例程式碼顯示為您的工作流程自動產生的 Amazon States 語言定義。

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "sourceBucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. 指定狀態機的名稱。若要執行此操作,請選擇的預設狀態機器名稱旁邊的編輯圖示MyStateMachine。然後,在 [狀態機器組態] 中,在 [狀態機器名稱] 方塊中指定名稱

    針對本教學課程,輸入名稱 DistributedMapDemo

  3. (選擇性) 在狀態機器組態中,指定其他工作流程設定,例如狀態機器類型及其執行角色。

    在本教學課程中,請保留狀態機器組態中的所有預設選項。

  4. 在 [確認角色建立] 對話方塊中,選擇 [確認] 以繼續。

    您也可以選擇 [檢視角色設定] 以返回 [狀態機器組態]。

    注意

    如果您刪除 Step Functions 建立的 IAM 角色,則 Step Functions 稍後無法重新建立。同樣地,如果您修改角色 (例如,透過從 IAM 政策中的主體移除 Step Functions),Step Functions 稍後無法還原其原始設定。

步驟 7:運行狀態機

執行是您執行工作流程以執行工作的狀態機器執行個體。

  1. DistributedMapDemo頁面上,選擇 [開始執行]。

  2. 在 [開始執行] 對話方塊中,執行下列動作:

    1. (選擇性) 若要識別您的執行項目,您可以在「名稱」(Name) 方塊中指定執行項目的名稱。依預設,Step Functions 會自動產生唯一的執行名稱。

      注意

      Step Functions 可讓您為包含非 ASCII 字元的狀態機器、執行項目、活動和標籤建立名稱。這些非 ASCII 名稱不適用於 Amazon CloudWatch。若要確保您可以追蹤 CloudWatch 量度,請選擇僅使用 ASCII 字元的名稱。

    2. (選擇性) 在「入」方塊中,以 JSON 格式輸入輸入值以執行工作流程。

    3. 選擇 Start execution (開始執行)

    4. Step Functions 主控台會將您導向至標題為執行 ID 的頁面。此頁面稱為「執行詳細資訊」頁面。在此頁面上,您可以在執行進行時或完成後複查執行結果。

      若要複查執行結果,請在「圖形」檢視中選擇個別狀態,然後選擇步驟詳情窗格上的個別索引標籤,分別檢視每個狀態的詳細資訊,包括輸入、輸出和定義。如需有關可在「執行詳細資訊」頁面檢視之執行資訊的詳細資訊,請參閱執行詳細資訊頁面 — 介面概觀

    例如,選擇Map狀態,然後選擇 [對應執行] 以開啟 [對映執行詳細資訊] 頁面。在此頁面上,您可以檢視分散式對應狀態的所有執行詳細資訊,以及其啟動的子工作流程執行項目。如需有關此頁面的資訊,請參閱檢查地圖運行