マップステートを分散モードで使用して大規模な並行ワークロードをオーケストレーションする - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

マップステートを分散モードで使用して大規模な並行ワークロードをオーケストレーションする

Step Functions を使用すると、大規模な並行ワークロードをオーケストレーションして、半構造化データのオンデマンド処理などのタスクを実行できます。これらの並行ワークロードにより、Amazon S3 に保存されている大規模なデータソースを同時に処理できます。例えば、大量のデータを含む 1 つの JSON または CSV ファイルを処理できます。あるいは、大量の Amazon S3 オブジェクトを処理する場合もあります。

ワークフローに大規模な並行ワークロードを設定するには、Map 状態を分散モードに含めます。マップステートは、データセット内のアイテムを同時に処理します。[分散] に設定された Map ステートは、分散マップ状態と呼ばれます。分散モードでは、Map 状態によって高度な同時処理が可能になります。分散モードでは、Map 状態はデータセット内の項目を子ワークフロー実行と呼ばれる反復処理を行います。並行して実行できる子ワークフローの実行数を指定できます。それぞれの子ワークフローの実行には、親ワークフローとは別の実行履歴があります。指定しない場合、Step Functions は 10,000 件の子ワークフローを並列で実行します。

次の図は、ワークフローに大規模な並行ワークロードを設定する方法を示しています。


            大規模な並行ワークロードのオーケストレーションの概念を説明する図。

重要な用語

分散モード

マップステートの処理モード。このモードでは、Map 状態の各反復処理が子ワークフロー実行として実行されるため、高い同時実行性が可能になります。子ワークフローの実行にはそれぞれ独自の実行履歴があり、親ワークフローの実行履歴とは別のものです。このモードは、大規模な Amazon S3 のデータソースからの入力の読み取りをサポートします。

分散マップ状態

[分散] 処理モードに設定されたマップステート。

マップワークフロー

Map 状態が実行する一連のステップ。

親ワークフロー

1 つ以上の分散マップ状態を含むワークフロー。

子ワークフロー実行

分散マップ状態の反復。子ワークフローの実行には独自の実行履歴があり、親ワークフローの実行履歴とは別のものです。

マップ実行

分散モードで Map 状態を実行すると、Step Functions はマップ実行リソースを作成します。マップ実行とは、分散マップ状態によって開始する一連の子ワークフロー実行、およびこれらの実行をコントロールするランタイム設定を指します。Step Functions は、マップの実行に Amazon リソースネーム (ARN) を割り当てます。マップ実行は、Step Functions コンソールで確認できます。DescribeMapRun API アクションを呼び出すこともできます。Map Run CloudWatch はメトリクスをにも送信します。

詳細については、「マップ実行の確認」を参照してください。

分散マップ状態の定義の例

次の条件を満たす大規模な並列ワークロードをオーケストレーションする必要がある場合は、並列モードモードの Map 状態を使用します。

  • データセットのサイズが 256 KB を超えています。

  • ワークフローの実行イベント履歴が 25,000 エントリを超えています。

  • 40 回以上の反復処理の同時実行が必要です。

次の分散マップ状態の定義例では、データセットを Amazon S3 バケットに格納された CSV ファイルとして指定しています。また、CSV ファイルの各行のデータを処理する Lambda 関数も指定しています。この例では CSV ファイルを使用しているため、CSV 列ヘッダーの場所も指定しています。この例のステートマシン定義の詳細については、「分散マップを使用した大規模 CSV データのコピー」のチュートリアルを参照してください。

{ "Map": { "Type": "Map", "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "Database", "Key": "csv-dataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "LambdaTask", "States": { "LambdaTask": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:processCSVData" }, "End": true } } }, "Label": "Map", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "myOutputBucket", "Prefix": "csvProcessJobs" } } } }

分散マップを実行するアクセス許可

ワークフローに分散マップ状態を含める場合、Step Functions には、ステートマシンのロールで分散マップ状態StartExecution API アクションを呼び出すための適切な許可が必要です。

次の IAM ポリシー例では、ステートマシンのロールで分散マップ状態を実行するために必要な最小特権を付与しています。

注記

必ず stateMachineName を、分散マップ状態を使用しているステートマシン名に置き換えてください。例えば arn:aws:states:us-east-2:123456789012:stateMachine:mystateMachine です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "states:StartExecution" ], "Resource": [ "arn:aws:states:region:accountID:stateMachine:stateMachineName" ] }, { "Effect": "Allow", "Action": [ "states:DescribeExecution", "states:StopExecution" ], "Resource": "arn:aws:states:region:accountID:execution:stateMachineName:*" } ] }

さらに、Amazon S3 バケットなど、Distributed Map AWS 状態で使用されるリソースにアクセスするのに必要な最小限の権限を持っていることを確認する必要があります。詳細については、分散マップ状態を使用するための IAM ポリシー を参照してください。

分散マップ状態のフィールド

ワークフローで分散マップ状態を使用するには、これらのフィールドを 1 つ以上指定します。これらのフィールドは、共通状態フィールドに加えて指定します。

Type (必須)

状態のタイプ (Map など) を設定します。

ItemProcessor (必須)

Map 状態処理モードと定義を指定する次の JSON オブジェクトが含まれます。

  • ProcessorConfig - Map 状態の設定を指定する JSON オブジェクト。このオブジェクトには、以下のサブフィールドが含まれています。

    • Mode - Map 状態を分散モードで使用するには DISTRIBUTED に設定します。

      注記

      現在、Express ワークフロー内で Map 状態を使用する場合、ModeDISTRIBUTED に設定することはできません。ただし、Standard ワークフロー内で Map 状態を使用する場合、ModeDISTRIBUTED に設定できます。

    • ExecutionType - マップワークフローの実行タイプを [STANDARD] または [EXPRESS] のいずれかに指定します。DISTRIBUTEDModeサブフィールドに指定した場合は、このフィールドを指定する必要があります。ワークフロータイプの詳細については、標準ワークフロー対 Express ワークフロー を参照してください。

  • StartAt - ワークフローの最初の状態を示す文字列を指定します。この文字列は、大文字と小文字が区別され、いずれかの状態オブジェクトの名前と完全に一致する必要があります。この状態は、データセット内の各アイテムで最初に実行されます。Map 状態に提供した実行入力は、まず StartAt 状態に渡されます。

  • States – カンマで区切られた一連の状態を含む JSON オブジェクト。このオブジェクトでは、Map workflowを定義します。

ItemReader

データセットとその場所を指定します。Map 状態は指定されたデータセットから入力データを受け取ります。

分散モードでは、以前の状態から渡された JSON ペイロードまたは大規模な Amazon S3 データソースのいずれかをデータセットとして使用できます。詳細については、「ItemReader」を参照してください。

ItemsPath (オプション)

ステート入力内の項目の配列を含む JSON JsonPathノードを選択する構文を使用して参照パスを指定します

分散モードでは、前のステップの JSON 配列を状態入力として使用する場合にのみ、このフィールドを指定します。詳細については、「ItemsPath」を参照してください。

ItemSelector (オプション)

Map 状態反復に渡される前に、個々のデータセットアイテムの値をオーバーライドします。

このフィールドでは、キーと値のペアのコレクションを含む有効な JSON 入力を指定します。これらのペアは、ステートマシン定義で定義する静的な値でも、パスを使用して状態入力から選択された値でも、コンテキストオブジェクトからアクセスされる値でもかまいません。詳細については、「ItemSelector」を参照してください。

ItemBatcher (オプション)

データセットアイテムの一括処理を指定します。子ワークフローを実行するたびに、入力としてこれらのバッチを受け取ります。詳細については、「ItemBatcher」を参照してください。

MaxConcurrency (オプション)

並行して実行できる子ワークフローの実行数を指定します。インタープリタは、指定された回数までの子ワークフローの並行実行のみを許可します。同時実行値を指定しなかったり 0 に設定したりしない場合、Step Functions は同時実行を制限せず、子ワークフローを 10,000 回並行実行します。

注記

子ワークフローのparallel 実行にはより高い同時実行制限を指定できますが、 AWS などのダウンストリームサービスの容量を超えないようにすることをお勧めします。 AWS Lambda

MaxConcurrencyPath (オプション)

リファレンスパスを使用して状態の入力からタイムアウト値を動的に最大同時実行値を指定する場合は、MaxConcurrencyPath を使用してください。解決されると、リファレンスパスは、値が負でない整数のフィールドを選択する必要があります。

注記

Map 状態には MaxConcurrencyMaxConcurrencyPath の両方を含めることはできません。

ToleratedFailurePercentage (オプション)

マップ実行で許容できる失敗項目の割合を定義します。この割合を超えると、マップ実行は自動的に失敗します。Step Functions は、失敗またはタイムアウトしたアイテムの総数をアイテムの総数で割った結果として、失敗したアイテムの割合を計算します。0 から 100 までの値を指定する必要があります。詳細については、「分散マップ状態の許容される失敗しきい値」を参照してください。

ToleratedFailurePercentagePath (オプション)

リファレンスパスを使用して状態の入力から許容される失敗の割合の値を動的に指定したい場合は、ToleratedFailurePercentagePath を使用してください。解決されると、リファレンスパスは、値が 0~100の フィールドを選択する必要があります。

ToleratedFailureCount (オプション)

マップ実行で許容される障害アイテムの数を定義します。この数を超えると、マップ実行は自動的に失敗します。詳細については、「分散マップ状態の許容される失敗しきい値」を参照してください。

ToleratedFailureCountPath (オプション)

リファレンスパスを使用して状態の入力から許容される失敗数の値を動的に指定したい場合は、ToleratedFailureCountPath を使用してください。解決されると、リファレンスパスは、値が負でない整数のフィールドを選択する必要があります。

Label (オプション)

Map 状態を一意に識別する文字列。Step Functions は、マップ実行ごとにマップ実行 ARN にラベルを追加します。以下は、demoLabel という名前のカスタムラベルを持つマップ実行 ARN の例です。

arn:aws:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b

ラベルを指定しない場合、Step Functions では一意のラベルが自動的に生成されます。

注記

ラベルは 40 文字を超える文字を含めることができず、1 台のステートマシン定義内で一意である必要があり、次の文字を含めることはできません。

  • ホワイトスペース文字

  • ワイルドカード文字 (? *)

  • 角かっこ (< > { } [ ])

  • 特殊文字 (: ; , \ | ^ ~ $ # % & ` ")

  • 制御文字 (\\u0000 - \\u001f または \\u007f - \\u009f)

Step Functions では、ステートマシン、実行、アクティビティ、ラベルに、ASCII 以外の文字を含む名前を作成できます。これらの非ASCII名はAmazonでは機能しません。 CloudWatch CloudWatch メトリクスを追跡できるようにするには、ASCII 文字のみを使用する名前を選択してください。

ResultWriter (オプション)

Step Functions がすべての子ワークフローの実行結果を書き込む Amazon S3 内の場所を指定します。

Step Functions は、実行の入出力、ARN、実行状態など、すべての子ワークフローの実行データを統合します。次に、指定した Amazon S3 の場所のそれぞれのファイルに、同じステータスの実行をエクスポートします。詳細については、「ResultWriter」を参照してください。

Map 状態の結果をエクスポートしない場合、すべての子ワークフロー実行結果の配列が返されます。例:

[1, 2, 3, 4, 5]
ResultPath (オプション)

反復の出力を配置する入力内の場所を指定します。その後、入力は OutputPath フィールド (ある場合) に従ってフィルタリングされてから状態の出力として渡されます。詳細については、入力および出力処理を参照してください。

ResultSelector (オプション)

値が静的であるか、結果から選択されたキーバリューのペアの集合を渡します。詳細については、「ResultSelector」を参照してください。

ヒント

ステートマシンで使用している並列またはマップステートが配列の配列を返す場合は、ResultSelector フィールドを使用して配列をフラットな配列に変換できます。詳細については、「配列の配列の平坦化」を参照してください。

Retry (オプション)

Retrier と呼ばれるオブジェクトの配列。再試行ポリシーを定義します。状態でランタイムエラーが発生した場合、実行では再試行ポリシーが使用されます。詳細については、「Retry と Catch を使用するステートマシンの例」を参照してください。

注記

分散マップ状態に Retrier を定義すると、Map ステートが開始したすべての子ワークフロー実行に再試行ポリシーが適用されます。例えば、Map 状態で 3 つの子ワークフロー実行が開始され、そのうちの 1 つが失敗したとします。障害が発生すると、実行では Map 状態の Retry フィールド (定義されている場合) が使用されます。再試行ポリシーは、失敗した実行だけでなく、すべての子ワークフロー実行に適用されます。1 つ以上の子ワークフローの実行が失敗すると、マップ実行は失敗します。

Map 状態を再試行すると、新しいマップ実行が作成されます。

Catch (オプション)

Catcher と呼ばれるオブジェクトの配列で、フォールバック状態を定義します。Step Functions は、状態で実行時エラーが発生した場合に、Catch で定義されている Catcher を使用します。エラーが発生すると、実行は最初に Retry で定義されている Retrier を使用します。再試行ポリシーが定義されていないか使い果たされた場合、実行ではその Catcher (定義されている場合) を使用します。詳細については、「フォールバック状態」を参照してください。

次のステップ

分散マップ状態の学習を続けるには、次のリソースを参照してください。