HadoopActivity - AWS Data Pipeline

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

HadoopActivity

在叢集上執行 MapReduce 工作。叢集可以是由管理的 EMR 叢集,也可以是其他 AWS Data Pipeline 資源 (如果您使用 TaskRunner)。 HadoopActivity 當您要 parallel 執行工作時使用。這使您可以在 Hadoop 1 中使用 YARN 框架或 MapReduce 資源談判代表的調度資源。如果您想要使用 Amazon EMR 步驟動作依序執行工作,您仍然可以使用。EmrActivity

範例

HadoopActivity 使用管理的 EMR 叢集 AWS Data Pipeline

下列 HadoopActivity 物件會使用 EmrCluster 資源來執行程式:

{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }

以下是對應的 MyEmrCluster,它在 YARN 中為基於 Hadoop 2 的 AMI 配置 FairScheduler 和隊列:

{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }

這是 EmrCluster 您在 Hadoop 1 FairScheduler 中使用的配置:

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }

以下是針對以 Hadoop 2 為 EmrCluster 基礎的 CapacityScheduler AMI 進行設定:

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
HadoopActivity 使用現有的 EMR 叢集

在此範例中,您可 TaskRunner 以使用工作者群組和 a 在現有 EMR 叢集上執行程式。下列配管定義用 HadoopActivity 於:

{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }

語法

必要欄位 描述 槽類型
jarUri Amazon S3 中 JAR 的位置或要與之一起執行的叢集本機檔案系統 HadoopActivity。 字串

物件呼叫欄位 描述 槽類型
schedule 在排程間隔的執行期間會呼叫此物件。使用者必須指定另一個物件的排程參考,設定此物件的相依性執行順序。使用者可以在物件上明確設定排程來滿足此需求,例如指定「schedule」: {"ref」: DefaultSchedule "}。在大部分的情況下,建議您將排程參考放在預設的管道物件,讓所有物件都繼承該排程。或者,如果管道有排程的樹狀目錄 (主排程內還有排程),使用者可以建立有排程參考的父物件。如需範例選用排程組態的詳細資訊,請參閱https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html 參考對象,例如「時間表」:{「ref」:」myScheduleId「}

必要的群組 (下列其中之一為必要) 描述 槽類型
runsOn 要在其中執行此任務的 EMR 叢集。 引用對象,例如「runsOn」:{「參考」:「myEmrClusterID」}
workerGroup 工作者群組。這是用於路由任務。如果您提供 runsOn 值,且 workerGroup 存在,則會忽略 workerGroup。 字串

選用欄位 描述 槽類型
argument 要傳遞給 JAR 的引數。 字串
attemptStatus 遠端活動最新回報的狀態。 字串
attemptTimeout 遠端工作完成的逾時。如果設定,則系統可能會重試未在設定開始時間內完成的遠端活動。 期間
dependsOn 指定與另一個可執行物件的相依性。 引用對象,例如「依賴信息」:{「參考」:myActivityId「}
failureAndRerun模式 描述相依性故障或重新執行時的消費者節點行為 列舉
hadoopQueue 要提交活動至其中的 Hadoop 排程器佇列名稱。 字串
input 輸入資料的位置。 引用對象,例如「輸入」:{「參考」:「myDataNodeID」}
lateAfterTimeout 管線開始後,物件必須在其中完成的經過時間。僅當明細表類型未設定為時,才會觸發此選項ondemand 期間
mainClass 您正在執行的 JAR 的主要類別 HadoopActivity。 字串
maxActiveInstances 同時作用中的元件執行個體數目上限。重新執行不計入作用中的執行個體數量。 Integer
maximumRetries 故障時嘗試重試的次數上限 Integer
onFail 目前物件發生故障時要執行的動作。 引用對象,例如「onFail」:{「參考」:myActionId「}
onLateAction 某個物件尚未排程或仍未完成時,應該觸發的動作。 引用對象,例如 onLateAction「「:{" ref」:」myActionId「}
onSuccess 目前物件成功時要執行的動作。 引用對象,例如「onSuccess」:{「裁判」:myActionId「}
output 輸出資料的位置。 引用對象,例如「輸出」:{「ref」:「myDataNodeID」}
parent 目前物件的父系,其插槽會被繼承。 引用對象,例如「父」:{「ref」:「myBaseObjectID」}
pipelineLogUri 用於上傳管道日誌的 S3 URI(例如 's3://BucketName/密鑰/')。 字串
postActivityTaskConfig 要執行的活動後組態指令碼。這包含 Amazon S3 中的 shell 指令碼 URI 和引數清單。 引用對象,例如「postActivityTaskConfig」:{「ref」:」myShellScript ConfigId「}
preActivityTaskConfig 要執行的活動前組態指令碼。這包含 Amazon S3 中的 shell 指令碼 URI 和引數清單。 引用對象,例如「preActivityTaskConfig」:{「ref」:」myShellScript ConfigId「}
precondition 選擇是否定義先決條件。在所有先決條件滿足前,資料節點不會標示為"READY"。 引用對象,例如「前提條件」:{「ref」:」myPreconditionId「}
reportProgressTimeout 遠端工作連續呼叫 reportProgress 的逾時。如果設定,則不回報指定時段進度的遠端活動,可能會視為已停滯而重試。 期間
retryDelay 兩次重試嘗試之間的逾時持續時間。 期間
scheduleType 排程類型可讓您指定管道定義的物件應該排程在間隔開頭還是間隔結尾。時間序列樣式排程表示執行個體排程在每個間隔的結尾,而 Cron 樣式排程表示執行個體排程在每個間隔的開頭。隨需排程可讓您在每次啟用時執行一次管道。這表示您不必複製或重新建立管道,然後再執行一次。若您使用隨需排程,則必須在預設物件中指定此排程,且其必須是針對管道中物件指定的唯一 scheduleType。若要使用隨選管線,只要呼叫每次後續執行的 ActivatePipeline 作業即可。值為:Cron、ondemand 和 timeseries。 列舉

執行時間欄位 描述 槽類型
@activeInstances 目前已排程的作用中執行個體物件清單。 引用對象,例如「活動實例」:{「ref」:「myRunnableObjectId」}
@actualEndTime 此物件執行完成的時間。 DateTime
@actualStartTime 此物件執行開始的時間。 DateTime
cancellationReason 若此物件已取消,會提供 cancellationReason。 字串
@cascadeFailedOn 物件失敗所在的相依鏈的描述。 參考物件,例如 cascadeFailedOn "" ": {" ref」: "myRunnableObjectId"}
emrStepLog 只在 EMR 活動嘗試時才可使用的 EMR 步驟日誌 字串
errorId 若此物件失敗,會提供 errorId。 字串
errorMessage 若此物件失敗,會提供 errorMessage。 字串
errorStackTrace 如果此物件失敗,則為錯誤堆疊追蹤。 字串
@finishedTime 此物件完成其執行的時間。 DateTime
hadoopJobLog 嘗試 EMR 型活動可用的 Hadoop 任務日誌。 字串
@healthStatus 反映已達終止狀態之最後一個物件執行個體成功或失敗的物件運作狀態。 字串
@healthStatusFromInstanceId 已達終止狀態之最後一個執行個體物件的 ID。 字串
@ healthStatusUpdated 時間 上次更新運作狀態的時間。 DateTime
hostname 選取任務嘗試之用戶端的主機名稱。 字串
@lastDeactivatedTime 此物件最後停用的時間。 DateTime
@ latestCompletedRun 時間 執行完成最近一次執行的時間。 DateTime
@latestRunTime 執行排程最近一次執行的時間。 DateTime
@nextRunTime 下次要排程執行的時間。 DateTime
reportProgressTime 遠端活動最近報告進度的時間。 DateTime
@scheduledEndTime 物件的排程結束時間 DateTime
@scheduledStartTime 物件的排程開始時間 DateTime
@status 此物件的狀態。 字串
@version 建立物件使用的管道版本。 字串
@waitingOn 此物件等待之相依性清單的描述。 引用對象,例如「等待」:{「參考」:「myRunnableObjectID」}

系統欄位 描述 槽類型
@error 描述格式錯誤物件的錯誤。 字串
@pipelineId 此物件所屬管道的 ID。 字串
@sphere 物件範圍代表其在生命週期中的位置:Component 物件會引發執行 Attempt 物件的 Instance 物件。 字串

另請參閱