HadoopActivity - AWS Data Pipeline

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

HadoopActivity

在集群上运行 MapReduce 作业。群集可以是由管理的EMR群集, AWS Data Pipeline 也可以是其他资源(如果您使用) TaskRunner。 HadoopActivity 当你想并行运行工作时使用。这允许您使用YARN框架的调度资源或 Hadoop MapReduce 1 中的资源协商器。如果您想使用 Amazon Ste EMR p 操作按顺序运行工作,您仍然可以使用。EmrActivity

示例

HadoopActivity 使用由管理的EMR集群 AWS Data Pipeline

以下 HadoopActivity 对象使用 EmrCluster 资源来运行程序:

{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }

这是相应的 MyEmrCluster,它为基YARN于 Hadoop 2 配置 FairScheduler 和队列:AMIs

{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }

这是 EmrCluster 你在 Hadoop 1 FairScheduler 中用来配置的:

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }

以下是基 CapacityScheduler 于 Hadoop 2 的 EmrCluster 配置:AMIs

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
HadoopActivity 使用现有EMR集群

在此示例中,您使用工作组和在现有 TaskRunner EMR集群上运行程序。以下管道定义用 HadoopActivity 于:

{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }

语法

必填字段 描述 槽类型
jarUri JAR在 Amazon S3 中的位置或要运行的集群的本地文件系统 HadoopActivity。 String

对象调用字段 描述 槽类型
schedule 该对象在计划间隔的执行中调用。用户必须指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。用户可以通过在对象上显式设置时间表来满足此要求,例如,指定 “schedule”: {"ref”: "DefaultSchedule“}。在大多数情况下,最好将计划引用放在默认管道对象上,以便所有对象继承该计划。或者,如果管道有一个计划树 (计划位于主计划中),用户可以创建具有计划引用的父对象。有关示例可选计划配置的更多信息,请参阅 https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html 参考对象,例如 “日程安排”:{“ref”:” myScheduleId “}

所需的组 (下列选项之一是必需的) 描述 槽类型
runsOn EMR此作业将在其上运行的集群。 引用对象,例如 runsOn ““: {" ref”:” myEmrCluster Id "}
workerGroup 工作线程组。这可用于路由任务。如果您提供了一个 runsOn 值并且 workerGroup 存在,则会 workerGroup 被忽略。 String

可选字段 描述 槽类型
argument 要传递给的参数JAR。 String
attemptStatus 来自远程活动的最近报告的状态。 String
attemptTimeout 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 周期
dependsOn 指定与另一个可运行对象的依赖关系。 引用对象,例如 dependsOn ““: {" ref”:” myActivityId “}
failureAndRerun模式 描述依赖项失败或重新运行时的使用者节点行为。 枚举
hadoopQueue 将在其上提交活动的 Hadoop 计划程序队列名。 String
input 输入数据的位置。 参考对象,例如 “输入”:{"ref”:” myDataNode Id "}
lateAfterTimeout 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 周期
mainClass JAR你正在执行的主类 HadoopActivity。 String
maxActiveInstances 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 整数
maximumRetries 失败后的最大重试次数 整数
onFail 当前对象失败时要运行的操作。 引用对象,例如 onFail ““: {" ref”:” myActionId “}
onLateAction 在尚未计划对象或对象仍未完成的情况下将触发的操作。 引用对象,例如 onLateAction ““: {" ref”:” myActionId “}
onSuccess 当前对象成功时要运行的操作。 引用对象,例如 onSuccess ““: {" ref”:” myActionId “}
output 输出数据的位置。 参考对象,例如 “输出”:{"ref”:” myDataNode Id "}
parent 槽将继承自的当前对象的父级。 引用对象,例如 “父对象”:{"ref”:” myBaseObject Id "}
pipelineLogUri 用于上传管道日志的 S3URI(例如 's3: BucketName ///Key/ ')。 String
postActivityTaskConfig 要运行的活动后配置脚本。它由 Amazon S3 中的 shell 脚本和参数列表组成。URI 参考对象,例如 “postActivityTaskConfig”:{“ref”:” myShellScript ConfigId “}
preActivityTaskConfig 要运行的活动前配置脚本。它由 Amazon S3 中的 shell 脚本和参数列表组成。URI 参考对象,例如 “preActivityTaskConfig”:{“ref”:” myShellScript ConfigId “}
precondition (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会被标记 READY “”。 参考对象,例如 “前提条件”:{“ref”:” myPreconditionId “}
reportProgressTimeout 远程办公连续调用超时reportProgress。如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。 周期
retryDelay 两次重试之间的超时时间。 周期
scheduleType 计划类型允许您指定应在间隔的结尾还是开头计划您管道定义中的对象。时间序列风格计划表示在每次间隔的结尾计划实例,而 Cron 风格计划表示应在每次间隔的开头计划实例。按需计划让您可以在每次激活时运行一次管道。这意味着,您不需要克隆或重新创建管道以再次运行它。如果您使用按需计划,则必须在默认对象中指定该计划,并且必须是唯一为管道中的对象 scheduleType 指定的计划。要使用按需管道,您只需为后续每次运行调用该 ActivatePipeline 操作即可。值包括:cron、ondemand 和 timeseries。 枚举

运行时字段 描述 槽类型
@activeInstances 当前计划的有效实例对象的列表。 引用对象,例如 activeInstances ““: {" ref”:” myRunnableObject Id "}
@actualEndTime 该对象的执行完成时间。 DateTime
@actualStartTime 该对象的执行开始时间。 DateTime
cancellationReason cancellationReason 如果此对象已取消,则为。 String
@cascadeFailedOn 对象在其上失败的依赖项链的描述。 引用对象,例如 cascadeFailedOn ““: {" ref”:” myRunnableObject Id "}
emrStepLog EMR步骤日志仅在尝试EMR活动时可用 String
errorId errorId 如果此对象失败,则为。 String
errorMessage errorMessage 如果此对象失败,则为。 String
errorStackTrace 该对象失败时显示的错误堆栈跟踪。 String
@finishedTime 该对象完成其执行的时间。 DateTime
hadoopJobLog Hadoop 作业日志可用于尝试进行EMR基于活动的情况。 String
@healthStatus 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 String
@healthStatusFromInstanceId 进入终止状态的上个实例对象的 ID。 String
@ T healthStatusUpdated ime 上次更新运行状况的时间。 DateTime
hostname 已执行任务尝试的客户端的主机名。 String
@lastDeactivatedTime 上次停用该对象的时间。 DateTime
@ T latestCompletedRun ime 已完成执行的最新运行的时间。 DateTime
@latestRunTime 已计划执行的最新运行的时间。 DateTime
@nextRunTime 计划下次运行的时间。 DateTime
reportProgressTime 远程活动报告进度的最近时间。 DateTime
@scheduledEndTime 对象的计划结束时间。 DateTime
@scheduledStartTime 对象的计划开始时间。 DateTime
@status 该对象的状态。 String
@version 用来创建对象的管道版本。 String
@waitingOn 该对象在其上处于等待状态的依赖项列表的描述。 引用对象,例如 waitingOn ““: {" ref”:” myRunnableObject Id "}

系统字段 描述 槽类型
@error 用于描述格式不正确的对象的错误消息。 String
@pipelineId 该对象所属的管道的 ID。 String
@sphere 对象的范围指明对象在生命周期中的位置:组件对象产生实例对象,后者执行尝试对象。 String

另请参阅