使用 Step Functions 创建和管理亚马逊EMR集群 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Step Functions 创建和管理亚马逊EMR集群

学习如何集成 AWS Step Functions EMR使用亚马逊提供的亚马逊EMR服务集成APIs。服务集成与相应APIs的 Amazon 类似 EMRAPIs,但传递的字段和返回的响应有所不同。

要了解如何与集成 AWS Step Functions 中的服务,参见集成 服务和。在 Step Functions API 中向服务传递参数

优化 Amazon EMR 集成的主要功能
  • 优化的亚马逊EMR服务集成有一组自定义的APIs包裹底层的亚马逊 EMRAPIs,如下所述。因此,它与Amazon有很大不同 EMR AWS SDK服务集成。

  • 支持运行作业 (.sync) 集成模式。

如果执行停止,Step Functions 不会自动终止亚马逊EMR集群。如果您的状态机在您的 Amazon EMR 集群终止之前停止,则您的集群可能会无限期地继续运行,并且可能会产生额外费用。为避免这种情况,请确保正确终止您创建的任何 Amazon EMR 集群。有关更多信息,请参阅:

注意

emr-5.28.0 起,您可以在创建集群时指定参数 StepConcurrencyLevel,以允许在单个集群上并行运行多个步骤。您可以使用 Step Functions MapParallel 状态将工作并行提交到集群。

亚马逊EMR服务集成的可用性视亚马逊的可用性而定EMRAPIs。有关特殊地区的限制,请参阅 Amazon EMR 文档。

注意

为了与亚马逊集成EMR,Step Functions在之后的前10分300秒内具有硬编码的60秒作业轮询频率。

支持 Amazon EMR APIs

下表描述了每个亚马逊EMR服务集成API与相应亚马逊服务之间的区别EMRAPIs。

亚马逊EMR服务集成 API 相应的 EMR API 差异
createCluster

创建并开始运行集群(作业流程)。

EMRAmazon 与一种称为服务相关IAM角色的独特角色直接关联。要使 createClustercreateCluster.sync 起作用,您必须配置必要的权限以创建与服务关联的角色 AWSServiceRoleForEMRCleanup。有关这方面的更多信息,包括您可以添加到IAM权限策略中的声明,请参阅使用亚马逊EMR的服务相关角色

runJobFlow createCluster使用与相同的请求语法 runJobFlow,但以下语法除外:
  • 必须填写 Instances.KeepJobFlowAliveWhenNoSteps 字段,且该字段必须具有 Boolean 值 TRUE

  • 不允许填写字段 Steps

  • 如果使用可选modifyInstanceFleetByName连接器,则Instances.InstanceFleets[index].Name应提供该字段,且该字段必须API是唯一的。

  • Instances.InstanceGroups[index].Name应提供该字段,如果使用可选字段,则该字段必须modifyInstanceGroupByNameAPI是唯一的。

响应如下:
{ "ClusterId": "string" }
Amazon EMR 使用这个:
{ "JobFlowId": "string" }
createCluster.sync

创建并开始运行集群(作业流程)。

runJobFlow createCluster 相同,但等待集群达到 WAITING 状态。
setClusterTermination保护

锁定集群(任务流),这样集群中的EC2实例就不会因为用户干预、API呼叫或任务流错误而终止。

setTerminationProtection 请求使用:
{ "ClusterId": "string" }
Amazon EMR 使用这个:
{ "JobFlowIds": ["string"] }
terminateCluster

关闭集群(作业流程)。

terminateJobFlows 请求使用:
{ "ClusterId": "string" }
Amazon EMR 使用这个:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

关闭集群(作业流程)。

terminateJobFlows terminateCluster 相同,但等待集群终止。
addStep

向正在运行的集群添加新步骤。

或者,您也可以在使用此ExecutionRoleArn参数时指定参数API。

addJobFlow步骤

请求使用密钥 "ClusterId"。亚马逊EMR使用"JobFlowId"。请求使用单一步骤。
{ "Step": <"StepConfig object"> }
Amazon EMR 使用这个:
{ "Steps": [<StepConfig objects>] }
响应如下:
{ "StepId": "string" }
Amazon EMR 退回了这个:
{ "StepIds": [<strings>] }
addStep.sync

向正在运行的集群添加新步骤。

或者,您也可以在使用此ExecutionRoleArn参数时指定参数API。

addJobFlow步骤

addStep 相同,但等待步骤完成。
cancelStep

取消正在运行的集群中的一个待处理步骤。

cancelSteps 请求使用:
{ "StepId": "string" }
Amazon EMR 使用这个:
{ "StepIds": [<strings>] }
响应如下:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR 使用这个:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

使用指定的 InstanceFleetName 修改实例队列的目标按需容量和目标 Spot 容量。

modifyInstanceFleet 请求与 modifyInstanceFleet 相同,但以下情况除外:
  • 不允许填写字段 Instance.InstanceFleetId

  • 在运行时,通过调用 ListInstanceFleets 并解析结果,服务集成会自动确定 InstanceFleetId

modifyInstanceGroupByName

修改实例组的节点数和配置设置。

modifyInstanceGroups 请求如下:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR 使用清单:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 对象中,不允许填写 InstanceGroupId 字段。

已添加一个新字段 InstanceGroupName。在运行时,通过调用 ListInstanceGroups 并解析结果,服务集成会自动确定 InstanceGroupId

工作流程示例

以下内容包含一个创建集群的 Task 状态。

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

以下内容包括启用终止保护的 Task 状态。

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

以下内容包括向集群提交步骤的 Task 状态。

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

以下内容包括取消步骤的 Task 状态。

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

以下内容包括终止集群的 Task 状态。

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

以下内容包括为实例组向上或向下扩展集群的 Task 状态。

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

以下内容包括为实例队列向上或向下扩展集群的 Task 状态。

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAM致电 Amazon 的政策 EMR

以下示例模板演示了如何操作 AWS Step Functions 根据状态机定义中的资源生成IAM策略。有关更多信息,请参阅Step Functions 如何为集成服务生成IAM策略在 Step Functions 中探索服务集成模式

addStep

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

静态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

动态资源

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }