EmrCluster - AWS Data Pipeline

EmrCluster

Represents the configuration of an Amazon EMR cluster. This object is used by EmrActivity and HadoopActivity to launch a cluster.

Schedulers

Schedulers provide a way to specify resource allocation and job prioritization within a Hadoop cluster. Administrators or users can choose a scheduler for various classes of users and applications. A scheduler could use queues to allocate resources to users and applications. You set up those queues when you create the cluster. You can then set up priority for certain types of work and user over others. This provides for efficient use of cluster resources, while allowing more than one user to submit work to the cluster. There are three types of scheduler available:

  • FairScheduler — Attempts to schedule resources evenly over a significant period of time.

  • CapacityScheduler — Uses queues to allow cluster administrators to assign users to queues of varying priority and resource allocation.

  • Default — Used by the cluster, which could be configured by your site.

Amazon EMR Release Versions

An Amazon EMR release is a set of open-source applications from the big data ecosystem. Each release comprises different big data applications, components, and features that you select to have Amazon EMR install and configure when you create a cluster. You specify the release version using the release label. Release labels are in the form emr-x.x.x. For example, emr-5.30.0. Amazon EMR clusters based on release label emr-4.0.0 and later use the releaseLabel property to specify the release label of an EmrCluster object. Earlier versions use the amiVersion property.

Important

All Amazon EMR clusters created using release version 5.22.0 or later use Signature Version 4 to authenticate requests to Amazon S3. Some earlier release versions use Signature Version 2. Signature Version 2 support is being discontinued. For more information, see Amazon S3 Update – SigV2 Deprecation Period Extended and Modified. We strongly recommend that you use an Amazon EMR release version that supports Signature Version 4. For earlier version releases, beginning with EMR 4.7.x, the most recent release in the series has been updated to support Signature Version 4. When using an earlier version EMR release, we recommend that you use the latest release in the series. In addition, avoid releases earlier than EMR 4.7.0.

Considerations and Limitations

Use the latest version of Task Runner

If you are using a self-managed EmrCluster object with a release label, use the latest Task Runner. For more information about Task Runner, see Working with Task Runner. You can configure property values for all Amazon EMR configuration classifications. For more information, see Configuring Applications in the Amazon EMR Release Guide, the EmrConfiguration, and Property object references.

Support for IMDSv2

Earlier, AWS Data Pipeline supported only IMDSv1. Now, AWS Data Pipeline supports IMDSv2 in Amazon EMR 5.23.1, 5.27.1, and 5.32 or later, and Amazon EMR 6.2 or later. IMDSv2 uses a session-oriented method to better handle authentication when retrieving metadata information from instances. You should configure your instances to make IMDSv2 calls by creating user-managed resources using TaskRunner-2.0.

Amazon EMR 5.32 or later and Amazon EMR 6.x

The Amazon EMR 5.32 or later and 6.x release series uses Hadoop version 3.x, which introduced breaking changes in how Hadoop's classpath is evaluated as compared to Hadoop version 2.x. Common libraries like Joda-Time were removed from the classpath.

If EmrActivity or HadoopActivity runs a Jar file that has dependencies on a library that was removed in Hadoop 3.x, the step fails with the error java.lang.NoClassDefFoundError or java.lang.ClassNotFoundException. This can happen for Jar files that ran with no issues using Amazon EMR 5.x release versions.

To fix the issue, you must copy Jar file dependencies to the Hadoop classpath on an EmrCluster object before starting the EmrActivity or the HadoopActivity. We provide a bash script to do this. The bash script is available in the following location, where MyRegion is the AWS Region where your EmrCluster object runs, for example us-west-2.

s3://datapipeline-MyRegion/MyRegion/bootstrap-actions/latest/TaskRunner/copy-jars-to-hadoop-classpath.sh

The way to run the script depends on whether EmrActivity or HadoopActivity runs on a resource managed by AWS Data Pipeline or runs on a self-managed resource.

If you use a resource managed by AWS Data Pipeline, add a bootstrapAction to the EmrCluster object. The bootstrapAction specifies the script and the Jar files to copy as arguments. You can add up to 255 bootstrapAction fields per EmrCluster object, and you can add a bootstrapAction field to an EmrCluster object that already has bootstrap actions.

To specify this script as a bootstrap action, use the following syntax, where JarFileRegion is the Region where the Jar file is saved, and each MyJarFilen is the absolute path in Amazon S3 of a Jar file to be copied to the Hadoop classpath. Do not specify Jar files that are in the Hadoop classpath by default.

s3://datapipeline-MyRegion/MyRegion/bootstrap-actions/latest/TaskRunner/copy-jars-to-hadoop-classpath.sh,JarFileRegion,MyJarFile1,MyJarFile2[, ...]

The following example specifies a bootstrap action that copies two Jar files in Amazon S3: my-jar-file.jar and the emr-dynamodb-tool-4.14.0-jar-with-dependencies.jar. The Region used in the example is us-west-2.

{ "id" : "MyEmrCluster", "type" : "EmrCluster", "keyPair" : "my-key-pair", "masterInstanceType" : "m5.xlarge", "coreInstanceType" : "m5.xlarge", "coreInstanceCount" : "2", "taskInstanceType" : "m5.xlarge", "taskInstanceCount": "2", "bootstrapAction" : ["s3://datapipeline-us-west-2/us-west-2/bootstrap-actions/latest/TaskRunner/copy-jars-to-hadoop-classpath.sh,us-west-2,s3://path/to/my-jar-file.jar,s3://dynamodb-dpl-us-west-2/emr-ddb-storage-handler/4.14.0/emr-dynamodb-tools-4.14.0-jar-with-dependencies.jar"] }

You must save and activate the pipeline for the change to the new bootstrapAction to take effect.

If you use a self-managed resource, you can download the script to the cluster instance and run it from the command line using SSH. The script creates a directory named /etc/hadoop/conf/shellprofile.d and a file named datapipeline-jars.sh in that directory. The jar files provided as command-line arguments are copied to a directory that the script creates named /home/hadoop/datapipeline_jars. If your cluster is set up differently, modify the script appropriately after downloading it.

The syntax for running the script on the command line is slightly different from using the bootstrapAction shown in the previous example. Use spaces instead of commas between arguments, as shown in the following example.

./copy-jars-to-hadoop-classpath.sh us-west-2 s3://path/to/my-jar-file.jar s3://dynamodb-dpl-us-west-2/emr-ddb-storage-handler/4.14.0/emr-dynamodb-tools-4.14.0-jar-with-dependencies.jar

Amazon EMR permissions

When you create a custom IAM role, carefully consider the minimum permissions necessary for your cluster to perform its work. Be sure to grant access to required resources, such as files in Amazon S3 or data in Amazon RDS, Amazon Redshift, or DynamoDB. If you wish to set visibleToAllUsers to False, your role must have the proper permissions to do so. Note that DataPipelineDefaultRole does not have these permissions. You must either provide a union of the DefaultDataPipelineResourceRole and DataPipelineDefaultRole roles as the EmrCluster object role, or create your own role for this purpose.

Syntax

Object Invocation Fields Description Slot Type
schedule This object is invoked within the execution of a schedule interval. Specify a schedule reference to another object to set the dependency execution order for this object. You can satisfy this requirement by explicitly setting a schedule on the object, for example, by specifying "schedule": {"ref": "DefaultSchedule"}. In most cases, it is better to put the schedule reference on the default pipeline object so that all objects inherit that schedule. Or, if the pipeline has a tree of schedules (schedules within the master schedule), you can create a parent object that has a schedule reference. For more information about example optional schedule configurations, see https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html Reference Object, for example, "schedule":{"ref":"myScheduleId"}

Optional Fields Description Slot Type
actionOnResourceFailure The action taken after a resource failure for this resource. Valid values are "retryall", which retries all tasks to the cluster for the specified duration, and "retrynone". String
actionOnTaskFailure The action taken after task failure for this resource. Valid values are "continue", meaning do not terminate the cluster, and "terminate." String
additionalMasterSecurityGroupIds The identifier of additional master security groups of the EMR cluster, which follows the form sg-01XXXX6a. For more information, see Amazon EMR Additional Security Groups in the Amazon EMR Management Guide. String
additionalSlaveSecurityGroupIds The identifier of additional slave security groups of the EMR cluster, which follows the form sg-01XXXX6a. String
amiVersion The Amazon Machine Image (AMI) version that Amazon EMR uses to install the cluster nodes. For more information, see the Amazon EMR Management Guide. String
applications Applications to install in the cluster with comma-separated arguments. By default, Hive and Pig are installed. This parameter is applicable only for Amazon EMR version 4.0 and later. String
attemptStatus The most recently reported status from the remote activity. String
attemptTimeout Timeout for remote work completion. If set, then a remote activity that does not complete within the set time of starting may be retried. Period
availabilityZone The Availability Zone in which to run the cluster. String
bootstrapAction An action to run when the cluster starts. You can specify comma-separated arguments. To specify multiple actions, up to 255, add multiple bootstrapAction fields. The default behavior is to start the cluster without any bootstrap actions. String
configuration Configuration for the Amazon EMR cluster. This parameter is applicable only for Amazon EMR version 4.0 and later. Reference Object, for example, "configuration":{"ref":"myEmrConfigurationId"}
coreInstanceBidPrice The maximum Spot price your are willing to pay for Amazon EC2 instances. If a bid price is specified, Amazon EMR uses Spot Instances for the instance group. Specified in USD. String
coreInstanceCount The number of core nodes to use for the cluster. Integer
coreInstanceType The type of Amazon EC2 instance to use for core nodes. See Supported Amazon EC2 Instances for Amazon EMR Clusters . String
coreGroupConfiguration The configuration for the Amazon EMR cluster core instance group. This parameter is applicable only for Amazon EMR version 4.0 and later. Reference Object, for example “configuration”: {“ref”: “myEmrConfigurationId”}
coreEbsConfiguration The configuration for Amazon EBS volumes that will be attached to each of the core nodes in the core group in the Amazon EMR cluster. For more information, see Instance Types That Support EBS Optimization in the Amazon EC2 User Guide for Linux Instances. Reference Object, for example “coreEbsConfiguration”: {“ref”: “myEbsConfiguration”}
customAmiId Applies only to Amazon EMR release version 5.7.0 and later. Specifies the AMI ID of a custom AMI to use when Amazon EMR provisions Amazon EC2 instances. It can also be used instead of bootstrap actions to customize cluster node configurations. For more information, see the following topic in the Amazon EMR Management Guide. Using a custom AMI String
EbsBlockDeviceConfig

The configuration of a requested Amazon EBS block device associated with the instance group. Includes a specified number of volumes that will be associated with each instance in the instance group. Includes volumesPerInstance and volumeSpecification, where:

  • volumesPerInstance is the number of EBS volumes with a specific volume configuration that will be associated with each instance in the instance group.

  • volumeSpecification is the Amazon EBS volume specifications, such as volume type, IOPS, and size in Gigibytes (GiB) that will be requested for the EBS volume attached to an EC2 instance in the Amazon EMR cluster.

Reference Object, for example “EbsBlockDeviceConfig”: {“ref”: “myEbsBlockDeviceConfig”}
emrManagedMasterSecurityGroupId The identifier of the master security group of the Amazon EMR cluster, which follows the form of sg-01XXXX6a. For more information, see Configure Security Groups in the Amazon EMR Management Guide. String
emrManagedSlaveSecurityGroupId The identifier of the slave security group of the Amazon EMR cluster, which follows the form sg-01XXXX6a. String
enableDebugging Enables debugging on the Amazon EMR cluster. String
failureAndRerunMode Describes consumer node behavior when dependencies fail or are rerun. Enumeration
hadoopSchedulerType The scheduler type of the cluster. Valid types are: PARALLEL_FAIR_SCHEDULING, PARALLEL_CAPACITY_SCHEDULING, and DEFAULT_SCHEDULER. Enumeration
httpProxy The proxy host that clients use to connect to AWS services. Reference Object, for example, "httpProxy":{"ref":"myHttpProxyId"}
initTimeout The amount of time to wait for the resource to start. Period
keyPair The Amazon EC2 key pair to use to log on to the master node of the Amazon EMR cluster. String
lateAfterTimeout The elapsed time after pipeline start within which the object must complete. It is triggered only when the schedule type is not set to ondemand. Period
masterInstanceBidPrice The maximum Spot price your are willing to pay for Amazon EC2 instances. It is a decimal value between 0 and 20.00, exclusive. Specified in USD. Setting this value enables Spot Instances for the Amazon EMR cluster master node. If a bid price is specified, Amazon EMR uses Spot Instances for the instance group. String
masterInstanceType The type of Amazon EC2 instance to use for the master node. See Supported Amazon EC2 Instances for Amazon EMR Clusters . String
masterGroupConfiguration The configuration for the Amazon EMR cluster master instance group. This parameter is applicable only for Amazon EMR version 4.0 and later. Reference Object, for example “configuration”: {“ref”: “myEmrConfigurationId”}
masterEbsConfiguration The configuration for Amazon EBS volumes that will be attached to each of the master nodes in the master group in the Amazon EMR cluster. For more information, see Instance Types That Support EBS Optimization in the Amazon EC2 User Guide for Linux Instances. Reference Object, for example “masterEbsConfiguration”: {“ref”: “myEbsConfiguration”}
maxActiveInstances The maximum number of concurrent active instances of a component. Re-runs do not count toward the number of active instances. Integer
maximumRetries Maximum number attempt retries on failure. Integer
onFail An action to run when the current object fails. Reference Object, for example, "onFail":{"ref":"myActionId"}
onLateAction Actions that should be triggered if an object has not yet been scheduled or is still not completed. Reference Object, for example, "onLateAction":{"ref":"myActionId"}
onSuccess An action to run when the current object succeeds. Reference Object, for example, "onSuccess":{"ref":"myActionId"}
parent Parent of the current object from which slots are inherited. Reference Object, for example. "parent":{"ref":"myBaseObjectId"}
pipelineLogUri The Amazon S3 URI (such as 's3://BucketName/Key/') for uploading logs for the pipeline. String
region The code for the region that the Amazon EMR cluster should run in. By default, the cluster runs in the same region as the pipeline. You can run the cluster in the same region as a dependent dataset. Enumeration
releaseLabel Release label for the EMR cluster. String
reportProgressTimeout Timeout for remote work successive calls to reportProgress. If set, then remote activities that do not report progress for the specified period may be considered stalled and so retried. Period
resourceRole The IAM role that AWS Data Pipeline uses to create the Amazon EMR cluster. The default role is DataPipelineDefaultRole. String
retryDelay The timeout duration between two retry attempts. Period
role The IAM role passed to Amazon EMR to create EC2 nodes. String
runsOn This field is not allowed on this object. Reference Object, for example, "runsOn":{"ref":"myResourceId"}
securityConfiguration The identifier of the EMR security configuration that will be applied to the cluster. This parameter is applicable only for Amazon EMR version 4.8.0 and later. String
serviceAccessSecurityGroupId The identifier for the service access security group of the Amazon EMR cluster. String. It follows the form of sg-01XXXX6a, for example, sg-1234abcd.
scheduleType Schedule type allows you to specify whether the objects in your pipeline definition should be scheduled at the beginning of the interval, or end of the interval. Values are: cron, ondemand, and timeseries. The timeseries scheduling means that instances are scheduled at the end of each interval. The cron scheduling means that instances are scheduled at the beginning of each interval. An ondemand schedule allows you to run a pipeline one time per activation. You do not have to clone or re-create the pipeline to run it again. If you use an ondemand schedule, it must be specified in the default object and must be the only scheduleType specified for objects in the pipeline. To use ondemand pipelines, call the ActivatePipeline operation for each subsequent run. Enumeration
subnetId The identifier of the subnet into which to launch the Amazon EMR cluster. String
supportedProducts A parameter that installs third-party software on an Amazon EMR cluster, for example, a third-party distribution of Hadoop. String
taskInstanceBidPrice The maximum Spot price your are willing to pay for EC2 instances. A decimal value between 0 and 20.00, exclusive. Specified in USD. If a bid price is specified, Amazon EMR uses Spot Instances for the instance group. String
taskInstanceCount The number of task nodes to use for the Amazon EMR cluster. Integer
taskInstanceType The type of Amazon EC2 instance to use for task nodes. String
taskGroupConfiguration The configuration for the Amazon EMR cluster task instance group. This parameter is applicable only for Amazon EMR version 4.0 and later. Reference Object, for example “configuration”: {“ref”: “myEmrConfigurationId”}
taskEbsConfiguration The configuration for Amazon EBS volumes that will be attached to each of the task nodes in the task group in the Amazon EMR cluster. For more information, see Instance Types That Support EBS Optimization in the Amazon EC2 User Guide for Linux Instances. Reference Object, for example “taskEbsConfiguration”: {“ref”: “myEbsConfiguration”}
terminateAfter Terminate the resource after these many hours. Integer
VolumeSpecification

The Amazon EBS volume specifications, such as volume type, IOPS, and size in Gigibytes (GiB) that will be requested for the Amazon EBS volume attached to an Amazon EC2 instance in the Amazon EMR cluster. The node can be a core, master or task node.

The VolumeSpecification includes:

  • iops() Integer. The number of I/O operations per second (IOPS) that the Amazon EBS volume supports, for example, 1000. For more information, see EBS I/O Characteristics in the Amazon EC2 User Guide for Linux Instances.

  • sizeinGB(). Integer. The Amazon EBS volume size, in gibibytes (GiB), for example 500. For information about valid combinations of volume types and hard drive sizes, see EBS Volume Types in the Amazon EC2 User Guide for Linux Instances.

  • volumetType. String. The Amazon EBS volume type, for example, gp2. The supported volume types include standard, gp2, io1, st1, sc1, and others. For more information, see EBS Volume Types in the Amazon EC2 User Guide for Linux Instances.

Reference Object, for example “VolumeSpecification”: {“ref”: “myVolumeSpecification”}
useOnDemandOnLastAttempt On the last attempt to request a resource, make a request for On-Demand Instances rather than Spot Instances. This ensures that if all previous attempts have failed, the last attempt is not interrupted. Boolean
workerGroup Field not allowed on this object. String

Runtime Fields Description Slot Type
@activeInstances List of the currently scheduled active instance objects. Reference Object, for example, "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime Time when the execution of this object finished. DateTime
@actualStartTime Time when the execution of this object started. DateTime
cancellationReason The cancellationReason if this object was cancelled. String
@cascadeFailedOn Description of the dependency chain on which the object failed. Reference Object, for example, "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog Step logs available only on Amazon EMR activity attempts. String
errorId The error ID if this object failed. String
errorMessage The error message if this object failed. String
errorStackTrace The error stack trace if this object failed. String
@failureReason The reason for the resource failure. String
@finishedTime The time at which this object finished its execution. DateTime
hadoopJobLog Hadoop job logs available on attempts for Amazon EMR activities. String
@healthStatus The health status of the object that reflects success or failure of the last object instance that reached a terminated state. String
@healthStatusFromInstanceId ID of the last instance object that reached a terminated state. String
@healthStatusUpdatedTime Time at which the health status was updated last time. DateTime
hostname The host name of client that picked up the task attempt. String
@lastDeactivatedTime The time at which this object was last deactivated. DateTime
@latestCompletedRunTime Time the latest run for which the execution completed. DateTime
@latestRunTime Time the latest run for which the execution was scheduled. DateTime
@nextRunTime Time of run to be scheduled next. DateTime
reportProgressTime Most recent time that remote activity reported progress. DateTime
@scheduledEndTime Schedule end time for object. DateTime
@scheduledStartTime Schedule start time for object. DateTime
@status The status of this object. String
@version Pipeline version with which the object was created. String
@waitingOn Description of the list of dependencies on which this object is waiting. Reference Object, for example, "waitingOn":{"ref":"myRunnableObjectId"}

System Fields Description Slot Type
@error Error describing the ill-formed object. String
@pipelineId ID of the pipeline to which this object belongs. String
@sphere The place of an object in the lifecycle. Component objects give rise to instance objects, which execute attempt objects. String

See Also