Step Functions を使用して Amazon EMR を呼び出す - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Step Functions を使用して Amazon EMR を呼び出す

Step Functions AWS は特定のサービスを Amazon ステートメント言語 (ASL) から直接制御できます。詳細については、「他の サービスでの使用」および「サービス API にパラメータを渡す」を参照してください。

最適化された Amazon EMR インテグレーションと Amazon EMR AWS SDK インテグレーションの違い

最適化された Amazon EMR サービス統合には、以下で説明するような基になる Amazon EMR API をラップするカスタマイズされた一連の API があります。このため、Amazon EMR AWS SDK サービスの統合とは大きく異なります。また、ジョブの実行 (.sync) 統合パターンがサポートされています。

Amazon EMR AWS Step Functions と統合するには、提供されている Amazon EMR サービス統合 API を使用します。サービス統合 API は対応する Amazon EMR API に似ていますが、渡されるフィールドと返される応答にいくつかの違いがあります。

Step Functions は、実行が停止しても Amazon EMR クラスターを自動的に終了しません。Amazon EMR クラスターが終了する前にステートマシンが停止した場合、クラスターは無期限に実行され、追加料金が発生する可能性があります。これを回避するには、作成した Amazon EMR クラスターが正しく終了していることを確認してください。詳細については、以下を参照してください。

注記

emr-5.28.0 の時点で、クラスターの作成時に StepConcurrencyLevel パラメータを指定して、単一のクラスターで複数のステップを並行して実行することを許可します。Step Functions Map および Parallel 状態を使用して、並行して作業をクラスターに送信できます。

Amazon EMR サービス統合の可用性は、Amazon EMR API の可用性により決定します。特殊リージョンにおける制限については、Amazon EMR をチェックしてください。

注記

Amazon EMR との統合のため、Step Functions は最初の 10 分とその後の 300 秒間、ジョブポーリング頻度をハードコーディングして 60 秒に設定しています。

各サービス統合 API と対応する Amazon EMR API の違いを次の表に示します。

Amazon EMR サービス統合 API および対応する Amazon EMR API
Amazon EMR サービス統合 API 対応する EMR API 差異
createCluster

新しいクラスター (ジョブフロー) を作成して実行を開始します。

Amazon EMR はサービスリンクロールとして知られる IAM ロールの一意のタイプに直接リンクされています。createClustercreateCluster.sync が機能するには、サービスリンクロール AWSServiceRoleForEMRCleanup を作成するために必要なアクセス許可が設定されている必要があります。IAM 許可ポリシーに追加できるステートメントなど、この詳細については、Amazon EMR のサービスリンクロールを使用するを参照してください。

runJobFlow createCluster以下を除いてと同じリクエスト構文を使用します。runJobFlow
  • このフィールド Instances.KeepJobFlowAliveWhenNoSteps は必須で、ブール値 TRUE である必要があります。

  • フィールド Steps は許可されていません。

  • フィールド Instances.InstanceFleets[index].Name は指定する必要があり、オプションの modifyInstanceFleetByName コネクタ API を使用する場合は一意である必要があります。

  • フィールド Instances.InstanceGroups[index].Name は指定する必要があり、オプションの modifyInstanceGroupByName API を使用する場合は一意である必要があります。

レスポンスは次のとおりです。
{ "ClusterId": "string" }
Amazon EMR は以下を使用します。
{ "JobFlowId": "string" }
createCluster.sync

新しいクラスター (ジョブフロー) を作成して実行を開始します。

runJobFlow createCluster と同じですが、クラスターが WAITING 状態になるまで待機します。
setClusterTermination保護

クラスター (ジョブフロー) をロックして、クラスター内の EC2 インスタンスをユーザーの介入、API コール、またはジョブフローエラーが発生した場合に終了できないようにします。

setTerminationProtection リクエストは以下を使用します。
{ "ClusterId": "string" }
Amazon EMR は以下を使用します。
{ "JobFlowIds": ["string"] }
terminateCluster

クラスター (ジョブフロー) をシャットダウンします。

terminateJobFlows リクエストは以下を使用します。
{ "ClusterId": "string" }
Amazon EMR は以下を使用します。
{ "JobFlowIds": ["string"] }
terminateCluster.sync

クラスター (ジョブフロー) をシャットダウンします。

terminateJobFlows terminateCluster と同じですが、クラスターが終了するまで待機します。
addStep

実行中のクラスターに新しいステップを追加します。

オプションで、この API ExecutionRoleArn を使用する際に パラメータを指定することもできます。

addJobFlow手順

リクエストはキー "ClusterId" を使用します。Amazon EMR は "JobFlowId" を使用します。リクエストは 1 つのステップを使用します。
{ "Step": <"StepConfig object"> }
Amazon EMR は以下を使用します。
{ "Steps": [<StepConfig objects>] }
レスポンスは次のとおりです。
{ "StepId": "string" }
Amazon EMR はこれを返します。
{ "StepIds": [<strings>] }
addStep.sync

実行中のクラスターに新しいステップを追加します。

オプションで、この API ExecutionRoleArn を使用する際に パラメータを指定することもできます。

addJobFlowステップ

addStep と同じですが、ステップが完了するまで待機します。
cancelStep

実行中のクラスターで保留中のステップを取り消します。

cancelSteps リクエストは以下を使用します。
{ "StepId": "string" }
Amazon EMR は以下を使用します。
{ "StepIds": [<strings>] }
レスポンスは次のとおりです。
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR は以下を使用します。
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

指定された InstanceFleetName を使用して、インスタンスフリートのターゲットオンデマンドおよびターゲットスポット容量を変更します。

modifyInstanceFleet リクエストは modifyInstanceFleet の場合と同じですが、以下が異なります。
  • フィールド Instance.InstanceFleetId は許可されていません。

  • 実行時に、InstanceFleetIdListInstanceFleets を呼び出して結果を解析することにより、サービス統合によって自動的に決定されます。

modifyInstanceGroupByName

インスタンスグループのノード数と構成設定を変更します。

modifyInstanceGroups リクエストは次のとおりです。
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR は以下のリストを使用します。
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig オブジェクト内では、フィールド InstanceGroupId は使用できません。

新しいフィールド InstanceGroupName が追加されました。実行時に、InstanceGroupIdListInstanceGroups を呼び出して結果を解析することにより、サービス統合によって自動的に決定されます。

以下にはクラスターを作成する Task 状態が含まれています。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下には終了保護を有効にする Task 状態が含まれています。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

以下にはクラスターにステップを送信する Task 状態が含まれています。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<mybucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下には、ステップをキャンセルする Task 状態が含まれます。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

以下には、クラスターを終了する Task 状態を示します。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

以下には、インスタンスグループに合わせてクラスターをスケールアップまたはスケールダウンする Task 状態が含まれています。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下には、インスタンスフリートに合わせてクラスターをスケールアップまたはスケールダウンする Task 状態が含まれています。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAMStep Functions AWS 他のサービスと併用する場合の権限の設定方法については、を参照してください統合サービスの IAM ポリシー