Step Functions를 사용하여 Amazon EMR 클러스터를 생성하고 관리합니다. - AWS Step Functions

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Step Functions를 사용하여 Amazon EMR 클러스터를 생성하고 관리합니다.

통합 방법 알아보기 AWS Step Functions 제공된 Amazon EMR 서비스 통합을 EMR 사용하여 Amazon과 APIs 협력합니다. 서비스 APIs 통합은 전달되는 필드와 반환되는 응답에 약간의 차이가 있다는 점을 제외하면 해당 EMR APIs Amazon과 유사합니다.

통합에 대해 알아보려면 AWS Step Functions의 서비스에 대해서는 및 을 참조하십시오 서비스 통합. Step API Functions에서 서비스에 파라미터 전달하기

최적화된 Amazon EMR 통합의 주요 기능
  • 최적화된 Amazon EMR 서비스 통합에는 아래에 설명된 것처럼 기본 EMR APIs Amazon을 감싸는 사용자 지정 세트가 있습니다. APIs 이 때문에 아마존과 크게 다릅니다. EMR AWS SDK서비스 통합.

  • 작업 실행(.sync) 통합 패턴이 지원됩니다.

Step Functions는 실행이 중지된 경우 Amazon EMR 클러스터를 자동으로 종료하지 않습니다. Amazon EMR 클러스터가 종료되기 전에 상태 머신이 중지되는 경우 클러스터는 무기한 계속 실행될 수 있으며 추가 요금이 발생할 수 있습니다. 이를 방지하려면 생성한 모든 Amazon EMR 클러스터가 제대로 종료되었는지 확인하십시오. 자세한 내용은 다음을 참조하세요.

참고

emr-5.28.0에 따라 클러스터를 만들 때 StepConcurrencyLevel 파라미터를 지정하여 단일 클러스터에서 여러 단계를 동시에 실행할 수 있습니다. Step Functions MapParallel 상태를 사용하여 동시에 작업을 클러스터에 제출할 수 있습니다.

Amazon EMR 서비스 통합의 가용성은 Amazon의 가용성에 따라 달라집니다 EMRAPIs. 특수 지역의 제한 사항은 Amazon EMR 설명서를 참조하십시오.

참고

Amazon과의 통합을 위해 Step Functions는 처음 10분EMR, 이후 300초 동안은 작업 폴링 빈도를 60초로 하드 코딩했습니다.

아마존 지원 EMR APIs

다음 표는 각 Amazon EMR 서비스 API 통합과 해당 Amazon 간의 차이점을 설명합니다 EMRAPIs.

아마존 EMR 서비스 통합 API 해당 EMR API 차이
createCluster

클러스터(작업 흐름)를 생성하고 실행을 시작합니다.

EMRAmazon은 서비스 연결 IAM 역할이라고 하는 고유한 유형의 역할에 직접 연결되어 있습니다. createClustercreateCluster.sync 작업을 수행하려면 서비스 연결 역할인 AWSServiceRoleForEMRCleanup을 생성하도록 구성된 필수 권한이 있어야 합니다. IAM권한 정책에 추가할 수 있는 설명을 포함하여 이에 대한 자세한 내용은 Amazon용 서비스 연결 역할 사용을 참조하십시오. EMR

runJobFlow createCluster다음을 제외하고 와 runJobFlow동일한 요청 구문을 사용합니다.
  • Instances.KeepJobFlowAliveWhenNoSteps 필드는 필수이며 부울 값 TRUE가 있어야 합니다.

  • Steps 필드는 허용되지 않습니다.

  • 이 필드는 Instances.InstanceFleets[index].Name 제공되어야 하며 선택적 modifyInstanceFleetByName 커넥터를 API 사용하는 경우 고유해야 합니다.

  • modifyInstanceGroupByName API 필드는 Instances.InstanceGroups[index].Name 제공되어야 하며, 선택 필드를 사용하는 경우 고유해야 합니다.

응답은 다음과 같습니다.
{ "ClusterId": "string" }
EMRAmazon은 다음을 사용합니다.
{ "JobFlowId": "string" }
createCluster.sync

클러스터(작업 흐름)를 생성하고 실행을 시작합니다.

runJobFlow createCluster와 동일하지만 클러스터가 WAITING 상태에 도달할 때까지 기다려야 합니다.
setClusterTermination보호

사용자 개입, API 호출 또는 작업 흐름 오류로 인해 클러스터의 EC2 인스턴스가 종료되지 않도록 클러스터 (작업 흐름) 를 잠급니다.

setTerminationProtection 요청은 다음을 사용합니다.
{ "ClusterId": "string" }
EMRAmazon은 다음을 사용합니다.
{ "JobFlowIds": ["string"] }
terminateCluster

클러스터(작업 흐름)를 종료합니다.

terminateJobFlows 요청은 다음을 사용합니다.
{ "ClusterId": "string" }
EMRAmazon은 다음을 사용합니다.
{ "JobFlowIds": ["string"] }
terminateCluster.sync

클러스터(작업 흐름)를 종료합니다.

terminateJobFlows terminateCluster와 동일하지만 클러스터가 종료될 때까지 기다려야 합니다.
addStep

실행 중인 클러스터에 새 단계를 추가합니다.

선택적으로, 이를 API 사용하는 동안 ExecutionRoleArn 파라미터를 지정할 수도 있습니다.

addJobFlow단계

요청에서 "ClusterId" 키를 사용합니다. 아마존이 EMR 사용합니다"JobFlowId". 요청은 단일 단계를 사용합니다.
{ "Step": <"StepConfig object"> }
EMRAmazon은 다음을 사용합니다.
{ "Steps": [<StepConfig objects>] }
응답은 다음과 같습니다.
{ "StepId": "string" }
Amazon은 다음을 EMR 반환합니다.
{ "StepIds": [<strings>] }
addStep.sync

실행 중인 클러스터에 새 단계를 추가합니다.

선택적으로, 이를 API 사용하는 동안 ExecutionRoleArn 파라미터를 지정할 수도 있습니다.

addJobFlow단계

addStep과 동일하지만 단계가 완료될 때까지 기다려야 합니다.
cancelStep

실행 중인 클러스터에서 보류 중인 단계를 취소합니다.

cancelSteps 요청은 다음을 사용합니다.
{ "StepId": "string" }
EMRAmazon은 다음을 사용합니다.
{ "StepIds": [<strings>] }
응답은 다음과 같습니다.
{ "CancelStepsInfo": <CancelStepsInfo object> }
EMRAmazon은 다음을 사용합니다.
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

지정된 InstanceFleetName을 사용하여 인스턴스 플릿에 대한 대상 온디맨드 및 대상 스팟 용량을 수정합니다.

modifyInstanceFleet 요청은 다음을 제외하고 modifyInstanceFleet과 동일합니다.
  • Instance.InstanceFleetId 필드는 허용되지 않습니다.

  • 런타임 시 InstanceFleetIdListInstanceFleets를 호출하고 결과를 구문 분석하여 서비스 통합에 의해 자동으로 결정됩니다.

modifyInstanceGroupByName

인스턴스 그룹의 구성 설정 및 노드 수를 수정합니다.

modifyInstanceGroups 요청은 다음과 같습니다.
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
EMRAmazon은 다음 목록을 사용합니다.
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

InstanceGroupModifyConfig 객체 내에서 InstanceGroupId 필드는 허용되지 않습니다.

새 필드 InstanceGroupName이 추가되었습니다. 런타임 시 InstanceGroupIdListInstanceGroups를 호출하고 결과를 구문 분석하여 서비스 통합에 의해 자동으로 결정됩니다.

워크플로 예제

다음은 클러스터를 생성하는 Task 상태를 포함합니다.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

다음은 종료 보호를 활성화하는 Task 상태를 포함합니다.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

다음은 클러스터에 단계를 제출하는 Task 상태를 포함합니다.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

다음은 단계를 취소하는 Task 상태를 포함합니다.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

다음은 클러스터를 종료하는 Task 상태를 포함합니다.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

다음은 인스턴스 그룹에 대해 클러스터를 확장 또는 축소하는 Task 상태를 포함합니다.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

다음은 인스턴스 플릿에 대해 클러스터를 확장 또는 축소하는 Task 상태를 포함합니다.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAM아마존 통화 정책 EMR

다음 예제 템플릿은 방법을 보여줍니다. AWS Step Functions 스테이트 머신 정의의 리소스를 기반으로 IAM 정책을 생성합니다. 자세한 내용은 Step Functions가 통합 서비스를 위한 IAM 정책을 생성하는 방법Step Functions에서 서비스 통합 패턴을 살펴보세요 단원을 참조하세요.

addStep

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

정적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

동적 리소스

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }