使用 Step Functions 建立和管理 Amazon EMR 叢集 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Step Functions 建立和管理 Amazon EMR 叢集

瞭解如何整合 AWS Step Functions 與 Amazon EMR 使用提供的 Amazon EMR 服務集成APIs。服務整APIs合類似於對應的 Amazon EMRAPIs,傳遞的欄位和傳回的回應有些許差異。

若要瞭解如何整合 AWS 服務在 Step Functions 中,請參閱整合 服務和。將參數傳遞給 Step Functions 數API中的服務

優化 Amazon EMR 集成的主要功能
  • 優化的 Amazon EMR 服務集成具有一組定制APIs的包裝基礎 Amazon EMRAPIs,如下所述。正因為如此,它與 Amazon 顯著不同 EMR AWS SDK服務整合。

  • 支援執行任務 (.sync)整合模式。

如果停止執行,Step Functions 不會自動終止 Amazon EMR 叢集。如果您的狀態機器在 Amazon EMR 叢集終止之前停止,您的叢集可能會無限期地繼續執行,並可能會產生額外費用。為避免這種情況,請確保您建立的任何 Amazon EMR 叢集都已正確終止。如需詳細資訊,請參閱:

注意

截至目前emr-5.28.0,您可以在建立叢集StepConcurrencyLevel時指定參數,以允許在單一叢集上 parallel 執行多個步驟。您可以使用 Step Functions MapParallel狀態來提交與叢集 parallel 的工作。

Amazon EMR 服務集成的可用性取決於 Amazon 的可用性EMRAPIs。如需特殊區域的限制,請參閱 Amazon EMR 文件。

注意

為了與 Amazon 整合EMR,Step Functions 在此之後的前 10 分鐘和 300 秒內具有硬編碼的 60 秒任務輪詢頻率。

支持 Amazon EMR APIs

下表說明每個 Amazon EMR 服務整合API與對應 Amazon 之間的差異EMRAPIs。

Amazon EMR 服務集成 API 對應 EMR API 差異
createCluster

建立並開始執行叢集 (任務流程)。

Amazon 直EMR接連結到稱為服務連結IAM角色的唯一角色類型。為了讓 createClustercreateCluster.sync 運作,您必須設定必要的許可來建立服務連結角色 AWSServiceRoleForEMRCleanup。有關此方面的詳細資訊,包括可新增至IAM許可政策的聲明,請參閱使用 Amazon EMR 的服務連結角色

runJobFlow createCluster使用與下列項目相同的要求語法:runJobFlow
  • Instances.KeepJobFlowAliveWhenNoSteps 欄位是強制性的,且必須具有布林值 TRUE

  • 不允許 Steps 欄位。

  • Instances.InstanceFleets[index].Name應該提供欄位,如果使用選用的modifyInstanceFleetByName連接器,則必須API是唯一的。

  • Instances.InstanceGroups[index].Name應該提供此欄位,如果使用選擇性欄位,則必須modifyInstanceGroupByNameAPI是唯一的。

回應為:
{ "ClusterId": "string" }
Amazon EMR 使用這個:
{ "JobFlowId": "string" }
createCluster. 同步。

建立並開始執行叢集 (任務流程)。

runJobFlow createCluster 一樣,但會等待叢集到達 WAITING 狀態。
setClusterTermination保護

鎖定叢集 (工作流程),使叢集中的EC2執行個體無法因使用者介入、API呼叫或工作流程錯誤而終止。

setTerminationProtection 請求會使用:
{ "ClusterId": "string" }
Amazon EMR 使用這個:
{ "JobFlowIds": ["string"] }
terminateCluster

關閉叢集 (任務流程)。

terminateJobFlows 請求會使用:
{ "ClusterId": "string" }
Amazon EMR 使用這個:
{ "JobFlowIds": ["string"] }
terminateCluster. 同步。

關閉叢集 (任務流程)。

terminateJobFlows terminateCluster 相同,但會等待叢集終止。
addStep

新增步驟至執行中叢集。

或者,您也可以在使用此ExecutionRoleArn參數時指定參數API。

addJobFlow步驟

請求使用密鑰"ClusterId"。Amazon EMR 使用"JobFlowId"。請求會使用單一步驟。
{ "Step": <"StepConfig object"> }
Amazon EMR 使用這個:
{ "Steps": [<StepConfig objects>] }
回應為:
{ "StepId": "string" }
Amazon EMR 返回這個:
{ "StepIds": [<strings>] }
addStep. 同步。

新增步驟至執行中叢集。

或者,您也可以在使用此ExecutionRoleArn參數時指定參數API。

addJobFlow步驟

addStep 相同,但會等待步驟完成。
cancelStep

取消執行中叢集中的擱置步驟。

cancelSteps 請求會使用:
{ "StepId": "string" }
Amazon EMR 使用這個:
{ "StepIds": [<strings>] }
回應為:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR 使用這個:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

針對具有所指定 InstanceFleetName 的執行個體機群,修改其目標隨需容量和目標 Spot 容量。

modifyInstanceFleet 請求與 modifyInstanceFleet 相同,除了:
  • 不允許 Instance.InstanceFleetId 欄位。

  • 在執行時間,InstanceFleetId 是由服務整合自動決定,方法為呼叫 ListInstanceFleets 並剖析結果。

modifyInstanceGroupByName

修改執行個體群組的節點數量和組態設定。

modifyInstanceGroups 請求為:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR 使用一個列表:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 物件中,不允許 InstanceGroupId 欄位。

已新增新欄位 InstanceGroupName。在執行時間,InstanceGroupId 是由服務整合自動決定,方法為呼叫 ListInstanceGroups 並剖析結果。

工作流程例

以下包含建立叢集的 Task 狀態。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下包含啟用終止保護的 Task 狀態。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

以下包含提交步驟至叢集的 Task 狀態。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下包含取消步驟的 Task 狀態。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

以下包含終止叢集的 Task 狀態。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

以下包含一種可對執行個體群組的叢集進行擴增或縮減的 Task 狀態。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下包含一種可對執行個體機群的叢集進行擴增或縮減的 Task 狀態。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAM致電 Amazon 政策 EMR

下面的示例模板顯示如何 AWS Step Functions 根據狀態機器定義中的資源產生IAM策略。如需詳細資訊,請參閱 Step Functions 式如何為整合式服務產生IAM原則探索 Step Functions 中的服務整合模式

addStep

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

靜態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

動態資源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }