分散マップを使用した大規模 CSV データのコピー - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

分散マップを使用した大規模 CSV データのコピー

このチュートリアルでは、分散モードで Map 状態を使用開始する方法を説明します。[分散] に設定された Map ステートは、分散マップ状態と呼ばれます。ワークフローで分散マップ状態を使用し、大規模な Amazon S3 データソースを反復処理します。Map 状態は、それぞれの反復を子ワークフロー実行として実行するため、高い同時実行性が得られます。分散モードについての詳細は、「分散モードでのマップステート」を参照してください。

このチュートリアルでは、分散マップ状態を使用して、Amazon S3 バケット内の CSV ファイルを反復処理します。次に、その内容と子ワークフロー実行の ARN を別の Amazon S3 バケットに返します。最初に、Workflow Studio でワークフロープロトタイプを作成します。次に、Map 状態の処理モード を [分散] に設定し、CSV ファイルをデータセットとして指定し、その場所を Map 状態に提供します。また、[Express] として分散マップ状態を開始する子ワークフロー実行のワークフロータイプを指定します。

これらの設定に加えて、このチュートリアルで使用するワークフローの例では、子ワークフローの同時実行の最大数や Map の結果をエクスポートする場所など、その他の設定も指定します。

前提条件

  • CSV ファイルを Amazon S3 バケットにアップロードする CSV ファイル内にヘッダー行を定義する必要があります。CSV ファイルに適用されるサイズ制限とヘッダー行の指定方法については、「Amazon S3 バケット内の CSV ファイル」を参照してください。

  • 別の Amazon S3 バケットを作成し、そのバケット内に Map 状態の結果をエクスポートするフォルダを作成します。

重要

Amazon S3 AWS アカウント バケットがステートマシンと同じ下にあることを確認してください。 AWS リージョン

ステップ 1: ワークフロープロトタイプを作成する

このステップでは、Workflow Studio を使用してワークフロープロトタイプを作成します。Workflow Studio は、Step Functions コンソールで使用可能なビジュアルワークフローデザイナーです。必要な状態と API アクションを [フロー] タブと [アクション] タブのそれぞれから選択します。Workflow Studio のドラッグアンドドロップ機能を使用して、ワークフロープロトタイプを作成します。

  1. Step Functions コンソールを開き、[ステートマシンの作成] を選択します。

  2. [テンプレートを選択] ダイアログボックスで [空白] を選択します。

  3. [選択] を選びます。これにより、デザインモード で Workflow Studio が開きます。

  4. [フロー] タブで、[マップ] 状態をドラッグし、[最初の状態をここにドラッグ] とラベル付けされた空の状態にドロップします。

  5. [設定] タブの [状態名]Process data と入力します。

  6. [アクション] タブから [AWS Lambda 呼び出し] API アクションをドラッグし、[データの処理] 状態の中にドロップします。

  7. AWS Lambda 呼び出し 状態の名前を Process CSV data に変更します。

ステップ 2: マップステートの必須フィールドを設定する

このステップでは、分散マップ状態に必要な次のフィールドを設定します。

  • ItemReaderMap ステートが入力を読み取ることができるデータセットとその場所を指定します。

  • ItemProcessor - 次の値を指定します。

    • ProcessorConfig - ModeExecutionType を、それぞれ DISTRIBUTEDEXPRESS に設定します。これにより、Map ステートの処理モードと、分散マップ状態 が開始する子ワークフロー実行のワークフロータイプが設定されます。

    • StartAt - マップワークフローの最初のステート。

    • States - マップワークフローを定義します。マップワークフローは、子ワークフローを実行するたびに繰り返す一連のステップです。

  • ResultWriter— Step Functions が分散マップの状態結果を書き込む Amazon S3 の場所を指定します。

    重要

    Map Run の結果をエクスポートするために使用する Amazon S3 バケットが AWS アカウント 、 AWS リージョン ステートマシンと同じ場所にあることを確認してください。それ以外の場合は、States.ResultWriterFailed エラーが発生してステートマシンの実行が失敗します。

必須フィールドを設定するには:
  1. [データの処理] 状態を選択し、[設定] タブで次の操作を行います。

    1. [処理モード] は、[分散] を選択します。

    2. [項目ソース] は、[Amazon S3] を選択し、[S3 項目ソース] ドロップダウンリストから [S3 の CSV ファイル] を選択します。

    3. CSV ファイルの Amazon S3 の場所を指定するには、次の手順を実行します。

      1. [S3 オブジェクト] は、ドロップダウンリストから [バケットとキーを入力] を選択します。

      2. [バケット] には、CSV ファイルを含む Amazon S3 バケットの名前を入力します。例えば sourceBucket です。

      3. [キー] には、CSV ファイルを保存した Amazon S3 オブジェクトの名前を入力します。また、このフィールドには、CSV ファイルの名前を指定する必要があります。例えば csvDataset/ratings.csv です。

    4. CSV ファイルの場合は、列ヘッダーの場所も指定する必要があります。これを指定するには、[追加設定] を選択し、CSV ファイルの最初の行がヘッダーの場合は、[CSV ヘッダーの場所] をデフォルトの [最初の行] のままにします。それ以外の場合は、[指定済み] を選択してステートマシン定義内のヘッダーを指定します。詳細については、「ReaderConfig」を参照してください。

    5. [子実行タイプ] には、[Express] を選択します。

  2. [エクスポートの場所] で、マップの実行結果を特定の Amazon S3 の場所にエクスポートするには、[マップステートの出力を Amazon S3 にエクスポートする] を選択します。

  3. 以下の操作を実行します。

    1. [S3 バケット] は、ドロップダウンリストから [バケット名とプレフィックスを入力する] を選択します。

    2. [バケット] には、結果をエクスポートする Amazon S3 バケットの名前を入力します。例えば mapOutputs です。

    3. [プレフィックス] には、結果を保存するフォルダ名を入力します。例えば resultData です。

ステップ 3: 追加オプションを設定する

分散マップ状態に必要な設定に加えて、他のオプションも指定できます。これには、同時に実行する子ワークフローの最大数や、Map 状態の結果をエクスポートする場所などが含まれます。

  1. データの処理状態の場所を指定します。次に、[項目ソース][追加設定] を選択します。

  2. 以下の操作を実行します。

    1. [Modify items with ItemSelector] を選択して、子ワークフローを実行するたびにカスタム JSON 入力を指定します。

    2. 次の JSON 入力を入力します。

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      カスタム入力の作成方法については、「ItemSelector」を参照してください。

  3. [ランタイムの設定][同時実行数の制限] で、分散マップ状態で開始できる子ワークフローの同時実行数を指定します。たとえば、100 と入力します。

  4. ステップ 4: Lambda 関数を設定する で説明されているように、ブラウザで新しいウィンドウまたはタブを開き、このワークフローで使用する Lambda 関数の設定を完了します。

ステップ 4: Lambda 関数を設定する

重要

Lambda AWS リージョン 関数がステートマシンと同じ下にあることを確認してください。

  1. Lambda コンソールを開き、[関数を作成] を選択します。

  2. [関数の作成] ページで、[一から作成] を選択します。

  3. [Basic information] (ベーシックな情報) セクションで、Lambda 関数を構成:

    1. [関数名]distributedMapLambda と入力します。

    2. [ランタイム][Node.js 16.x] を選択します。

    3. デフォルトの選択をすべて維持して、[関数を作成] を選択します。

    4. Lambda 関数を作成したら、ページの右上隅に表示されている関数の Amazon リソースネーム (ARN) をコピーします。これをワークフロープロトタイプに入力する必要があります。ARN をコピーするには、 copy Amazon Resource Name をクリックします。ARN の例を次に示します。

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. Lambda 関数の次のコードをコピーして、ページの [コードソース] セクションに貼り付けます。distributedMapLambda

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. デプロイを選択します。関数がデプロイされたら、[テスト] を選択して Lambda 関数の出力を確認します。

ステップ 5: ワークフロープロトタイプを更新する

Step Functions コンソールで、ワークフローを更新して Lambda 関数の ARN を追加します。

  1. ワークフロープロトタイプを作成したタブまたはウィンドウに戻ります。

  2. [CSV データの処理] ステップを選択し、[設定] タブで次の操作を行います。

    1. [統合タイプ] は、[最適化] を選択します。

    2. [関数名] には、Lambda 関数の名前を入力します。表示されるドロップダウンリストから関数を選択するか、[関数名を入力] を選択して Lambda 関数の ARN を入力します。

ステップ 6: 自動生成された Amazon States Language 定義を確認してワークフローを保存する

[アクション] タブおよび [フロー] タブから状態をキャンバスにドラッグアンドドロップすると、Workflow Studio はワークフローの Amazon States Language の定義を、リアルタイムで自動的に作成します。この定義は、必要に応じて編集できます。

  1. (オプション) Inspector パネルで [定義] を選択して、ステートマシンの定義を表示します。

    ヒント

    また、Workflow Studio の コードエディタ で ASL の定義を表示できます。コードエディタで、ワークフローの ASL の定義を編集することもできます。

    次のサンプルコードは、ワークフロー用に自動的に生成された Amazon States Language の定義を示しています。

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "sourceBucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. ステートマシンの名前を指定します。これを行うには、デフォルトのステートマシン名の横にある編集アイコンを選択します。MyStateMachine次に、[ステートマシンの設定][ステートマシン名] ボックスに名前を指定します。

    このチュートリアルでは、名前として「DistributedMapDemo」と入力します。

  3. (オプション) [ステートマシンの設定] で、ステートマシンのタイプや実行ロールなど、他のワークフロー設定を指定します。

    このチュートリアルでは、[ステートマシンの設定] のデフォルト設定をすべてそのまま使用します。

  4. [ロールの作成を確認] ダイアログボックスで、[確認] を選択して続行します。

    [ロールの設定を表示] を選択して [ステートマシンの設定] に戻ることもできます。

    注記

    Step Functions が作成した IAM ロールを削除すると、Step Functions を後で再作成することはできません。同様に、ロールを変更すると (例えば、IAM ポリシーのプリンシパルから Step Functions を削除するなど)、後で Step Functions でそれを元の設定に復元することはできません。

ステップ 7: ステートマシンを実行する

実行とは、タスク実行のためにワークフローを実行するステートマシンのインスタンスのことです。

  1. DistributedMapDemoページで [実行開始] を選択します。

  2. [実行を開始] ダイアログボックスで、以下の操作を行います。

    1. (オプション) 実行を識別するには、[名前] ボックスに名前を指定します。デフォルトでは、Step Functions は自動的に一意の実行名を生成します。

      注記

      Step Functions では、ステートマシン、実行、アクティビティ、ラベルに、ASCII 以外の文字を含む名前を作成できます。これらの非ASCII名は、Amazonでは機能しません。 CloudWatch CloudWatch メトリクスを追跡できるようにするには、ASCII 文字のみを使用する名前を選択してください。

    2. (オプション) [入力] ボックスに、JSON 形式の入力値を入力してワークフローを実行します。

    3. [実行のスタート] を選択します。

    4. Step Functions コンソールから実行 ID のタイトルが付いたページが表示されます。このページは、[実行の詳細] ページと呼ばれます。このページでは、実行の進行中または完了後に実行結果を確認できます。

      実行結果を確認するには、[グラフビュー] で個々の状態を選択し、ステップの詳細 ペインの個々のタブを選択すると、入力、出力、定義などの各状態の詳細がそれぞれ表示されます。[実行の詳細] ページに表示できる実行情報の詳細については、「[実行の詳細] ページ - インターフェイスの概要」を参照してください。

    例えば、Map 状態を選択してから [マップ実行] を選択すると、[マップ実行の詳細] ページが開きます。このページでは、分散マップ状態と、その状態が開始した子ワークフロー実行に関するすべての実行の詳細を表示できます。このページの詳細については、「マップ実行の確認」を参照してください。