Risposta a eventi di capacità insufficiente delle istanze EMR del cluster Amazon Amazon - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Risposta a eventi di capacità insufficiente delle istanze EMR del cluster Amazon Amazon

Panoramica

EMRI cluster Amazon restituiscono il codice dell'evento EC2 provisioning - Insufficient Instance Capacity quando la zona di disponibilità selezionata non ha una capacità sufficiente per soddisfare la richiesta di avvio o ridimensionamento del cluster. L'evento viene emesso periodicamente sia con i gruppi di istanze che con le flotte di istanze se Amazon incontra EMR ripetutamente eccezioni di capacità insufficienti e non è in grado di soddisfare la tua richiesta di provisioning per un'operazione di avvio o ridimensionamento del cluster.

Questa pagina descrive come rispondere al meglio a questo tipo di evento quando si verifica per il tuo cluster. EMR

Risposta consigliata a un evento di capacità insufficiente

Ti consigliamo di rispondere a un evento di capacità insufficiente in uno dei seguenti modi:

  • Attendi il ripristino della capacità. La capacità cambia frequentemente, quindi un'eccezione di capacità insufficiente può essere ripristinata autonomamente. Il ridimensionamento dei cluster inizierà o terminerà non appena la EC2 capacità di Amazon sarà disponibile.

  • In alternativa, puoi terminare il cluster, modificare le configurazioni del tipo di istanza e creare un nuovo cluster con la richiesta di configurazione del cluster aggiornata. Per ulteriori informazioni, consulta Best practice per la flessibilità delle istanze e delle zone di disponibilità.

Puoi impostare regole o risposte automatiche a un evento di capacità insufficiente, come descritto nella sezione successiva.

Ripristino automatico da un evento di capacità insufficiente

Puoi creare automazione in risposta a EMR eventi Amazon come quelli con codice eventoEC2 provisioning - Insufficient Instance Capacity. Ad esempio, la seguente AWS Lambda funzione termina un EMR cluster con un gruppo di istanze che utilizza istanze On-Demand, quindi crea un nuovo EMR cluster con un gruppo di istanze che contiene tipi di istanze diversi rispetto alla richiesta originale.

Le seguenti condizioni attivano il processo automatizzato:

  • L'evento di capacità insufficiente viene emesso per i nodi primari o principali da più di 20 minuti.

  • Il cluster non è nello stato or. READYWAITING Per ulteriori informazioni sugli stati dei EMR cluster, vedereComprensione del ciclo di vita del cluster.

Nota

Quando crei un processo automatizzato per un'eccezione di capacità insufficiente, devi considerare che l'evento di capacità insufficiente è recuperabile. La capacità cambia spesso e i cluster riprenderanno il ridimensionamento o inizieranno a funzionare non appena la capacità di Amazon EC2 sarà disponibile.

Esempio funzione per rispondere a un evento di capacità insufficiente
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning" INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = ( "EC2 provisioning - Insufficient Instance Capacity" ) ALLOWED_INSTANCE_TYPES_TO_USE = [ "m5.xlarge", "c5.xlarge", "m5.4xlarge", "m5.2xlarge", "t3.xlarge", ] CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"] CLUSTER_START_SLA = 20 CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity' def is_insufficient_capacity_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceGroupType could be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][ "CreationDateTime" ] clusterState = describeClusterResponse["Cluster"]["Status"]["State"] now = datetime.datetime.now() now = now.replace(tzinfo=timezone.utc) isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta( minutes=CLUSTER_START_SLA ) # Check if instance group receiving Insufficient capacity exception is CORE or PRIMARY (MASTER), # and it's been more than 20 minutes since cluster was created but the cluster state and the cluster state is not updated to RUNNING or WAITING if ( (instanceGroupType == "CORE" or instanceGroupType == "MASTER") and isClusterStartSlaBreached and clusterState not in CLUSTER_START_ACCEPTABLE_STATES ): return True else: return False # Choose item from the list except the exempt value def choice_excluding(exempt): for i in ALLOWED_INSTANCE_TYPES_TO_USE: if i != exempt: return i # Create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceGroupType cloud be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] # Following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # Select new instance types to include in the new createCluster request instanceTypeForMaster = ( instanceTypesFromOriginalRequestMaster if instanceGroupType != "MASTER" else choice_excluding(instanceTypesFromOriginalRequestMaster) ) instanceTypeForCore = ( instanceTypesFromOriginalRequestCore if instanceGroupType != "CORE" else choice_excluding(instanceTypesFromOriginalRequestCore) ) print("Starting to create cluster...") instances = { "InstanceGroups": [ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": instanceTypeForMaster, "Market": "ON_DEMAND", "Name": "Master", }, { "InstanceRole": "CORE", "InstanceCount": 1, "InstanceType": instanceTypeForCore, "Market": "ON_DEMAND", "Name": "Core", }, ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # Terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_insufficient_capacity_event(event): print( "Received insufficient capacity event for instanceGroup, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not insufficient capacity event, skipping")