Responding to Amazon EMR cluster insufficient instance capacity events
Overview
Amazon EMR clusters return the event code EC2 provisioning - Insufficient
Instance Capacity
when the selected Availability Zone doesn't have enough
capacity to fulfill your cluster start or resize request. The event emits
periodically with both instance groups and instance fleets if Amazon EMR repeatedly
encounters insufficient capacity exceptions and can't fulfill your provisioning
request for a cluster start or cluster resize operation.
This page describes how you can best respond to this event type when it occurs for your EMR cluster.
Recommended response to an insufficient capacity event
We recommend that you respond to an insufficient-capacity event in one of the following ways:
-
Wait for capacity to recover. Capacity shifts frequently, so an insufficient capacity exception can recover on its own. Your clusters will start or finish resizing as soon as Amazon EC2 capacity becomes available.
-
Alternatively, you can terminate your cluster, modify your instance type configurations, and create a new cluster with the updated cluster configuration request. For more information, see Availability Zone flexibility for an Amazon EMR cluster.
You can also set up rules or automated responses to an insufficient capacity event, as described in the next section.
Automated recovery from an insufficient capacity event
You can build automation in response to Amazon EMR events such as the ones with event
code EC2 provisioning - Insufficient Instance Capacity
. For example,
the following AWS Lambda function terminates an EMR cluster with an instance group
that uses On-Demand instances, and then creates a new EMR cluster with an instance
group that contains different instance types than the original request.
The following conditions trigger the automated process to occur:
-
The insufficient capacity event has been emitting for primary or core nodes for more than 20 minutes.
-
The cluster is not in a READY or WAITING state. For more information about EMR cluster states, see Understanding the cluster lifecycle.
Note
When you build an automated process for an insufficient capacity exception, you should consider that the insufficient capacity event is recoverable. Capacity often shifts and your clusters will resume the resize or start operation as soon as Amazon EC2 capacity becomes available.
Example function to respond to insufficient capacity event
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning" INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = ( "EC2 provisioning - Insufficient Instance Capacity" ) ALLOWED_INSTANCE_TYPES_TO_USE = [ "m5.xlarge", "c5.xlarge", "m5.4xlarge", "m5.2xlarge", "t3.xlarge", ] CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"] CLUSTER_START_SLA = 20 CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity' def is_insufficient_capacity_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceGroupType could be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][ "CreationDateTime" ] clusterState = describeClusterResponse["Cluster"]["Status"]["State"] now = datetime.datetime.now() now = now.replace(tzinfo=timezone.utc) isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta( minutes=CLUSTER_START_SLA ) # Check if instance group receiving Insufficient capacity exception is CORE or PRIMARY (MASTER), # and it's been more than 20 minutes since cluster was created but the cluster state and the cluster state is not updated to RUNNING or WAITING if ( (instanceGroupType == "CORE" or instanceGroupType == "MASTER") and isClusterStartSlaBreached and clusterState not in CLUSTER_START_ACCEPTABLE_STATES ): return True else: return False # Choose item from the list except the exempt value def choice_excluding(exempt): for i in ALLOWED_INSTANCE_TYPES_TO_USE: if i != exempt: return i # Create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceGroupType cloud be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] # Following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # Select new instance types to include in the new createCluster request instanceTypeForMaster = ( instanceTypesFromOriginalRequestMaster if instanceGroupType != "MASTER" else choice_excluding(instanceTypesFromOriginalRequestMaster) ) instanceTypeForCore = ( instanceTypesFromOriginalRequestCore if instanceGroupType != "CORE" else choice_excluding(instanceTypesFromOriginalRequestCore) ) print("Starting to create cluster...") instances = { "InstanceGroups": [ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": instanceTypeForMaster, "Market": "ON_DEMAND", "Name": "Master", }, { "InstanceRole": "CORE", "InstanceCount": 1, "InstanceType": instanceTypeForCore, "Market": "ON_DEMAND", "Name": "Core", }, ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # Terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_insufficient_capacity_event(event): print( "Received insufficient capacity event for instanceGroup, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not insufficient capacity event, skipping")