PigActivity - AWS Data Pipeline

AWS Data Pipeline 不再提供給新客戶。現有客戶 AWS Data Pipeline 可繼續正常使用此服務。進一步了解

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

PigActivity

PigActivity 提供 Pig 指令碼的原生支援, AWS Data Pipeline 而不需要使用ShellCommandActivityEmrActivity。此外, PigActivity 支援資料暫存。當預備欄位設為 True 時, AWS Data Pipeline 會將輸入資料做為 Pig 中的結構描述預備,而無須使用者輸入額外的程式碼。

範例

以下範例管道示範如何使用 PigActivity。範例管道會執行下列步驟:

  • MyPigActivity1 從 Amazon S3 載入資料並執行 Pig 指令碼,選取幾欄資料並將其上傳到 Amazon S3。

  • MyPigActivity2 載入第一個輸出,選取幾欄和三列資料,然後將其作為第二個輸出上傳到 Amazon S3。

  • MyPigActivity3 加載第二個輸出數據,插入兩行數據,只將名為「第五」的列插入到 Amazon RDS。

  • MyPigActivity4 會載入 Amazon RDS 資料、選取第一列資料,然後將資料上傳到 Amazon S3。

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

pigTestScript.q 的內容如下所示。

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

語法

物件呼叫欄位 描述 槽類型
schedule 在排程間隔的執行期間會呼叫此物件。使用者必須指定另一個物件的排程參考,設定此物件的相依性執行順序。使用者可以在物件上明確設定排程來滿足此需求,例如指定「schedule」: {"ref」: DefaultSchedule "}。在大部分的情況下,建議您將排程參考放在預設的管道物件,讓所有物件都繼承該排程。或者,如果管道有排程的樹狀目錄 (主排程內還有排程),使用者可以建立有排程參考的父物件。如需範例選用排程組態的詳細資訊,請參閱https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html 參考對象,例如,「時間表」:{「ref」:」myScheduleId「}

必要的群組 (下列其中之一為必要) 描述 槽類型
script 要執行的 Pig 指令碼。 字串
scriptUri 要執行的 Pig 指令碼的位置 (例如 s3://scriptLocation)。 字串

必要的群組 (下列其中之一為必要) 描述 槽類型
runsOn EMR PigActivity 執行此動作的叢集。 參考物件,例如,"runsOn" ": {" ref」: "myEmrClusterId"}
workerGroup 工作者群組。這是用於路由任務。如果您提供 runsOn 值,且 workerGroup 存在,則會忽略 workerGroup 字串

選用欄位 描述 槽類型
attemptStatus 遠端活動最新回報的狀態。 字串
attemptTimeout 遠端工作完成的逾時。如果設定,則系統可能會重試未在設定開始時間內完成的遠端活動。 期間
dependsOn 指定與其他可執行物件的相依性。 參考物件,例如,dependsOn"": {"ref」:」myActivityId「}
failureAndRerun模式 描述相依性故障或重新執行時的消費者節點行為。 列舉
input 輸入資料來源。 引用對象,例如,「輸入」:{「ref」:」myDataNode Id「}
lateAfterTimeout 管線開始後,物件必須在其中完成的經過時間。僅當明細表類型未設定為時,才會觸發此選項ondemand 期間
maxActiveInstances 同時作用中的元件執行個體數目上限。重新執行不計入作用中的執行個體數量。 Integer
maximumRetries 故障時嘗試重試的次數上限。 Integer
onFail 目前物件發生故障時要執行的動作。 參考物件,例如,onFail"": {"ref」:」myActionId「}
onLateAction 某個物件尚未排程或仍未完成時,應該觸發的動作。 參考物件,例如,onLateAction"": {"ref」:」myActionId「}
onSuccess 目前物件成功時要執行的動作。 參考物件,例如,onSuccess"": {"ref」:」myActionId「}
output 輸出資料來源。 引用對象,例如,「輸出」:{「ref」:」myDataNode Id「}
parent 目前物件的父系,其插槽會被繼承。 引用對象,例如,「父」:{「ref」:」myBaseObject Id「}
pipelineLogUri 用於上傳管道日誌的 Amazon S3URI(例如 's3://BucketName/密鑰/')。 字串
postActivityTaskConfig 要執行的活動後組態指令碼。這包括 Amazon S33 中URI的 shell 腳本和一個參數列表。 引用對象,例如,「postActivityTaskConfig」:{「ref」:」myShellScript ConfigId「}
preActivityTaskConfig 要執行的活動前組態指令碼。這是由 Amazon S3 中URI的殼層指令碼和引數清單所組成。 引用對象,例如,「preActivityTaskConfig」:{「ref」:」myShellScript ConfigId「}
precondition 選擇是否定義先決條件。在符合所有先決條件之前,資料節點不會標記 READY ""。 引用對象,例如,「先決條件」:{「ref」:」myPreconditionId「}
reportProgressTimeout 遠端工作連續呼叫 reportProgress 的逾時。如果設定,則不回報指定時段進度的遠端活動,可能會視為已停滯而重試。 期間
resizeClusterBefore跑步 在執行此活動之前重新調整叢集大小,以容納指定為輸入或輸出的 DynamoDB 資料節點。
注意

如果您的活動使用 a DynamoDBDataNode 作為輸入或輸出資料節點,並且resizeClusterBeforeRunning將設定為TRUE,則會 AWS Data Pipeline 開始使用m3.xlarge執行個體類型。這會將您選擇的執行個體類型覆寫為 m3.xlarge,可能會增加您的每月成本。

Boolean
resizeClusterMax實例 調整大小演算法可請求的執行個體數目上限。 Integer
retryDelay 兩次重試嘗試之間的逾時持續時間。 期間
scheduleType 排程類型可讓您指定管道定義的物件應該排程在間隔開頭還是間隔結尾。時間序列樣式排程表示執行個體排程在每個間隔的結尾,而 Cron 樣式排程表示執行個體排程在每個間隔的開頭。隨需排程可讓您在每次啟用時執行一次管道。這表示您不必複製或重新建立管道,然後再執行一次。如果您使用隨選排程,則必須在預設物件中指定該排程,且必須是管線中物件的唯一 scheduleType 指定排程。若要使用隨選管線,您只需針對每次後續執行呼叫 ActivatePipeline 作業即可。值為:Cron、ondemand 和 timeseries。 列舉
scriptVariable 要傳遞給 Pig 指令碼的引數。您可以使 scriptVariable 用腳本或scriptUri. 字串
stage 判斷是否啟用暫存,並允許 Pig 指令碼存取分段資料表格,例如 $ {INPUT1} 和 $ {OUTPUT1}。 Boolean

執行時間欄位 描述 槽類型
@activeInstances 目前已排程的作用中執行個體物件清單。 參考物件,例如,"activeInstances" ": {" ref」: "myRunnableObjectId"}
@actualEndTime 此物件執行完成的時間。 DateTime
@actualStartTime 此物件執行開始的時間。 DateTime
cancellationReason cancellationReason 如果此物件已取消。 字串
@cascadeFailedOn 物件失敗所在的相依鏈的描述。 參考物件,例如,"cascadeFailedOn" ": {" ref」: "myRunnableObjectId"}
emrStepLog Amazon EMR 步驟日誌僅適用於EMR活動嘗試。 字串
errorId errorId 如果此對象失敗。 字串
errorMessage errorMessage 如果此對象失敗。 字串
errorStackTrace 如果此物件失敗,則為錯誤堆疊追蹤。 字串
@finishedTime 此物件完成其執行的時間。 DateTime
hadoopJobLog Hadoop 工作日誌可用於EMR基於活動的嘗試。 字串
@healthStatus 反映已達終止狀態之最後一個物件執行個體成功或失敗的物件運作狀態。 字串
@healthStatusFromInstanceId 已達終止狀態之最後一個執行個體物件的 ID。 字串
@ healthStatusUpdated 時間 上次更新運作狀態的時間。 DateTime
hostname 選取任務嘗試之用戶端的主機名稱。 字串
@lastDeactivatedTime 此物件最後停用的時間。 DateTime
@ latestCompletedRun 時間 執行完成最近一次執行的時間。 DateTime
@latestRunTime 執行排程最近一次執行的時間。 DateTime
@nextRunTime 下次要排程執行的時間。 DateTime
reportProgressTime 遠端活動最近報告進度的時間。 DateTime
@scheduledEndTime 物件的排程結束時間。 DateTime
@scheduledStartTime 物件的排程開始時間。 DateTime
@status 此物件的狀態。 字串
@version 建立物件時使用的管道版本。 字串
@waitingOn 此物件等待之相依性清單的描述。 參考物件,例如,"waitingOn" ": {" ref」: "myRunnableObjectId"}

系統欄位 描述 槽類型
@error 描述格式錯誤物件的錯誤。 字串
@pipelineId 此物件所屬管道的 ID。 字串
@sphere 物件範圍代表其在生命週期中的位置:Component 物件會引發執行 Attempt 物件的 Instance 物件。 字串

另請參閱