PigActivity - AWS Data Pipeline

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

PigActivity

PigActivity fornisce supporto nativo per gli script Pig AWS Data Pipeline senza la necessità di utilizzare o. ShellCommandActivity EmrActivity Inoltre, PigActivity supporta l'archiviazione dei dati. Quando il campo della fase è impostato su true, AWS Data Pipeline gestisce temporaneamente i dati di input come schema in Pig senza codice aggiuntivo da parte dell'utente.

Esempio

La seguente pipeline di esempio mostra come utilizzare PigActivity. La pipeline di esempio esegue le operazioni seguenti:

  • MyPigActivity1 carica i dati da Amazon S3 ed esegue uno script Pig che seleziona alcune colonne di dati e li carica su Amazon S3.

  • MyPigActivity2 carica il primo output, seleziona alcune colonne e tre righe di dati e lo carica su Amazon S3 come secondo output.

  • MyPigActivity3 carica il secondo dato di output, inserisce due righe di dati e solo la colonna denominata «quinta» in Amazon RDS.

  • MyPigActivity4 carica i dati Amazon RDS, seleziona la prima riga di dati e la carica su Amazon S3.

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

Il contenuto di pigTestScript.q è il seguente.

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

Sintassi

Campi Object Invocation Descrizione Tipo di slot
schedule Questo oggetto viene richiamato entro l'esecuzione di un intervallo di pianificazione. Gli utenti devono specificare un riferimento alla pianificazione di un altro oggetto per impostare l'ordine di esecuzione delle dipendenze per questo oggetto. Gli utenti possono soddisfare questo requisito impostando esplicitamente una pianificazione sull'oggetto, ad esempio specificando «schedule»: {"ref»: "«}. DefaultSchedule Nella maggior parte dei casi, è meglio inserire il riferimento alla pianificazione nell'oggetto pipeline di default, in modo che tutti gli oggetti possano ereditare tale pianificazione. O, se la pipeline consiste di una struttura di pianificazioni (nidificate all'interno della pianificazione principale), gli utenti possono creare un oggetto padre che dispone di un riferimento alla pianificazione. Per ulteriori informazioni sulle configurazioni di pianificazione opzionali di esempio, consulta https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html Oggetto di riferimento, ad esempio, «schedule»: {"ref»:» myScheduleId «}

Gruppo richiesto (uno dei seguenti è obbligatorio) Descrizione Tipo di slot
script Lo script Pig da eseguire Stringa
scriptUri La posizione dello script Pig da eseguire (ad esempio, s3:// scriptLocation). Stringa

Gruppo richiesto (uno dei seguenti è obbligatorio) Descrizione Tipo di slot
runsOn Cluster EMR su cui viene eseguito. PigActivity Oggetto di riferimento, ad esempio «runSon»: {"ref»:» myEmrCluster Id "}
workerGroup Il gruppo di lavoro. Utilizzato per le attività di routing. Se si fornisce un valore runsOn ed esiste workerGroup, workerGroup verrà ignorato. Stringa

Campi opzionali Descrizione Tipo di slot
attemptStatus Lo stato segnalato più di recente dall'attività remota. Stringa
attemptTimeout Il timeout per il completamento del lavoro in remoto. Se questo campo è impostato, un'attività remota che non viene completata entro il tempo impostato di avvio viene tentata di nuovo. Periodo
dependsOn Specifica la dipendenza su un altro oggetto eseguibile. Oggetto di riferimento, ad esempio «dependsOn»: {"ref»:» myActivityId «}
failureAndRerunModo Descrive il comportamento del nodo consumer quando le dipendenze presentano un errore o vengono di nuovo eseguite. Enumerazione
input Origine dati di input. Oggetto di riferimento, ad esempio, «input»: {"ref»:» myDataNode Id "}
lateAfterTimeout Il tempo trascorso dall'inizio della pipeline entro il quale l'oggetto deve essere completato. Viene attivato solo quando il tipo di pianificazione non è impostato su. ondemand Periodo
maxActiveInstances Il numero massimo di istanze attive simultanee di un componente. Le riesecuzioni non contano ai fini del numero di istanze attive. Numero intero
maximumRetries Numero massimo di tentativi in caso di errore. Numero intero
onFail Un'azione da eseguire quando l'oggetto corrente ha esito negativo. Oggetto di riferimento, ad esempio «onFail»: {"ref»:» myActionId «}
onLateAction Azioni che devono essere attivate se un oggetto non è stato ancora pianificato o non è ancora completo. Oggetto di riferimento, ad esempio "onLateAction«: {" ref»:» myActionId «}
onSuccess Un'operazione da eseguire quando l'oggetto corrente ha esito positivo. Oggetto di riferimento, ad esempio «onSuccess»: {"ref»:» myActionId «}
output Origine dati di output. Oggetto di riferimento, ad esempio «output»: {"ref»:» myDataNode Id "}
parent Padre dell'oggetto corrente da cui saranno ereditati gli slot. Oggetto di riferimento, ad esempio «parent»: {"ref»:» myBaseObject Id "}
pipelineLogUri L'URI di Amazon S3 (ad esempio 's3://BucketName/Key/ ') per caricare i log per la pipeline. Stringa
postActivityTaskConfig Lo script di configurazione post-attività da eseguire. È costituito da un URI dello script di shell in Amazon S33 e da un elenco di argomenti. Oggetto di riferimento, ad esempio "postActivityTaskConfig»: {"ref»:» myShellScript ConfigId «}
preActivityTaskConfig Lo script di configurazione pre-attività da eseguire. Questo è composto da un URI dello script della shell in Amazon S3 e da un elenco di argomenti. Oggetto di riferimento, ad esempio "preActivityTaskConfig»: {"ref»:» myShellScript ConfigId «}
precondizione Definisce eventualmente una precondizione. Un nodo dati non è contrassegnato come "READY" finché tutte le precondizioni non siano state soddisfatte. Oggetto di riferimento, ad esempio, «precondition»: {"ref»:» myPreconditionId «}
reportProgressTimeout Timeout per chiamate successive di attività in remoto a reportProgress. Se impostato, le attività in remoto che non presentano avanzamenti nel periodo specificato potrebbero essere considerate bloccate e sono quindi oggetto di un altro tentativo. Periodo
resizeClusterBeforeIn esecuzione Ridimensiona il cluster prima di eseguire questa attività per adattare i nodi di dati DynamoDB specificati come input o output.
Nota

Se la tua attività utilizza un DynamoDBDataNode come nodo di dati di input o output e lo imposti su, inizia resizeClusterBeforeRunning a TRUE utilizzare i tipi di istanza. AWS Data Pipeline m3.xlarge Questo sovrascrive le tue scelte in termini di tipi di istanze con m3.xlarge, con un possibile aumento dei costi.

Booleano
resizeClusterMaxIstanze Un limite per il numero massimo di istanze che possono essere richieste dall'algoritmo di ridimensionamento. Numero intero
retryDelay La durata del timeout tra due tentativi. Periodo
scheduleType Il tipo di pianificazione consente di specificare se gli oggetti nella definizione di pipeline devono essere programmati all'inizio o alla fine dell'intervallo. Time Series Style Scheduling significa che le istanze vengono programmate al termine di ogni intervallo e Cron Style Scheduling significa che le istanze vengono programmate all'inizio di ogni intervallo. Una pianificazione on demand consente di eseguire una pipeline una sola volta, per attivazione. Questo significa che non è necessario clonare o ricreare la pipeline per eseguirla di nuovo. Se utilizzi una pianificazione on demand, devi specificarlo nell'oggetto predefinito e deve essere l'unico scheduleType specificato per gli oggetti della pipeline. Per utilizzare le pipeline su richiesta, è sufficiente chiamare l' ActivatePipeline operazione per ogni esecuzione successiva. I valori sono: cron, ondemand e timeseries. Enumerazione
scriptVariable Gli argomenti da passare allo script Pig. È possibile utilizzare scriptVariable con lo script o scriptUri. Stringa
fase Stabilisce se è abilitata la gestione temporanea e consente allo script Pig di accedere alle tabelle dei dati gestiti temporaneamente, ad esempio ${INPUT1} e ${OUTPUT1}. Booleano

Campi Runtime Descrizione Tipo di slot
@activeInstances Elenco di oggetti di istanze attive attualmente programmate. Oggetto di riferimento, ad esempio «activeInstances»: {"ref»:» Id "} myRunnableObject
@actualEndTime L'ora in cui è terminata l'esecuzione di questo oggetto. DateTime
@actualStartTime L'ora in cui è stata avviata l'esecuzione di questo oggetto. DateTime
cancellationReason CancellationReason se questo oggetto è stato annullato. Stringa
@cascadeFailedOn Descrizione della catena di dipendenza che ha generato l'errore dell'oggetto. Oggetto di riferimento, ad esempio "cascadeFailedOn«: {" ref»:» myRunnableObject Id "}
emrStepLog I log dei passaggi di Amazon EMR sono disponibili solo nei tentativi di attività EMR. Stringa
errorId ErrorId se l'oggetto non è riuscito. Stringa
errorMessage ErrorMessage se l'oggetto non è riuscito. Stringa
errorStackTrace Traccia dello stack di errore se l'oggetto non è riuscito. Stringa
@finishedTime L'ora in cui è terminata l'esecuzione di questo oggetto. DateTime
hadoopJobLog Log delle attività Hadoop disponibili per le attività basate su EMR. Stringa
@healthStatus Lo stato di integrità dell'oggetto che riflette l'esito positivo o negativo dell'ultima istanza dell'oggetto che ha raggiunto lo stato di un'istanza terminata. Stringa
@healthStatusFromInstanceId Id dell'ultimo oggetto dell'istanza che ha raggiunto lo stato terminato. Stringa
@ Ora healthStatusUpdated L'ora in cui lo stato di integrità è stato aggiornato l'ultima volta. DateTime
hostname Il nome host del client che si è aggiudicato il tentativo dell'attività. Stringa
@lastDeactivatedTime L'ora in cui l'oggetto è stato disattivato. DateTime
@ latestCompletedRun Ora L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata completata. DateTime
@latestRunTime L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata pianificata. DateTime
@nextRunTime L'orario dell'esecuzione da programmare come successiva. DateTime
reportProgressTime Il periodo di tempo più recente in cui l'attività remota ha segnalato un progresso. DateTime
@scheduledEndTime L'orario di termine della pianificazione per l'oggetto. DateTime
@scheduledStartTime L'orario di inizio della pianificazione per l'oggetto. DateTime
@status Lo stato di questo oggetto. Stringa
@version Versione della pipeline con cui l'oggetto è stato creato. Stringa
@waitingOn Descrizione dell'elenco di dipendenze per cui questo oggetto è in attesa. Oggetto di riferimento, ad esempio «waitingOn»: {"ref»:» myRunnableObject Id "}

Campi di sistema Descrizione Tipo di slot
@error Errore che descrive il formato oggetto errato. Stringa
@pipelineId L'ID della pipeline a cui appartiene questo oggetto. Stringa
@sphere La sfera di un oggetto indica la propria posizione nel ciclo di vita: i Component Objects generano Instance Objects che eseguono Attempt Objects. Stringa

Vedi anche