EmrActivity - AWS Data Pipeline

EmrActivity

EMR クラスターを実行します。

AWS Data Pipeline では、Amazon EMR とは異なる形式をステップに使用します。例えば、AWS Data Pipeline では、EmrActivity ステップフィールドで、JAR 名の後にコンマで区切って引数を指定します。次の例は、Amazon EMR 形式のステップと、同等の AWS Data Pipeline のステップです。

s3://example-bucket/MyWork.jar arg1 arg2 arg3
"s3://example-bucket/MyWork.jar,arg1,arg2,arg3"

以下は、このオブジェクト型の例です。この例では、旧バージョンの Amazon EMR を使用しています。現在使用しているバージョンの Amazon EMR クラスターで、この例が正しいことを検証してください。

このオブジェクトは、同じパイプライン定義ファイルで定義した他のオブジェクトを 3 つ参照します。MyEmrClusterEmrCluster オブジェクトで、MyS3InputMyS3OutputS3DataNode オブジェクトです。

注記

この例では、step フィールドをクラスター文字列(Pig スクリプト、Hadoop ストリーミングクラスター、パラメータを含む独自のカスタム JAR など)に置き換えることができます。

Hadoop 2.x (AMI 3.x)

{ "id" : "MyEmrActivity", "type" : "EmrActivity", "runsOn" : { "ref" : "MyEmrCluster" }, "preStepCommand" : "scp remoteFiles localFiles", "step" : ["s3://mybucket/myPath/myStep.jar,firstArg,secondArg,-files,s3://mybucket/myPath/myFile.py,-input,s3://myinputbucket/path,-output,s3://myoutputbucket/path,-mapper,myFile.py,-reducer,reducerName","s3://mybucket/myPath/myotherStep.jar,..."], "postStepCommand" : "scp localFiles remoteFiles", "input" : { "ref" : "MyS3Input" }, "output" : { "ref" : "MyS3Output" } }
注記

引数を 1 ステップでアプリケーションに渡すには、以下の例のように、スクリプトのパスに Region を指定する必要があります。さらに、渡す引数をエスケープすることが必要になる場合があります。たとえば、script-runner.jar を使ってシェルスクリプトを実行する場合、スクリプトに引数を渡すには、引数を区切るカンマをエスケープする必要があります。次のステップスロットはそれを行う方法を示しています。

"step" : "s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,s3://datapipeline/echo.sh,a\\\\,b\\\\,c"

このステップでは、script-runner.jar を使って echo.sh シェルスクリプトを実行し、abc を単一の引数としてスクリプトに渡します。最初のエスケープ文字は結果として生じる引数から削除されるので再度エスケープする必要があります。たとえば、File\.gz を引数として JSON に渡す場合は、それを File\\\\.gz を使ってエスケープできます。ただし、最初のエスケープは破棄されるため、File\\\\\\\\.gz を使う必要があります。

Syntax

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule このオブジェクトは、スケジュール期間の実行中に呼び出されます。このオブジェクトの依存関係の実行順序を設定するために、別のオブジェクトへのスケジュール参照を指定します。この要件を満たすには、オブジェクトのスケジュールを明示的に設定できます。たとえば、"schedule": {"ref": "DefaultSchedule"} と指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合は、スケジュール参照がある親オブジェクトを作成できます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 参照オブジェクト ("schedule":{"ref":"myScheduleId"} など)

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn このジョブが実行される Amazon EMR クラスター。 参照オブジェクト ("​runsOn":{"ref":"myEmrClusterId"} など)
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。値 runsOn を指定して、workerGroup がある場合、workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 参照オブジェクト ("dependsOn":{"ref":"myActivityId"} など)
failureAndRerunMode 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
input 入力データの場所。 参照オブジェクト ("input":{"ref":"myDataNodeId"} など)
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数。 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 参照オブジェクト ("onFail":{"ref":"myActionId"} など)
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 参照オブジェクト ("onLateAction":{"ref":"myActionId"} など)
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 参照オブジェクト ("onSuccess":{"ref":"myActionId"} など)
output 出力データの場所。 参照オブジェクト ("output":{"ref":"myDataNodeId"} など)
parent スロットの継承元となる現在のオブジェクトの親。 参照オブジェクト ("parent":{"ref":"myBaseObjectId"} など)
pipelineLogUri パイプラインのログをアップロードするための Amazon S3 URI (s3://BucketName/Prefix/ など)。 文字列
postStepCommand すべてのステップが完了した後に実行するシェルスクリプト。スクリプトを複数(最大 255 個)指定するには、postStepCommand フィールドを複数追加します。 文字列
precondition オプションで前提条件を定義します。すべての前提条件を満たすまで、データノードは "READY" とマークされません。 参照オブジェクト ("precondition":{"ref":"myPreconditionId"} など)
preStepCommand ステップを実行する前に実行するシェルスクリプト。スクリプトを複数(最大 255 個)指定するには、preStepCommand フィールドを複数追加します。 文字列
reportProgressTimeout reportProgress へのリモート作業の連続した呼び出しのタイムアウト。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 [Period] (期間)
resizeClusterBeforeRunning

入力または出力として指定された DynamoDB テーブルに対応するため、このアクティビティを実行する前にクラスターのサイズを変更します。

注記

EmrActivity が入力または出力データノードとして DynamoDBDataNode を使用している場合、resizeClusterBeforeRunningTRUE に設定すると、AWS Data Pipeline は m3.xlarge インスタンスタイプを使用して開始されます。これによりインスタンスタイプの選択が m3.xlarge に上書きされるため、月別コストが増加することがあります。

ブール値
resizeClusterMaxInstances サイズ変更アルゴリズムによってリクエストできるインスタンスの最大数の制限。 整数
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType スケジュールタイプでは、パイプライン定義のオブジェクトを間隔の最初にスケジュールするか、間隔の最後にスケジュールするかを指定できます。値は、cronondemand、および timeseries です。timeseries スケジューリングでは、インスタンスを各間隔の最後にスケジュールします。cron スケジューリングでは、インスタンスを各間隔の最初にスケジュールします。ondemand スケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。ondemand スケジュールを使用する場合は、デフォルトオブジェクトで指定し、パイプラインのオブジェクトに対して指定される唯一の scheduleType である必要があります。ondemand パイプラインを使用するには、それ以降の実行ごとに、ActivatePipeline オペレーションを呼び出します。 一覧表
ステップ クラスターが実行する 1 つ以上のステップ。ステップを複数 (最大 255 個) 指定するには、ステップフィールドを複数追加します。JAR 名の後にカンマで区切って引数を使用します (例: s3://example-bucket/MyWork.jar,arg1,arg2,arg3)。 文字列

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 参照オブジェクト ("activeInstances":{"ref":"myRunnableObjectId"} など)
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクトがキャンセルされた場合の cancellationReason。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 参照オブジェクト ("cascadeFailedOn":{"ref":"myRunnableObjectId"} など)
emrStepLog EMR アクティビティの試行でのみ使用可能な Amazon EMR ステップログ 文字列
errorId このオブジェクトが失敗した場合は errorId 文字列
errorMessage このオブジェクトが失敗した場合は errorMessage 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMR ベースのアクティビティで試みることができる Hadoop ジョブのログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻。 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻。 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトを作成したパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 参照オブジェクト ("waitingOn":{"ref":"myRunnableObjectId"} など)

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー。 文字列
@pipelineId このオブジェクトが属するパイプラインの ID。 文字列
@sphere オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 文字列

以下の資料も参照してください。