HadoopActivity - AWS Data Pipeline

HadoopActivity

クラスターで MapReduce ジョブを実行します。クラスターは AWS Data Pipeline によって管理される EMR クラスターです。または TaskRunner を使用する場合、クラスターは別のリソースです。並列処理を行うときは HadoopActivity を使用します。これにより、YARN フレームワークまたは Hadoop 1 の MapReduce リソースネゴシエータのスケジューリングリソースを使用できます。Amazon EMR ステップアクションを使用して作業を連続的に実行する場合は、依然として EmrActivity を使用できます。

AWS Data Pipeline によって管理される EMRクラスターを使用する HadoopActivity

次の HadoopActivity オブジェクトは、EmrCluster リソースを使用してプログラムを実行します。

{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }

こちらの対応する MyEmrCluster では Hadoop 2 AMI の YARN で FairScheduler とキューを設定します。

{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }

これは Hadoop 1 で FairScheduler を設定するために使う EmrCluster です。

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }

次の EmrCluster は Hadoop 2 AMI の CapacityScheduler を設定します。

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
既存の EMR クラスターを使用する HadoopActivity

この例では、ワーカーグループと TaskRunner を使用して、既存の EMR クラスターでプログラムを実行します。次のパイプライン定義では HadoopActivity を使用して以下の操作を行います。

{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }

Syntax

必須フィールド 説明 スロットタイプ
jarUri Amazon S3 の JAR の場所または HadoopActivity で実行するクラスターのローカルファイルシステム。 文字列

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule このオブジェクトは、スケジュール期間の実行中に呼び出されます。ユーザーは、このオブジェクトの依存関係の実行順序を設定するには、別のオブジェクトへのスケジュール参照を指定する必要があります。ユーザーは、オブジェクトでスケジュールを明示的に設定して、この要件を満たすことができます。たとえば、"schedule": {"ref": "DefaultSchedule"} と指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合、ユーザーは、スケジュール参照がある親オブジェクトを作成することができます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 参照オブジェクト ("schedule":{"ref":"myScheduleId"} など)

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn このジョブが実行される EMR クラスター。 参照オブジェクト ("runsOn":{"ref":"myEmrClusterId"} など)
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。runsOn 値を指定して、workerGroup がある場合、workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
argument JAR に渡す引数。 文字列
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 参照オブジェクト ("dependsOn":{"ref":"myActivityId"} など)
failureAndRerunMode 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
hadoopQueue アクティビティを送信する先の Hadoop スケジューラーのキュー名。 文字列
input 入力データの場所。 参照オブジェクト ("input":{"ref":"myDataNodeId"} など)
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
mainClass HadoopActivity で実行している JAR のメインクラス。 文字列
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 参照オブジェクト (onFail:{"ref":"myActionId"} など)
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 参照オブジェクト ("onLateAction":{"ref":"myActionId"} など)
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 参照オブジェクト ("onSuccess":{"ref":"myActionId"} など)
output 出力データの場所。 参照オブジェクト ("output":{"ref":"myDataNodeId"} など)
parent スロットの継承元となる現在のオブジェクトの親。 参照オブジェクト ("parent":{"ref":"myBaseObjectId"} など)
pipelineLogUri パイプラインのログをアップロードするための S3 URI (s3://BucketName/Key/ など)。 文字列
postActivityTaskConfig 実行するポストアクティビティ設定スクリプト。Amazon S3 のシェルスクリプトの URI と引数のリストで構成されます。 参照オブジェクト ("postActivityTaskConfig":{"ref":"myShellScriptConfigId"} など)
preActivityTaskConfig 実行するプリアクティビティ設定スクリプト。Amazon S3 のシェルスクリプトの URI と引数のリストで構成されます。 参照オブジェクト ("preActivityTaskConfig":{"ref":"myShellScriptConfigId"} など)
precondition オプションで前提条件を定義します。すべての前提条件を満たすまで、データノードは "READY" とマークされません。 参照オブジェクト ("precondition":{"ref":"myPreconditionId"} など)
reportProgressTimeout reportProgress へのリモート作業の連続した呼び出しのタイムアウト。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 [Period] (期間)
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType スケジュールタイプによって、パイプライン定義のオブジェクトを、期間の最初にスケジュールするか、最後にスケジュールするかを指定できます。[Time Series Style Scheduling] は、インスタンスが各間隔の最後にスケジュールされることを意味し、[Cron Style Scheduling] は、インスタンスが各間隔の最初にスケジュールされることを意味します。オンデマンドスケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。オンデマンドスケジュールを使用する場合は、デフォルトオブジェクトで指定し、パイプラインのオブジェクトに対して指定される唯一の scheduleType である必要があります。オンデマンドパイプラインを使用するには、それ以降の実行ごとに、ActivatePipeline オペレーションを呼び出すだけです。値は、cron、ondemand、および timeseries です。 一覧表

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 参照オブジェクト ("activeInstances":{"ref":"myRunnableObjectId"} など)
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクトがキャンセルされた場合の cancellationReason。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 参照オブジェクト ("cascadeFailedOn":{"ref":"myRunnableObjectId"} など)
emrStepLog EMR アクティビティの試行でのみ使用可能な EMR ステップログ 文字列
errorId このオブジェクトが失敗した場合は errorId。 文字列
errorMessage このオブジェクトが失敗した場合は errorMessage。 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMR ベースのアクティビティで試みることができる Hadoop ジョブのログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトが作成されたパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 参照オブジェクト ("waitingOn":{"ref":"myRunnableObjectId"} など)

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー。 文字列
@pipelineId このオブジェクトが属するパイプラインの ID。 文字列
@sphere オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 文字列

以下の資料も参照してください。