PigActivity - AWS Data Pipeline

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

PigActivity

PigActivity は、 ShellCommandActivity または を使用する AWS Data Pipeline 必要なく、 で Pig スクリプトをネイティブにサポートしますEmrActivity。さらに、 はデータステージング PigActivity をサポートします。stage フィールドが true に設定されていると、 AWS Data Pipeline は Pig のスキーマとして入力データをステージングします(ユーザーがコードを追加する必要はありません)。

次のパイプライン例は PigActivity の使い方を示します。パイプライン例では、以下のステップを実行します。

  • MyPigActivity1 は Amazon S3 からデータをロードし、数列のデータを選択して Amazon S3 にアップロードする Pig スクリプトを実行します。

  • MyPigActivity2 は最初の出力をロードし、数列と 3 行のデータを選択し、2 番目の出力として Amazon S3 にアップロードします。

  • MyPigActivity3 は 2 番目の出力データをロードし、2 行のデータと「fifth」という名前の列のみを Amazon RDS に挿入します。

  • MyPigActivity4 は Amazon RDS データをロードし、データの最初の行を選択して Amazon S3 にアップロードします。

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

pigTestScript.q の内容は次のとおりです。

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

構文

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule このオブジェクトは、スケジュール期間の実行中に呼び出されます。ユーザーは、このオブジェクトの依存関係の実行順序を設定するには、別のオブジェクトへのスケジュール参照を指定する必要があります。ユーザーは、オブジェクトにスケジュールを明示的に設定することで、この要件を満たすことができます。例えば、「スケジュール」: {"ref」:DefaultSchedule「"} を指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合、ユーザーは、スケジュール参照がある親オブジェクトを作成することができます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 リファレンスオブジェクト、例:「schedule」:{「ref"myScheduleId」}

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
script 実行する Pig スクリプト。 文字列
scriptUri 実行される Pig スクリプトの場所 (たとえば、s3: //scriptLocation)。 文字列

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn これが PigActivity 実行される EMR クラスター。 リファレンスオブジェクト、例えば「runsOn」:{「ref"myEmrClusterId」}
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。値 runsOn を指定して、workerGroup がある場合、workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 リファレンスオブジェクト、例えば「dependsOn」:{「ref"myActivityId」}
failureAndRerunモード 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
input 入力データソース。 リファレンスオブジェクト、例:「input」:{「ref"myDataNodeId」}
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数。 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 リファレンスオブジェクト、例えばonFail ":{"ref"myActionId}」
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 リファレンスオブジェクト、例:onLateAction「」:「{」ref"myActionId}
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 リファレンスオブジェクト、例:「onSuccess」:{「ref"myActionId」}
output 出力データソース。 リファレンスオブジェクト、例えば「output」:{「ref"myDataNodeId」}
parent スロットの継承元となる現在のオブジェクトの親。 リファレンスオブジェクト、例:「parent」:{「ref"myBaseObjectId」}
pipelineLogUri パイプラインのログをアップロードするための Amazon S3 URI (「s3://BucketName/Key/」など)。 文字列
postActivityTask設定 実行するポストアクティビティ設定スクリプト。Amazon S33 のシェルスクリプトの URI と引数のリストで構成されます。 リファレンスオブジェクト、例:postActivityTask「Config」:{「ref"myShellScriptConfigId」}
preActivityTask設定 実行するプリアクティビティ設定スクリプト。Amazon S3 のシェルスクリプトの URI と引数のリストで構成されます。 リファレンスオブジェクト、例:preActivityTask「Config」:{「ref"myShellScriptConfigId」}
precondition オプションで前提条件を定義します。すべての前提条件を満たすまで、データノードは "READY" とマークされません。 リファレンスオブジェクト、例:「precondition」:{「ref"myPreconditionId」}
reportProgressTimeout reportProgress へのリモート作業の連続した呼び出しのタイムアウト。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 [Period] (期間)
resizeClusterBefore実行中 入力または出力として指定された DynamoDB データノードに対応するため、このアクティビティを実行する前にクラスターのサイズを変更します。
注記

アクティビティで を入力データノードまたは出力データノードDynamoDBDataNodeとして使用し、 resizeClusterBeforeRunningを に設定するとTRUE、 はm3.xlargeインスタンスタイプを使用して AWS Data Pipeline 開始します。これによりインスタンスタイプの選択が m3.xlarge に上書きされるため、月別コストが増加することがあります。

ブール値
resizeClusterMaxインスタンス サイズ変更アルゴリズムによってリクエストできるインスタンスの最大数の制限。 整数
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType スケジュールタイプによって、パイプライン定義のオブジェクトを、期間の最初にスケジュールするか、最後にスケジュールするかを指定できます。[Time Series Style Scheduling] は、インスタンスが各間隔の最後にスケジュールされることを意味し、[Cron Style Scheduling] は、インスタンスが各間隔の最初にスケジュールされることを意味します。オンデマンドスケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。オンデマンドスケジュールを使用する場合は、デフォルトオブジェクトで指定し、パイプラインのオブジェクトに対して指定される唯一の scheduleType である必要があります。オンデマンドパイプラインを使用するには、後続の実行ごとに ActivatePipeline オペレーションを呼び出すだけです。値は、cron、ondemand、および timeseries です。 一覧表
scriptVariable Pig スクリプトに渡す引数。スクリプトまたは scriptUri で scriptVariable を使用できます。 文字列
ステージ ステージングが有効かどうか決定し、ステージングされたデータのテーブル ({INPUT1} や {OUTPUT1} など) に Pig スクリプトがアクセスできるようにします。 ブール値

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 リファレンスオブジェクト、例:「activeInstances」:{「ref"myRunnableObjectId」}
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクトがキャンセルされた場合の cancellationReason。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 リファレンスオブジェクト、例:cascadeFailedOn「」:{「ref"myRunnableObjectId」}
emrStepLog EMR アクティビティの試行でのみ使用可能な Amazon EMR ステップログ。 文字列
errorId このオブジェクトが失敗した場合は errorId。 文字列
errorMessage このオブジェクトが失敗した場合は errorMessage。 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMR ベースのアクティビティで試みることができる Hadoop ジョブのログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻。 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻。 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトを作成したパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 リファレンスオブジェクト、例えば「waitingOn」:{「ref"myRunnableObjectId」}

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー。 文字列
@pipelineId このオブジェクトが属するパイプラインの ID。 文字列
@sphere オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 文字列

以下の資料も参照してください。