Amazon EMRクラスターのインスタンス容量不足イベントへの対応 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon EMRクラスターのインスタンス容量不足イベントへの対応

概要

Amazon EMRクラスターは、EC2 provisioning - Insufficient Instance Capacity選択したアベイラビリティーゾーンにクラスターの開始またはサイズ変更リクエストを満たすのに十分な容量がない場合にイベントコードを返します。Amazon がキャパシティ不足の例外EMRを繰り返し検出し、クラスターの開始またはクラスターのサイズ変更オペレーションのプロビジョニングリクエストを満たせない場合、イベントはインスタンスグループとインスタンスフリートの両方で定期的に発生します。

このページでは、EMRクラスターでこのイベントタイプが発生したときに、そのイベントタイプに最もよく対応する方法を説明します。

容量不足イベントへの推奨対応

容量不足のイベントには、次のいずれかの方法で対応することをお勧めします。

  • 容量が回復するまで待ちます。容量は頻繁に変化するため、容量不足の例外は自然に解消される可能性があります。Amazon EC2容量が利用可能になるとすぐに、クラスターのサイズ変更が開始または終了します。

  • または、クラスターを終了して、インスタンスタイプの設定を変更し、更新されたクラスター設定のリクエストを使用して新しいクラスターを作成します。詳細については、「インスタンスとアベイラビリティーゾーンの柔軟性に関するベストプラクティス」を参照してください。

次のセクションで説明するように、容量不足のイベントに対するルールや自動応答を設定することも可能です。

容量不足のイベントからの自動回復

イベントコード を持つEMRイベントなどの Amazon イベントに応じてオートメーションを構築できますEC2 provisioning - Insufficient Instance Capacity。例えば、次の AWS Lambda 関数は、オンデマンドインスタンスを使用するインスタンスグループで EMR クラスターを終了し、元のリクエストとは異なるインスタンスタイプを含むインスタンスグループで新しいEMRクラスターを作成します。

以下の条件によって自動化プロセスが開始されます。

  • プライマリノードまたはコアノードで容量不足のイベントが 20 分を超えて発生しています。

  • クラスターが READYまたは WAITING状態ではありません。EMR クラスターの状態の詳細については、「」を参照してくださいクラスターライフサイクルについて

注記

容量不足の例外に対する自動化プロセスを構築するときは、容量不足のイベントは回復可能である点を考慮してください。多くの場合、容量はシフトし、Amazon EC2容量が利用可能になるとすぐにクラスターはサイズ変更を再開するか、オペレーションを開始します。

例 容量不足のイベントに対応する関数
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning" INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = ( "EC2 provisioning - Insufficient Instance Capacity" ) ALLOWED_INSTANCE_TYPES_TO_USE = [ "m5.xlarge", "c5.xlarge", "m5.4xlarge", "m5.2xlarge", "t3.xlarge", ] CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"] CLUSTER_START_SLA = 20 CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity' def is_insufficient_capacity_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceGroupType could be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][ "CreationDateTime" ] clusterState = describeClusterResponse["Cluster"]["Status"]["State"] now = datetime.datetime.now() now = now.replace(tzinfo=timezone.utc) isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta( minutes=CLUSTER_START_SLA ) # Check if instance group receiving Insufficient capacity exception is CORE or PRIMARY (MASTER), # and it's been more than 20 minutes since cluster was created but the cluster state and the cluster state is not updated to RUNNING or WAITING if ( (instanceGroupType == "CORE" or instanceGroupType == "MASTER") and isClusterStartSlaBreached and clusterState not in CLUSTER_START_ACCEPTABLE_STATES ): return True else: return False # Choose item from the list except the exempt value def choice_excluding(exempt): for i in ALLOWED_INSTANCE_TYPES_TO_USE: if i != exempt: return i # Create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceGroupType cloud be CORE, MASTER OR TASK instanceGroupType = event["detail"]["instanceGroupType"] # Following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # Select new instance types to include in the new createCluster request instanceTypeForMaster = ( instanceTypesFromOriginalRequestMaster if instanceGroupType != "MASTER" else choice_excluding(instanceTypesFromOriginalRequestMaster) ) instanceTypeForCore = ( instanceTypesFromOriginalRequestCore if instanceGroupType != "CORE" else choice_excluding(instanceTypesFromOriginalRequestCore) ) print("Starting to create cluster...") instances = { "InstanceGroups": [ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": instanceTypeForMaster, "Market": "ON_DEMAND", "Name": "Master", }, { "InstanceRole": "CORE", "InstanceCount": 1, "InstanceType": instanceTypeForCore, "Market": "ON_DEMAND", "Name": "Core", }, ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # Terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_insufficient_capacity_event(event): print( "Received insufficient capacity event for instanceGroup, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not insufficient capacity event, skipping")