AWS Data Pipeline non è più disponibile per i nuovi clienti. Clienti esistenti di AWS Data Pipeline possono continuare a utilizzare il servizio normalmente. Ulteriori informazioni
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
HadoopActivity
Esegue un MapReduce processo su un cluster. Il cluster può essere un EMR cluster gestito da AWS Data Pipeline o un'altra risorsa, se lo utilizzi TaskRunner. Da utilizzare HadoopActivity quando si desidera eseguire il lavoro in parallelo. Ciò consente di utilizzare le risorse di pianificazione del YARN framework o del negoziatore di MapReduce risorse in Hadoop 1. Se desideri eseguire il lavoro in sequenza utilizzando l'azione Amazon EMR Step, puoi comunque utilizzareEmrActivity.
Esempi
HadoopActivity utilizzando un EMR cluster gestito da AWS Data Pipeline
L' HadoopActivity oggetto seguente utilizza una EmrCluster risorsa per eseguire un programma:
{ "name": "MyHadoopActivity", "schedule": {"ref": "ResourcePeriod"}, "runsOn": {"ref": “MyEmrCluster”}, "type": "HadoopActivity", "preActivityTaskConfig":{"ref":"preTaskScriptConfig”}, "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "argument": [ "-files", “s3://elasticmapreduce/samples/wordcount/wordSplitter.py“, "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", “s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "maximumRetries": "0", "postActivityTaskConfig":{"ref":"postTaskScriptConfig”}, "hadoopQueue" : “high” }
Ecco il corrispondente MyEmrCluster
, che configura le code YARN per FairScheduler sistemi basati su Hadoop 2: AMIs
{ "id" : "MyEmrCluster", "type" : "EmrCluster", "hadoopSchedulerType" : "PARALLEL_FAIR_SCHEDULING", “amiVersion” : “3.7.0”, "bootstrapAction" : ["s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\,high\,default,-z,yarn.scheduler.capacity.root.high.capacity=50,-z,yarn.scheduler.capacity.root.low.capacity=10,-z,yarn.scheduler.capacity.root.default.capacity=30”] }
Questo è quello che si usa per configurare in EmrCluster Hadoop 1: FairScheduler
{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_FAIR_SCHEDULING", "amiVersion": "2.4.8", "bootstrapAction": "s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-m,mapred.queue.names=low\\\\,high\\\\,default,-m,mapred.fairscheduler.poolnameproperty=mapred.job.queue.name" }
Le seguenti EmrCluster configurazioni CapacityScheduler per sistemi basati su Hadoop 2: AMIs
{ "id": "MyEmrCluster", "type": "EmrCluster", "hadoopSchedulerType": "PARALLEL_CAPACITY_SCHEDULING", "amiVersion": "3.7.0", "bootstrapAction": "s3://
Region
.elasticmapreduce/bootstrap-actions/configure-hadoop,-z,yarn.scheduler.capacity.root.queues=low\\\\,high,-z,yarn.scheduler.capacity.root.high.capacity=40,-z,yarn.scheduler.capacity.root.low.capacity=60" }
HadoopActivity utilizzando un cluster esistente EMR
In questo esempio, si utilizzano workergroups e TaskRunner a per eseguire un programma su un cluster esistenteEMR. La seguente definizione di pipeline utilizza per: HadoopActivity
-
Esegue un MapReduce programma solo su
myWorkerGroup
risorse. Per ulteriori informazioni sui gruppi di lavoratori, consulta Esecuzione di lavori su risorse esistenti utilizzando Task Runner. -
Esegui un preActivityTask Config e Config postActivityTask
{ "objects": [ { "argument": [ "-files", "s3://elasticmapreduce/samples/wordcount/wordSplitter.py", "-mapper", "wordSplitter.py", "-reducer", "aggregate", "-input", "s3://elasticmapreduce/samples/wordcount/input/", "-output", "s3://test-bucket/MyHadoopActivity/#{@pipelineId}/#{format(@scheduledStartTime,'YYYY-MM-dd')}" ], "id": "MyHadoopActivity", "jarUri": "/home/hadoop/contrib/streaming/hadoop-streaming.jar", "name": "MyHadoopActivity", "type": "HadoopActivity" }, { "id": "SchedulePeriod", "startDateTime": "start_datetime", "name": "SchedulePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "end_datetime" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/preTaskScript.sh", "name": "preTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "ShellScriptConfig", "scriptUri": "s3://test-bucket/scripts/postTaskScript.sh", "name": "postTaskScriptConfig", "scriptArgument": [ "test", "argument" ], "type": "ShellScriptConfig" }, { "id": "Default", "scheduleType": "cron", "schedule": { "ref": "SchedulePeriod" }, "name": "Default", "pipelineLogUri": "s3://test-bucket/logs/2015-05-22T18:02:00.343Z642f3fe415", "maximumRetries": "0", "workerGroup": "myWorkerGroup", "preActivityTaskConfig": { "ref": "preTaskScriptConfig" }, "postActivityTaskConfig": { "ref": "postTaskScriptConfig" } } ] }
Sintassi
Campi obbligatori | Descrizione | Tipo di slot |
---|---|---|
jarUri | Posizione di un file system JAR in Amazon S3 o del file system locale del cluster con cui eseguire. HadoopActivity | Stringa |
Campi Object Invocation | Descrizione | Tipo di slot |
---|---|---|
schedule | Questo oggetto viene richiamato entro l'esecuzione di un intervallo di pianificazione. Gli utenti devono specificare un riferimento alla pianificazione di un altro oggetto per impostare l'ordine di esecuzione delle dipendenze per questo oggetto. Gli utenti possono soddisfare questo requisito impostando esplicitamente una pianificazione sull'oggetto, ad esempio specificando «schedule»: {"ref»: "DefaultSchedule«}. Nella maggior parte dei casi, è meglio inserire il riferimento alla pianificazione nell'oggetto pipeline di default, in modo che tutti gli oggetti possano ereditare tale pianificazione. O, se la pipeline consiste di una struttura di pianificazioni (nidificate all'interno della pianificazione principale), gli utenti possono creare un oggetto padre che dispone di un riferimento alla pianificazione. Per ulteriori informazioni sulle configurazioni di pianificazione opzionali di esempio, consulta https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html | Oggetto di riferimento, ad esempio «schedule»: {"ref»:» «myScheduleId} |
Gruppo richiesto (uno dei seguenti è obbligatorio) | Descrizione | Tipo di slot |
---|---|---|
runsOn | EMRCluster su cui verrà eseguito questo processo. | Oggetto di riferimento, ad esempio "runsOn«: {" ref»:» myEmrCluster Id "} |
workerGroup | Il gruppo di lavoro. Utilizzato per le attività di routing. Se si fornisce un runsOn valore ed workerGroup esiste, workerGroup viene ignorato. | Stringa |
Campi opzionali | Descrizione | Tipo di slot |
---|---|---|
argument | Argomenti da passare a. JAR | Stringa |
attemptStatus | Lo stato segnalato più di recente dall'attività remota. | Stringa |
attemptTimeout | Timeout per il completamento del lavoro in remoto. Se questo campo è impostato, un'attività remota che non viene completata entro il tempo impostato di avvio viene tentata di nuovo. | Periodo |
dependsOn | Specifica una dipendenza su un altro oggetto eseguibile. | Oggetto di riferimento, ad esempio "dependsOn«: {" ref»:» myActivityId «} |
failureAndRerunModalità | Descrive il comportamento del nodo consumer quando le dipendenze presentano un errore o vengono di nuovo eseguite | Enumerazione |
hadoopQueue | Il nome della coda del pianificatore Hadoop a cui verrà inviata l'attività. | Stringa |
input | Posizione dei dati di input. | Oggetto di riferimento, ad esempio «input»: {"ref»:» myDataNode Id "} |
lateAfterTimeout | Il tempo trascorso dall'inizio della pipeline entro il quale l'oggetto deve essere completato. Viene attivato solo quando il tipo di pianificazione non è impostato su. ondemand |
Periodo |
mainClass | La classe principale con HadoopActivity cui JAR stai eseguendo. | Stringa |
maxActiveInstances | Il numero massimo di istanze attive simultanee di un componente. Le riesecuzioni non contano ai fini del numero di istanze attive. | Numero intero |
maximumRetries | Numero massimo di tentativi in caso di errore | Numero intero |
onFail | Un'azione da eseguire quando l'oggetto corrente ha esito negativo. | Oggetto di riferimento, ad esempio "onFail«: {" ref»:» myActionId «} |
onLateAction | Azioni che devono essere attivate se un oggetto non è stato ancora pianificato o non è ancora completo. | Oggetto di riferimento, ad esempio "onLateAction«: {" ref»:» myActionId «} |
onSuccess | Un'operazione da eseguire quando l'oggetto corrente ha esito positivo. | Oggetto di riferimento, ad esempio "onSuccess«: {" ref»:» myActionId «} |
output | Posizione dei dati di output. | Oggetto di riferimento, ad esempio «output»: {"ref»:» myDataNode Id "} |
parent | Padre dell'oggetto corrente da cui saranno ereditati gli slot. | Oggetto di riferimento, ad esempio «parent»: {"ref»:» myBaseObject Id "} |
pipelineLogUri | L'S3 URI (come 's3://BucketName/Key/ ') per caricare i log per la pipeline. | Stringa |
postActivityTaskConfig | Lo script di configurazione post-attività da eseguire. Consiste in uno script URI di shell in Amazon S3 e in un elenco di argomenti. | Oggetto di riferimento, ad esempio "postActivityTaskConfig»: {"ref»:» myShellScript ConfigId «} |
preActivityTaskConfig | Lo script di configurazione pre-attività da eseguire. Consiste in uno script URI di shell in Amazon S3 e in un elenco di argomenti. | Oggetto di riferimento, ad esempio "preActivityTaskConfig»: {"ref»:» myShellScript ConfigId «} |
precondizione | Definisce eventualmente una precondizione. Un nodo di dati non è contrassegnato "READY" finché non sono state soddisfatte tutte le condizioni preliminari. | Oggetto di riferimento, ad esempio «precondition»: {"ref»:» «myPreconditionId} |
reportProgressTimeout | Timeout per il lavoro remoto: chiamate successive a. reportProgress Se impostato, le attività in remoto che non presentano avanzamenti nel periodo specificato potrebbero essere considerate bloccate e sono quindi oggetto di un altro tentativo. | Periodo |
retryDelay | La durata del timeout tra due tentativi. | Periodo |
scheduleType | Il tipo di pianificazione consente di specificare se gli oggetti nella definizione di pipeline devono essere programmati all'inizio o alla fine dell'intervallo. Time Series Style Scheduling significa che le istanze vengono programmate al termine di ogni intervallo e Cron Style Scheduling significa che le istanze vengono programmate all'inizio di ogni intervallo. Una pianificazione on demand consente di eseguire una pipeline una sola volta, per attivazione. Questo significa che non è necessario clonare o ricreare la pipeline per eseguirla di nuovo. Se si utilizza una pianificazione su richiesta, questa deve essere specificata nell'oggetto predefinito e deve essere l'unica scheduleType specificata per gli oggetti nella pipeline. Per utilizzare le pipeline su richiesta, è sufficiente chiamare l' ActivatePipeline operazione per ogni esecuzione successiva. I valori sono: cron, ondemand e timeseries. | Enumerazione |
Campi Runtime | Descrizione | Tipo di slot |
---|---|---|
@activeInstances | Elenco di oggetti di istanze attive attualmente programmate. | Oggetto di riferimento, ad esempio "activeInstances«: {" ref»:» myRunnableObject Id "} |
@actualEndTime | L'ora in cui è terminata l'esecuzione di questo oggetto. | DateTime |
@actualStartTime | L'ora in cui è stata avviata l'esecuzione di questo oggetto. | DateTime |
cancellationReason | Il cancellationReason se questo oggetto è stato annullato. | Stringa |
@cascadeFailedOn | Descrizione della catena di dipendenza che ha generato l'errore dell'oggetto. | Oggetto di riferimento, ad esempio "cascadeFailedOn«: {" ref»:» myRunnableObject Id "} |
emrStepLog | EMRi registri dei passaggi sono disponibili solo in caso di tentativi di EMR attività | Stringa |
errorId | Il errorId se questo oggetto ha fallito. | Stringa |
errorMessage | Il errorMessage se questo oggetto ha avuto esito negativo. | Stringa |
errorStackTrace | Traccia dello stack di errore se l'oggetto non è riuscito. | Stringa |
@finishedTime | L'ora in cui è terminata l'esecuzione di questo oggetto. | DateTime |
hadoopJobLog | I log dei job Hadoop sono disponibili per EMR i tentativi di attività basate su di esse. | Stringa |
@healthStatus | Lo stato di integrità dell'oggetto che riflette l'esito positivo o negativo dell'ultima istanza dell'oggetto che ha raggiunto lo stato di un'istanza terminata. | Stringa |
@healthStatusFromInstanceId | Id dell'ultimo oggetto dell'istanza che ha raggiunto lo stato terminato. | Stringa |
@ Ora healthStatusUpdated | L'ora in cui lo stato di integrità è stato aggiornato l'ultima volta. | DateTime |
hostname | Il nome host del client che si è aggiudicato il tentativo dell'attività. | Stringa |
@lastDeactivatedTime | L'ora in cui l'oggetto è stato disattivato. | DateTime |
@ latestCompletedRun Ora | L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata completata. | DateTime |
@latestRunTime | L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata pianificata. | DateTime |
@nextRunTime | L'orario dell'esecuzione da programmare come successiva. | DateTime |
reportProgressTime | Il periodo di tempo più recente in cui l'attività remota ha segnalato un progresso. | DateTime |
@scheduledEndTime | L'orario di termine della pianificazione per un oggetto | DateTime |
@scheduledStartTime | L'orario di inizio della pianificazione per l'oggetto | DateTime |
@status | Lo stato di questo oggetto. | Stringa |
@version | Versione della pipeline con cui l'oggetto è stato creato. | Stringa |
@waitingOn | Descrizione dell'elenco di dipendenze per cui questo oggetto è in attesa. | Oggetto di riferimento, ad esempio "waitingOn«: {" ref»:» myRunnableObject Id "} |
Campi di sistema | Descrizione | Tipo di slot |
---|---|---|
@error | Errore che descrive il formato oggetto errato. | Stringa |
@pipelineId | L'id della pipeline a cui appartiene questo oggetto. | Stringa |
@sphere | La sfera di un oggetto indica la propria posizione nel ciclo di vita: i Component Objects generano Instance Objects che eseguono Attempt Objects. | Stringa |