AWS Data Pipeline
Developer Guide (API Version 2012-10-29)

EmrCluster

Represents the configuration of an EMR cluster. This object is used by EmrActivity to launch a cluster.

Schedulers

Schedulers provide a way to specify resource allocation and job prioritization within a Hadoop cluster. Administrators or users can choose a scheduler for various classes of users and applications. A scheduler may use queues to allocate resources to users and applications. You set up those queues when you create the cluster, and you can then set up priority for certain types of work and users over others. This provides for efficient use of cluster resources while allowing more than one user to submit work to the cluster. There are three types of scheduler available (a configuration sketch follows this list):

  • FairScheduler — Attempts to schedule resources evenly over a significant period of time.

  • CapacityScheduler — Uses queues to allow cluster administrators to assign users to queues of varying priority and resource allocation.

  • Default — The cluster's default scheduler, which may be configured by your site.
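
For example, you can request a specific scheduler through the hadoopSchedulerType field described in the Optional Fields table later in this section. A minimal sketch, in which the instance settings are illustrative:

{
  "id": "MyEmrCluster",
  "type": "EmrCluster",
  "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING",
  "masterInstanceType": "m3.xlarge",
  "coreInstanceType": "m3.xlarge",
  "coreInstanceCount": "2"
}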

Amazon EMR 2.x, 3.x vs. 4.x platforms

AWS Data Pipeline supports EMR clusters based on release label emr-4.0.0 or later, which require the use of the releaseLabel field for the corresponding EmrCluster object. For earlier platforms, known as AMI releases, use the amiVersion field instead. If you are using a self-managed EmrCluster object with a release label, use the most current Task Runner. For more information about Task Runner, see Working with Task Runner. You can configure all classifications found in the Amazon EMR configuration API. For a list of all configurations, see the Configuring Applications topic in the Amazon EMR Release Guide (http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/), as well as the section called "EmrConfiguration" and the section called "Property" object references.
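
As a quick orientation, the following minimal sketches contrast the two platform styles; the version and instance-type values are illustrative:

{
  "id": "MyAmiBasedEmrCluster",
  "type": "EmrCluster",
  "amiVersion": "3.9.0",
  "masterInstanceType": "m3.xlarge"
},
{
  "id": "MyReleaseLabelEmrCluster",
  "type": "EmrCluster",
  "releaseLabel": "emr-4.1.0",
  "masterInstanceType": "m3.xlarge"
}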

Examples

The following are examples of this object type.

Example 1: Launch an EMR cluster using the hadoopVersion field

The following example launches an EMR cluster using AMI version 1.0 and Hadoop 0.20.

{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
  "hadoopVersion" : "0.20",
  "keyPair" : "my-key-pair",
  "masterInstanceType" : "m3.xlarge",
  "coreInstanceType" : "m3.xlarge",
  "coreInstanceCount" : "10",
  "taskInstanceType" : "m3.xlarge",
  "taskInstanceCount": "10",
  "bootstrapAction" : ["s3://elasticmapreduce/bootstrap-actions/configure-hadoop,arg1,arg2,arg3","s3://elasticmapreduce/bootstrap-actions/configure-hadoop/configure-other-stuff,arg1,arg2"]
}

Example 2: Launch an EMR cluster with release label emr-4.x or greater

The following example launches an EMR cluster using the newer releaseLabel field:

{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
  "keyPair" : "my-key-pair",
  "masterInstanceType" : "m3.xlarge",
  "coreInstanceType" : "m3.xlarge",
  "coreInstanceCount" : "10",
  "taskInstanceType" : "m3.xlarge",
  "taskInstanceCount": "10",
  "releaseLabel": "emr-4.1.0",
  "applications": ["spark", "hive", "pig"],
  "configuration": {"ref":"myConfiguration"}  
}

Example 3: Install additional software on your EMR cluster

EmrCluster provides the supportedProducts field, which installs third-party software on an EMR cluster, for example, a custom distribution of Hadoop such as MapR. The field accepts a comma-separated list of arguments for the third-party software to read and act on. The following example shows how to use the supportedProducts field of EmrCluster to create a custom MapR M3 edition cluster with Karmasphere Analytics installed, and run an EmrActivity object on it.

{
  "id": "MyEmrActivity",
  "type": "EmrActivity",
  "schedule": {"ref": "ResourcePeriod"},
  "runsOn": {"ref": "MyEmrCluster"},
  "postStepCommand": "echo Ending job >> /mnt/var/log/stepCommand.txt",
  "preStepCommand": "echo Starting job > /mnt/var/log/stepCommand.txt",
  "step": "/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output,hdfs:///output32113/,-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate"
},
{
  "id": "MyEmrCluster",
  "type": "EmrCluster",
  "schedule": {"ref": "ResourcePeriod"},
  "supportedProducts": ["mapr,--edition,m3,--version,1.2,--key1,value1","karmasphere-enterprise-utility"],
  "masterInstanceType": "m3.xlarge",
  "taskInstanceType": "m3.xlarge"
}

Example 4: Disable server-side encryption on 3.x AMIs

An EmrCluster object with Hadoop version 2.x created by AWS Data Pipeline enables server-side encryption by default. To disable server-side encryption, you must specify a bootstrap action in the cluster object definition.

The following example creates an EmrCluster object with server-side encryption disabled:

{  
   "id":"NoSSEEmrCluster",
   "type":"EmrCluster",
   "hadoopVersion":"2.x",
   "keyPair":"my-key-pair",
   "masterInstanceType":"m3.xlarge",
   "coreInstanceType":"m3.large",
   "coreInstanceCount":"10",
   "taskInstanceType":"m3.large",
   "taskInstanceCount":"10",
   "bootstrapAction":["s3://elasticmapreduce/bootstrap-actions/configure-hadoop,-e, fs.s3.enableServerSideEncryption=false"]
}

Example 5: Disable server-side encryption on 4.x releases

You must disable server-side encryption using an EmrConfiguration object.

The following example creates an EmrCluster object with server-side encryption disabled:

{
  "name": "ReleaseLabelCluster",
  "releaseLabel": "emr-4.1.0",
  "applications": ["spark", "hive", "pig"],
  "id": "myResourceId",
  "type": "EmrCluster",
  "configuration": {"ref": "disableSSE"}
},
{
  "name": "disableSSE",
  "id": "disableSSE",
  "type": "EmrConfiguration",
  "classification": "emrfs-site",
  "property": [
    {"ref": "enableServerSideEncryption"}
  ]
},
{
  "name": "enableServerSideEncryption",
  "id": "enableServerSideEncryption",
  "type": "Property",
  "key": "fs.s3.enableServerSideEncryption",
  "value": "false"
}

Example 6: Configure Hadoop KMS access control lists (ACLs) and create encryption zones in HDFS

The following objects create access control lists (ACLs) for Hadoop KMS and create encryption zones and corresponding encryption keys in HDFS:

{
  "name": "kmsAcls",
  "id": "kmsAcls",
  "type": "EmrConfiguration",
  "classification": "hadoop-kms-acls",
  "property": [
    {"ref": "kmsBlacklist"},
    {"ref": "kmsAcl"}
  ]
},
{
  "name": "hdfsEncryptionZone",
  "id": "hdfsEncryptionZone",
  "type": "EmrConfiguration",
  "classification": "hdfs-encryption-zones",
  "property": [
    {"ref": "hdfsPath1"},
    {"ref": "hdfsPath2"}
  ]
},
{
  "name": "kmsBlacklist",
  "id": "kmsBlacklist",
  "type": "Property",
  "key": "hadoop.kms.blacklist.CREATE",
  "value": "foo,myBannedUser"
},
{
  "name": "kmsAcl",
  "id": "kmsAcl",
  "type": "Property",
  "key": "hadoop.kms.acl.ROLLOVER",
  "value": "myAllowedUser"
},
{
  "name": "hdfsPath1",
  "id": "hdfsPath1",
  "type": "Property",
  "key": "/myHDFSPath1",
  "value": "path1_key"
},
{
  "name": "hdfsPath2",
  "id": "hdfsPath2",
  "type": "Property",
  "key": "/myHDFSPath2",
  "value": "path2_key"
}

Example 7: Specify custom IAM roles

By default, AWS Data Pipeline passes DataPipelineDefaultRole as the EMR service role and DataPipelineDefaultResourceRole as the EC2 instance profile to create resources on your behalf. However, you can create a custom EMR service role and a custom instance profile and use them instead. AWS Data Pipeline must have sufficient permissions to create clusters using the custom role, and you must add AWS Data Pipeline as a trusted entity.
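
A minimal sketch of the trust policy on such a custom service role follows. The exact set of service principals your configuration requires is an assumption here, so verify it for your account in the IAM documentation:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "datapipeline.amazonaws.com",
          "elasticmapreduce.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}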

The following example object specifies custom roles for the EMR cluster:

{  
   "id":"MyEmrCluster",
   "type":"EmrCluster",
   "hadoopVersion":"2.x",
   "keyPair":"my-key-pair",
   "masterInstanceType":"m3.xlarge",
   "coreInstanceType":"m3.large",
   "coreInstanceCount":"10",
   "taskInstanceType":"m3.large",
   "taskInstanceCount":"10",
   "role":"emrServiceRole",
   "resourceRole":"emrInstanceProfile"
}

Example 8: Use an EmrCluster resource in the AWS SDK for Java

The following example shows how to use EmrCluster and EmrActivity to create an Amazon EMR 4.x cluster that runs a Spark step, using the AWS SDK for Java:

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.ActivatePipelineRequest;
import com.amazonaws.services.datapipeline.model.ActivatePipelineResult;
import com.amazonaws.services.datapipeline.model.CreatePipelineRequest;
import com.amazonaws.services.datapipeline.model.CreatePipelineResult;
import com.amazonaws.services.datapipeline.model.Field;
import com.amazonaws.services.datapipeline.model.PipelineObject;
import com.amazonaws.services.datapipeline.model.PutPipelineDefinitionRequest;
import com.amazonaws.services.datapipeline.model.PutPipelineDefinitionResult;

public class DataPipelineEmr4 {

  public static void main(String[] args) {

    // Load credentials from a profile file and create the Data Pipeline client.
    AWSCredentials credentials =
        new ProfileCredentialsProvider("/path/to/AwsCredentials.properties", "default").getCredentials();
    DataPipelineClient dp = new DataPipelineClient(credentials);

    // Create an empty pipeline; the unique ID guards against duplicate creation.
    CreatePipelineRequest createPipeline = new CreatePipelineRequest().withName("EMR4SDK").withUniqueId("unique");
    CreatePipelineResult createPipelineResult = dp.createPipeline(createPipeline);
    String pipelineId = createPipelineResult.getPipelineId();

    // EmrCluster resource: release label emr-4.1.0 with Spark and Presto-Sandbox installed.
    PipelineObject emrCluster = new PipelineObject()
        .withName("EmrClusterObj")
        .withId("EmrClusterObj")
        .withFields(
            new Field().withKey("releaseLabel").withStringValue("emr-4.1.0"),
            new Field().withKey("coreInstanceCount").withStringValue("3"),
            new Field().withKey("applications").withStringValue("spark"),
            new Field().withKey("applications").withStringValue("Presto-Sandbox"),
            new Field().withKey("type").withStringValue("EmrCluster"),
            new Field().withKey("keyPair").withStringValue("myKeyName"),
            new Field().withKey("masterInstanceType").withStringValue("m3.xlarge"),
            new Field().withKey("coreInstanceType").withStringValue("m3.xlarge"));

    // EmrActivity: submits the SparkPi example as a step on the cluster above.
    PipelineObject emrActivity = new PipelineObject()
        .withName("EmrActivityObj")
        .withId("EmrActivityObj")
        .withFields(
            new Field().withKey("step").withStringValue("command-runner.jar,spark-submit,--executor-memory,1g,--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/lib/spark-examples.jar,10"),
            new Field().withKey("runsOn").withRefValue("EmrClusterObj"),
            new Field().withKey("type").withStringValue("EmrActivity"));

    // Schedule: run every 15 minutes, starting at first activation.
    PipelineObject schedule = new PipelineObject()
        .withName("Every 15 Minutes")
        .withId("DefaultSchedule")
        .withFields(
            new Field().withKey("type").withStringValue("Schedule"),
            new Field().withKey("period").withStringValue("15 Minutes"),
            new Field().withKey("startAt").withStringValue("FIRST_ACTIVATION_DATE_TIME"));

    // Default object: settings inherited by all other objects in the pipeline.
    PipelineObject defaultObject = new PipelineObject()
        .withName("Default")
        .withId("Default")
        .withFields(
            new Field().withKey("failureAndRerunMode").withStringValue("CASCADE"),
            new Field().withKey("schedule").withRefValue("DefaultSchedule"),
            new Field().withKey("resourceRole").withStringValue("DataPipelineDefaultResourceRole"),
            new Field().withKey("role").withStringValue("DataPipelineDefaultRole"),
            new Field().withKey("pipelineLogUri").withStringValue("s3://myLogUri"),
            new Field().withKey("scheduleType").withStringValue("cron"));

    List<PipelineObject> pipelineObjects = new ArrayList<PipelineObject>();
    pipelineObjects.add(emrActivity);
    pipelineObjects.add(emrCluster);
    pipelineObjects.add(defaultObject);
    pipelineObjects.add(schedule);

    // Upload the pipeline definition, then activate the pipeline.
    PutPipelineDefinitionRequest putPipelineDefinition = new PutPipelineDefinitionRequest()
        .withPipelineId(pipelineId)
        .withPipelineObjects(pipelineObjects);
    PutPipelineDefinitionResult putPipelineResult = dp.putPipelineDefinition(putPipelineDefinition);
    System.out.println(putPipelineResult);

    ActivatePipelineRequest activatePipelineReq = new ActivatePipelineRequest()
        .withPipelineId(pipelineId);
    ActivatePipelineResult activatePipelineRes = dp.activatePipeline(activatePipelineReq);
    System.out.println(activatePipelineRes);
    System.out.println(pipelineId);
  }
}


When you create a custom IAM role, carefully consider the minimum permissions necessary for your cluster to perform its work. Be sure to grant access to required resources, such as files in Amazon S3 or data in Amazon RDS, Amazon Redshift or DynamoDB.

If you want to set visibleToAllUsers to false, your role must have the proper permissions to do so. Note that DataPipelineDefaultRole does not have these permissions. You must either provide a union of the DefaultDataPipelineResourceRole and DataPipelineDefaultRole roles as the EmrCluster object role, or create your own role for this purpose, as in the sketch below.
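
A minimal sketch of the second option; myUnionedPipelineRole is a hypothetical role name, standing in for a role you create yourself with the combined permissions:

{
  "id": "MyEmrCluster",
  "type": "EmrCluster",
  "role": "myUnionedPipelineRole"
}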


Syntax

Object Invocation Fields | Description | Slot Type
schedule | This object is invoked within the execution of a schedule interval. Users must specify a schedule reference to another object to set the dependency execution order for this object. Users can satisfy this requirement by explicitly setting a schedule on the object, for example, by specifying "schedule": {"ref": "DefaultSchedule"}. In most cases, it is better to put the schedule reference on the default pipeline object so that all objects inherit that schedule. Or, if the pipeline has a tree of schedules (schedules within the master schedule), users can create a parent object that has a schedule reference. For more information about example optional schedule configurations, see http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html | Reference Object, e.g. "schedule":{"ref":"myScheduleId"}
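
A minimal sketch of the explicit form described in the table above:

{
  "id": "MyEmrCluster",
  "type": "EmrCluster",
  "schedule": {"ref": "DefaultSchedule"}
}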


Optional Fields | Description | Slot Type
actionOnResourceFailure | The action taken after a resource failure for this resource. | String
actionOnTaskFailure | The action taken after a task failure for this resource. | String
additionalMasterSecurityGroupIds | The identifier of additional master security groups of the EMR cluster, which follows the form sg-01XXXX6a. For more information, see Amazon EMR Additional Security Groups in the Amazon Elastic MapReduce Developer Guide. | String
additionalSlaveSecurityGroupIds | The identifier of additional slave security groups of the EMR cluster, which follows the form sg-01XXXX6a. | String
amiVersion | The Amazon Machine Image (AMI) version that Amazon EMR uses to install the cluster nodes. For more information, see AMI Versions Supported in Amazon EMR in the Amazon Elastic MapReduce Developer Guide. | String
applications | The applications to install in the cluster, with comma-separated arguments. By default, Hive and Pig are installed. This parameter is applicable only for releaseLabel emr-4.0.0 and later. | String
attemptStatus | The most recently reported status from the remote activity. | String
attemptTimeout | The timeout for remote work completion. If set, a remote activity that does not complete within the set time of starting may be retried. | Period
availabilityZone | The Availability Zone in which to run the cluster. | String
bootstrapAction | An action to run when the cluster starts. You can specify comma-separated arguments. To specify multiple actions, up to 255, add multiple bootstrapAction fields. The default behavior is to start the cluster without any bootstrap actions. | String
configuration | Configuration for the EMR cluster. This parameter is applicable only for releaseLabel emr-4.0.0 and later. | Reference Object, e.g. "configuration":{"ref":"myEmrConfigurationId"}
coreInstanceBidPrice | The maximum dollar amount for your Spot Instance bid; a decimal value between 0 and 20.00, exclusive. Setting this value enables Spot Instances for the EMR cluster core nodes. Must be used together with coreInstanceCount. | String
coreInstanceCount | The number of core nodes to use for the cluster. | Integer
coreInstanceType | The type of EC2 instance to use for core nodes. The default value is m1.small. | String
emrManagedMasterSecurityGroupId | The identifier of the master security group of the EMR cluster, which follows the form sg-01XXXX6a. For more information, see Configure Security Groups for Amazon EMR in the Amazon Elastic MapReduce Developer Guide. | String
emrManagedSlaveSecurityGroupId | The identifier of the slave security group of the EMR cluster, which follows the form sg-01XXXX6a. | String
enableDebugging | Enables debugging on the EMR cluster. | String
failureAndRerunMode | Describes consumer node behavior when dependencies fail or are rerun. | Enumeration
hadoopSchedulerType | The scheduler type of the cluster. Valid types are PARALLEL_FAIR_SCHEDULING, PARALLEL_CAPACITY_SCHEDULING, and DEFAULT_SCHEDULER. | Enumeration
httpProxy | The proxy host that clients use to connect to AWS services. | Reference Object, e.g. "httpProxy":{"ref":"myHttpProxyId"}
initTimeout | The amount of time to wait for the resource to start. | Period
keyPair | The Amazon EC2 key pair to use to log on to the master node of the EMR cluster. | String
lateAfterTimeout | The time period in which the object run must start. | Period
masterInstanceBidPrice | The maximum dollar amount for your Spot Instance bid; a decimal value between 0 and 20.00, exclusive. Setting this value enables Spot Instances for the EMR cluster master node. | String
masterInstanceType | The type of EC2 instance to use for the master node. The default value is m1.small. | String
maxActiveInstances | The maximum number of concurrent active instances of a component. Re-runs do not count toward the number of active instances. | Integer
maximumRetries | The maximum number of attempt retries on failure. | Integer
onFail | An action to run when the current object fails. | Reference Object, e.g. "onFail":{"ref":"myActionId"}
onLateAction | Actions that should be triggered if an object has not yet been scheduled or is still not completed. | Reference Object, e.g. "onLateAction":{"ref":"myActionId"}
onSuccess | An action to run when the current object succeeds. | Reference Object, e.g. "onSuccess":{"ref":"myActionId"}
parent | The parent of the current object, from which slots are inherited. | Reference Object, e.g. "parent":{"ref":"myBaseObjectId"}
pipelineLogUri | The Amazon S3 URI (such as 's3://BucketName/Key/') for uploading logs for the pipeline. | String
region | The code for the region in which the EMR cluster should run. By default, the cluster runs in the same region as the pipeline. You can run the cluster in the same region as a dependent data set. | Enumeration
releaseLabel | The release label for the Amazon EMR cluster. | String
reportProgressTimeout | The timeout for successive calls from remote work to reportProgress. If set, remote activities that do not report progress for the specified period may be considered stalled and are retried. | Period
resourceRole | The IAM role that AWS Data Pipeline uses to create the EMR cluster. The default role is DataPipelineDefaultResourceRole. | String
retryDelay | The timeout duration between two retry attempts. | Period
role | The IAM role passed to Amazon EMR to create EC2 nodes. | String
runsOn | This field is not allowed on this object. | Reference Object, e.g. "runsOn":{"ref":"myResourceId"}
scheduleType | Schedule type allows you to specify whether the objects in your pipeline definition should be scheduled at the beginning or at the end of the interval. Time-series style scheduling means instances are scheduled at the end of each interval, and cron-style scheduling means instances are scheduled at the beginning of each interval. An on-demand schedule allows you to run a pipeline one time per activation, which means you do not have to clone or re-create the pipeline to run it again. If you use an on-demand schedule, it must be specified in the default object and must be the only scheduleType specified for objects in the pipeline (see the sketch after this table). To use on-demand pipelines, call the ActivatePipeline operation for each subsequent run. Values are: cron, ondemand, and timeseries. | Enumeration
subnetId | The identifier of the subnet to launch the cluster into. | String
supportedProducts | A parameter that installs third-party software on an EMR cluster, for example, a third-party distribution of Hadoop. | String
taskInstanceBidPrice | The maximum dollar amount for your Spot Instance bid; a decimal value between 0 and 20.00, exclusive. Setting this value enables Spot Instances for the EMR cluster task nodes. | String
taskInstanceCount | The number of task nodes to use for the cluster. | Integer
taskInstanceType | The type of EC2 instance to use for task nodes. | String
terminateAfter | Terminate the resource after this many hours. | Period
useOnDemandOnLastAttempt | On the last attempt to request a resource, make a request for On-Demand Instances rather than Spot Instances. This ensures that if all previous attempts have failed, the last attempt is not interrupted by changes in the Spot market. | Boolean
workerGroup | This field is not allowed on this object. | String
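
As referenced in the scheduleType row above, a minimal sketch of an on-demand schedule set on the default object:

{
  "id": "Default",
  "name": "Default",
  "scheduleType": "ondemand"
}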


Runtime Fields | Description | Slot Type
@activeInstances | List of the currently scheduled active instance objects. | Reference Object, e.g. "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime | The time when the execution of this object finished. | DateTime
@actualStartTime | The time when the execution of this object started. | DateTime
cancellationReason | The cancellationReason, if this object was cancelled. | String
@cascadeFailedOn | Description of the dependency chain on which the object failed. | Reference Object, e.g. "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog | EMR step logs, available only on EMR activity attempts. | String
errorId | The errorId, if this object failed. | String
errorMessage | The errorMessage, if this object failed. | String
errorStackTrace | The error stack trace, if this object failed. | String
@failureReason | The reason for the resource failure. | String
@finishedTime | The time at which this object finished its execution. | DateTime
hadoopJobLog | Hadoop job logs available on attempts for EMR-based activities. | String
@healthStatus | The health status of the object, which reflects the success or failure of the last object instance that reached a terminated state. | String
@healthStatusFromInstanceId | The Id of the last instance object that reached a terminated state. | String
@healthStatusUpdatedTime | The time at which the health status was last updated. | DateTime
hostname | The host name of the client that picked up the task attempt. | String
@lastDeactivatedTime | The time at which this object was last deactivated. | DateTime
@latestCompletedRunTime | The time of the latest run for which the execution completed. | DateTime
@latestRunTime | The time of the latest run for which the execution was scheduled. | DateTime
@nextRunTime | The time of the run to be scheduled next. | DateTime
reportProgressTime | The most recent time that the remote activity reported progress. | DateTime
@scheduledEndTime | The schedule end time for the object. | DateTime
@scheduledStartTime | The schedule start time for the object. | DateTime
@status | The status of this object. | String
@version | The pipeline version the object was created with. | String
@waitingOn | Description of the list of dependencies on which this object is waiting. | Reference Object, e.g. "waitingOn":{"ref":"myRunnableObjectId"}


System Fields | Description | Slot Type
@error | Error describing the ill-formed object. | String
@pipelineId | The Id of the pipeline to which this object belongs. | String
@sphere | The sphere of an object denotes its place in the lifecycle: Component Objects give rise to Instance Objects, which execute Attempt Objects. | String


See Also