Menanggapi peristiwa batas waktu tunggu armada instans EMR cluster Amazon - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menanggapi peristiwa batas waktu tunggu armada instans EMR cluster Amazon

Gambaran Umum

EMRCluster Amazon memancarkan peristiwa saat menjalankan operasi pengubahan ukuran untuk cluster armada misalnya. Peristiwa batas waktu penyediaan dipancarkan saat EMR Amazon menghentikan penyediaan kapasitas Spot atau On-Demand untuk armada setelah batas waktu berakhir. Durasi batas waktu dapat dikonfigurasi oleh pengguna sebagai bagian dari spesifikasi pengubahan ukuran untuk armada instance. Dalam skenario pengubahan ukuran berturut-turut untuk armada instans yang sama, Amazon EMR memancarkan Spot provisioning timeout - continuing resize atau On-Demand provisioning timeout - continuing resize peristiwa saat batas waktu untuk operasi pengubahan ukuran saat ini kedaluwarsa. Kemudian mulai menyediakan kapasitas untuk operasi pengubahan ukuran armada berikutnya.

Menanggapi peristiwa timeout pengubahan ukuran armada instance

Kami menyarankan Anda menanggapi peristiwa batas waktu penyediaan dengan salah satu cara berikut:

  • Kunjungi kembali spesifikasi pengubahan ukuran dan coba lagi operasi pengubahan ukuran. Karena kapasitas sering bergeser, cluster Anda akan berhasil mengubah ukuran segera setelah EC2 kapasitas Amazon tersedia. Kami menyarankan pelanggan untuk mengonfigurasi nilai yang lebih rendah untuk durasi waktu tunggu untuk pekerjaan yang membutuhkan lebih ketatSLAs.

  • Atau, Anda dapat:

  • Untuk waktu tunggu penyediaan - melanjutkan acara pengubahan ukuran, Anda juga dapat menunggu operasi pengubahan ukuran diproses. Amazon EMR akan terus memproses secara berurutan operasi pengubahan ukuran yang dipicu untuk armada, dengan menghormati spesifikasi pengubahan ukuran yang dikonfigurasi.

Anda juga dapat mengatur aturan atau tanggapan otomatis untuk acara ini seperti yang dijelaskan di bagian berikutnya.

Pemulihan otomatis dari peristiwa batas waktu penyediaan

Anda dapat membangun otomatisasi dalam menanggapi EMR peristiwa Amazon dengan kode Spot Provisioning timeout acara. Misalnya, AWS Lambda fungsi berikut mematikan EMR klaster dengan armada instance yang menggunakan instance Spot untuk node Tugas, dan kemudian membuat EMR klaster baru dengan armada instance yang berisi tipe instance yang lebih beragam daripada permintaan asli. Dalam contoh ini, Spot Provisioning timeout peristiwa yang dipancarkan untuk node tugas akan memicu eksekusi fungsi Lambda.

contoh Contoh fungsi untuk menanggapi Spot Provisioning timeout peristiwa
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler // Note: related IAM role requires permission to use Amazon EMR import json import boto3 import datetime from datetime import timezone SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize" SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = ( "Spot Provisioning timeout" ) CLIENT = boto3.client("emr", region_name="us-east-1") # checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout' def is_spot_provisioning_timeout_event(event): if not event["detail"]: return False else: return ( event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE and event["detail"]["eventCode"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE ) # checks if the cluster is eligible for termination def is_cluster_eligible_for_termination(event, describeClusterResponse): # instanceFleetType could be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # Check if instance fleet receiving Spot provisioning timeout event is TASK if (instanceFleetType == "TASK"): return True else: return False # create a new cluster by choosing different InstanceType. def create_cluster(event): # instanceFleetType cloud be CORE, MASTER OR TASK instanceFleetType = event["detail"]["instanceFleetType"] # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request instanceTypesFromOriginalRequestMaster = "m5.xlarge" instanceTypesFromOriginalRequestCore = "m5.xlarge" # select new instance types to include in the new createCluster request instanceTypesForTask = [ "m5.xlarge", "m5.2xlarge", "m5.4xlarge", "m5.8xlarge", "m5.12xlarge" ] print("Starting to create cluster...") instances = { "InstanceFleets": [ { "InstanceFleetType":"MASTER", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestMaster, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"CORE", "TargetOnDemandCapacity":1, "TargetSpotCapacity":0, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesFromOriginalRequestCore, "WeightedCapacity":1, } ] }, { "InstanceFleetType":"TASK", "TargetOnDemandCapacity":0, "TargetSpotCapacity":100, "LaunchSpecifications":{}, "InstanceTypeConfigs":[ { 'InstanceType': instanceTypesForTask[0], "WeightedCapacity":1, }, { 'InstanceType': instanceTypesForTask[1], "WeightedCapacity":2, }, { 'InstanceType': instanceTypesForTask[2], "WeightedCapacity":4, }, { 'InstanceType': instanceTypesForTask[3], "WeightedCapacity":8, }, { 'InstanceType': instanceTypesForTask[4], "WeightedCapacity":12, } ], "ResizeSpecifications": { "SpotResizeSpecification": { "TimeoutDurationMinutes": 30 } } } ] } response = CLIENT.run_job_flow( Name="Test Cluster", Instances=instances, VisibleToAllUsers=True, JobFlowRole="EMR_EC2_DefaultRole", ServiceRole="EMR_DefaultRole", ReleaseLabel="emr-6.10.0", ) return response["JobFlowId"] # terminated the cluster using clusterId received in an event def terminate_cluster(event): print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"]) response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]]) print(f"Terminate cluster response: {response}") def describe_cluster(event): response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"]) return response def lambda_handler(event, context): if is_spot_provisioning_timeout_event(event): print( "Received spot provisioning timeout event for instanceFleet, clusterId: " + event["detail"]["clusterId"] ) describeClusterResponse = describe_cluster(event) shouldTerminateCluster = is_cluster_eligible_for_termination( event, describeClusterResponse ) if shouldTerminateCluster: terminate_cluster(event) clusterId = create_cluster(event) print("Created a new cluster, clusterId: " + clusterId) else: print( "Cluster is not eligible for termination, clusterId: " + event["detail"]["clusterId"] ) else: print("Received event is not spot provisioning timeout event, skipping")