Amazon EMRクラスターインスタンスフリートのサイズ変更タイムアウトイベントへの対応 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon EMRクラスターインスタンスフリートのサイズ変更タイムアウトイベントへの対応

概要

Amazon EMRクラスターは、インスタンスフリートクラスターのサイズ変更オペレーションの実行中にイベントを出力します。プロビジョニングタイムアウトイベントは、タイムアウトの期限が切れた後に Amazon がフリートのスポットキャパシティまたはオンデマンドキャパシティのプロビジョニングEMRを停止したときに出力されます。タイムアウト時間は、インスタンスフリートのサイズ変更仕様の一部としてユーザーが設定できます。同じインスタンスフリートで連続してサイズを変更するシナリオでは、現在のサイズ変更オペレーションのタイムアウトが期限切れになると、Amazon EMR は Spot provisioning timeout - continuing resizeまたは On-Demand provisioning timeout - continuing resizeイベントを発行します。その後、フリートの次のサイズ変更操作のために容量のプロビジョニングを開始します。

インスタンスフリートのサイズ変更のタイムアウトイベントに対応する

プロビジョニングのタイムアウトイベントには、次のいずれかの方法で対応することをお勧めします。

  • サイズ変更の仕様を再確認し、サイズ変更操作を再試行します。容量が頻繁にシフトすると、Amazon EC2容量が利用可能になるとすぐにクラスターのサイズが正常に変更されます。より厳密な を必要とするジョブのタイムアウト期間には、より低い値を設定することをお勧めしますSLAs。

  • または、次のいずれかの方法を取ります。

  • provisioning timeout - continuing resize イベントでは、サイズ変更操作が処理されるのをさらに待つことができます。Amazon EMR は、設定されたサイズ変更仕様に従って、フリートに対してトリガーされたサイズ変更オペレーションを順番に処理し続けます。

次のセクションで説明するように、このイベントに対するルールや自動応答を設定することも可能です。

プロビジョニングのタイムアウトイベントからの自動回復

Spot Provisioning timeout イベントコードを使用して Amazon EMRイベントに応じてオートメーションを構築できます。例えば、次の AWS Lambda 関数は、タスクノードのスポットインスタンスを使用するインスタンスフリートで EMRクラスターをシャットダウンし、元のリクエストよりも多様化されたインスタンスタイプを含むインスタンスフリートで新しいEMRクラスターを作成します。この例では、タスクノードに対する Spot Provisioning timeout イベントが発行されると、Lambda 関数の実行がトリガーされます。

Spot Provisioning timeout イベントに対応する関数の例
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")