HadoopActivity - AWS Data Pipeline

AWS Data Pipeline は、新規顧客には利用できなくなりました。の既存のお客様 AWS Data Pipeline は、通常どおりサービスを引き続き使用できます。詳細はこちら

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

HadoopActivity

クラスターで MapReduce ジョブを実行します。クラスターは、 によって管理される EMR クラスター AWS Data Pipeline でも、 を使用する場合は別のリソースでもかまいません TaskRunner。作業を並行して実行 HadoopActivity する場合に使用します。これにより、Hadoop 1 のYARNフレームワークまたはリソースネゴシエーターのスケジューリング MapReduce リソースを使用できます。Amazon EMR Step アクションを使用して作業を順番に実行する場合でも、 を使用できますEmrActivity

HadoopActivity によって管理される EMRクラスターの使用 AWS Data Pipeline

次の HadoopActivity オブジェクトは、 EmrCluster リソースを使用してプログラムを実行します。

{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }

対応する を次に示します。MyEmrCluster。Hadoop 2 ベースの YARN の で FairScheduler および キューを設定しますAMIs。

{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }

これは、Hadoop 1 FairScheduler で設定 EmrCluster するために使用する です。

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }

以下は EmrCluster 、Hadoop 2 ベースの CapacityScheduler の設定ですAMIs。

{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
HadoopActivity 既存のEMRクラスターの使用

この例では、ワーカーグループと TaskRunner を使用して、既存のEMRクラスターでプログラムを実行します。次のパイプライン定義では、 を使用して次の HadoopActivity 操作を行います。

  • でのみ MapReduce プログラムを実行する myWorkerGroup リソースの使用料金を見積もることができます。ワーカーグループの詳細については、「Task Runnerを使用した既存のリソースでの作業の実行」を参照してください。

  • preActivityTaskConfig と postActivityTaskConfig を実行する

{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }

構文

必須フィールド 説明 スロットタイプ
jarUri で実行する Amazon S3 またはクラスターのローカルファイルシステムJAR内の の場所 HadoopActivity。 文字列

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule このオブジェクトは、スケジュール期間の実行中に呼び出されます。ユーザーは、このオブジェクトの依存関係の実行順序を設定するには、別のオブジェクトへのスケジュール参照を指定する必要があります。ユーザーは、オブジェクトに明示的にスケジュールを設定することで、この要件を満たすことができます。例えば、「スケジュール」: {"ref": "DefaultSchedule"} を指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合、ユーザーは、スケジュール参照がある親オブジェクトを作成することができます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 リファレンスオブジェクト、例: "schedule":{"ref"myScheduleId"}

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn EMR このジョブを実行するクラスター。 リファレンスオブジェクト、例: "runsOn":{"ref"myEmrClusterId"}
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。 runsOn 値を指定して workerGroup 存在する場合、 workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
argument に渡す引数JAR。 文字列
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 リファレンスオブジェクト、例: "dependsOn":{"ref"myActivityId"}
failureAndRerunモード 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
hadoopQueue アクティビティを送信する先の Hadoop スケジューラーのキュー名。 文字列
input 入力データの場所。 リファレンスオブジェクト、例:「input」:{「ref"myDataNodeId」}
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
mainClass で実行JARしている のメインクラス HadoopActivity。 文字列
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 リファレンスオブジェクト、例: "onFail":{"ref"myActionId"}
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 リファレンスオブジェクト、例: "onLateAction":{"ref"myActionId"}
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 リファレンスオブジェクト、例: "onSuccess":{"ref"myActionId"}
output 出力データの場所。 リファレンスオブジェクト、例:「output」:{「ref"myDataNodeId」}
parent スロットの継承元となる現在のオブジェクトの親。 リファレンスオブジェクト、例:「parent」:{「ref"myBaseObjectId」}
pipelineLogUri パイプラインのログをアップロードするための S3 URI (「s3://BucketName/Key/」など)。 文字列
postActivityTask設定 実行するポストアクティビティ設定スクリプト。これは、Amazon S3 URIのシェルスクリプトの と引数のリストで構成されます。 Amazon S3 リファレンスオブジェクト、例:postActivityTask「Config」:{「ref"myShellScriptConfigId」}
preActivityTask設定 実行するプリアクティビティ設定スクリプト。これは、Amazon S3 URIのシェルスクリプトの と引数のリストで構成されます。 Amazon S3 リファレンスオブジェクト、例:preActivityTask「Config」:{「ref"myShellScriptConfigId」}
precondition オプションで前提条件を定義します。データノードは、すべての前提条件が満たされREADYるまで「」とマークされません。 リファレンスオブジェクト、例:「precondition」:{「ref"myPreconditionId」}
reportProgressTimeout へのリモートワークの連続呼び出しのタイムアウトreportProgress。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 [Period] (期間)
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType スケジュールタイプによって、パイプライン定義のオブジェクトを、期間の最初にスケジュールするか、最後にスケジュールするかを指定できます。[Time Series Style Scheduling] は、インスタンスが各間隔の最後にスケジュールされることを意味し、[Cron Style Scheduling] は、インスタンスが各間隔の最初にスケジュールされることを意味します。オンデマンドスケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。オンデマンドスケジュールを使用する場合は、デフォルトの オブジェクトで指定する必要があり、パイプライン内のオブジェクトに対して scheduleType 指定されるのは のみです。オンデマンドパイプラインを使用するには、後続の実行ごとに ActivatePipeline オペレーションを呼び出すだけです。値は、cron、ondemand、および timeseries です。 一覧表

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 リファレンスオブジェクト、例: "activeInstances":{"ref"myRunnableObjectId"}
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクト cancellationReason がキャンセルされた場合の 。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 リファレンスオブジェクト、例: "cascadeFailedOn":{"ref"myRunnableObjectId"}
emrStepLog EMR EMRアクティビティの試行でのみ使用できる ステップログ 文字列
errorId このオブジェクトが失敗 errorId した場合の 。 文字列
errorMessage このオブジェクトが失敗 errorMessage した場合の 。 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMRベースのアクティビティの試行で使用可能な Hadoop ジョブログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトが作成されたパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 リファレンスオブジェクト、例: "waitingOn":{"ref"myRunnableObjectId"}

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー。 文字列
@pipelineId このオブジェクトが属するパイプラインの ID。 文字列
@sphere オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 文字列

以下の資料も参照してください。