Amazon EMR 클러스터 인스턴스 플릿 크기 조정 제한 시간 초과 이벤트에 대응 - 아마존 EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon EMR 클러스터 인스턴스 플릿 크기 조정 제한 시간 초과 이벤트에 대응

개요

Amazon EMR 클러스터는 인스턴스 플릿 클러스터의 크기 조정 작업을 실행하는 동안 이벤트를 내보냅니다. 프로비저닝 타임아웃 이벤트는 Amazon이 타임아웃이 만료된 후 플릿에 대한 스팟 또는 온디맨드 용량 프로비저닝을 EMR 중단할 때 발생합니다. 제한 시간은 인스턴스 플릿의 크기 조정 사양의 일부로 사용자가 구성할 수 있습니다. 동일한 인스턴스 집합의 크기가 연속적으로 조정되는 시나리오에서 EMR Amazon은 현재 크기 조정 작업의 제한 시간이 만료되면 Spot provisioning timeout - continuing resize 또는 On-Demand provisioning timeout - continuing resize 이벤트를 발생시킵니다. 그런 다음 플릿의 다음 크기 조정 작업을 위한 용량 프로비저닝을 시작합니다.

인스턴스 플릿 크기 조정 제한 시간 이벤트에 대한 대응

프로비저닝 제한 시간 이벤트에 대한 다음 방법 중 하나를 사용하여 대응하는 것이 좋습니다.

  • 크기 조정 사양을 다시 확인하고 크기 조정 작업을 다시 시도합니다. 용량이 자주 변경되므로 Amazon EC2 용량을 사용할 수 있게 되는 즉시 클러스터의 크기가 성공적으로 조정됩니다. 더 엄격한 조건을 요구하는 작업의 경우 제한 시간을 더 낮은 값으로 구성하는 것이 좋습니다. SLAs

  • 또는 다음 중 하나를 수행할 수 있습니다.

  • 프로비저닝 제한 시간 - 크기 조정 계속 이벤트의 경우 크기 조정 작업이 처리될 때까지 더 기다릴 수 있습니다. EMRAmazon은 구성된 크기 조정 사양에 따라 플릿에 대해 트리거된 크기 조정 작업을 계속해서 순차적으로 처리할 것입니다.

다음 섹션의 설명에 따라 이 이벤트에 대한 규칙이나 자동 대응을 설정할 수도 있습니다.

프로비저닝 제한 시간 이벤트에서 자동 복구

이벤트 코드를 사용하여 Amazon EMR 이벤트에 대한 응답으로 자동화를 구축할 수 있습니다. Spot Provisioning timeout 예를 들어 다음 AWS Lambda 함수는 스팟 인스턴스를 태스크 노드로 사용하는 인스턴스 플릿이 있는 EMR 클러스터를 종료한 다음 원래 요청보다 더 다양한 인스턴스 유형을 포함하는 인스턴스 플릿으로 새 EMR 클러스터를 생성합니다. 이 예제에서 태스크 노드에 대해 생성된 Spot Provisioning timeout 이벤트는 Lambda 함수의 실행을 트리거합니다.

Spot Provisioning timeout 이벤트에 대응하는 예제 함수
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")