Respondendo aos eventos de tempo limite de redimensionamento da frota de instâncias de EMR cluster da Amazon - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Respondendo aos eventos de tempo limite de redimensionamento da frota de instâncias de EMR cluster da Amazon

Visão geral

EMROs clusters da Amazon emitem eventos durante a execução da operação de redimensionamento, por exemplo, clusters de frotas. Os eventos de tempo limite de provisionamento são emitidos quando a EMR Amazon interrompe o provisionamento de capacidade spot ou sob demanda para a frota após o tempo limite expirar. O usuário pode configurar a duração do tempo limite como parte das especificações de redimensionamento das frotas de instâncias. Em cenários de redimensionamento consecutivo para a mesma frota de instâncias, a Amazon EMR emite os On-Demand provisioning timeout - continuing resize eventos Spot provisioning timeout - continuing resize ou quando o tempo limite da operação de redimensionamento atual expira. Em seguida, começa a provisionar capacidade para a próxima operação de redimensionamento da frota.

Responder a eventos de tempo limite de redimensionamento da frota de instâncias

Recomendamos responder a um evento de tempo limite de aprovisionamento de uma das seguintes maneiras:

  • Revisite as especificações de redimensionamento e repita a operação de redimensionamento. Como a capacidade muda com frequência, seus clusters serão redimensionados com sucesso assim que a EC2 capacidade da Amazon estiver disponível. Recomendamos que os clientes configurem valores mais baixos para a duração do tempo limite dos trabalhos que exigem mais rigorSLAs.

  • Como alternativa, você pode:

  • Para o evento de tempo limite de provisionamento e redimensionamento contínuo, você também pode aguardar o processamento das operações de redimensionamento. A Amazon EMR continuará processando sequencialmente as operações de redimensionamento acionadas para a frota, respeitando as especificações de redimensionamento configuradas.

Também é possível configurar regras ou respostas automatizadas para este evento, conforme descrito na próxima seção.

Recuperação automatizada de um evento de tempo limite de provisionamento

Você pode criar automação em resposta aos EMR eventos da Amazon com o código do Spot Provisioning timeout evento. Por exemplo, a AWS Lambda função a seguir encerra um EMR cluster com uma frota de instâncias que usa instâncias spot para nós de tarefas e, em seguida, cria um novo EMR cluster com uma frota de instâncias que contém tipos de instância mais diversificados do que a solicitação original. Neste exemplo, o evento Spot Provisioning timeout emitido para os nós de tarefa acionará a execução da função do Lambda.

exemplo Exemplo de função para responder ao evento Spot Provisioning timeout
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")