AWS Data Pipeline ist für Neukunden nicht mehr verfügbar. Bestehende Kunden von AWS Data Pipeline können den Service weiterhin wie gewohnt nutzen. Weitere Informationen
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
PigActivity
PigActivity bietet native Unterstützung für Pig-Skripte, AWS Data Pipeline ohne dass die Verwendung von ShellCommandActivity
oder erforderlich istEmrActivity
. PigActivity Unterstützt außerdem Daten-Staging. Wenn das Stage-Feld auf „true“ festgelegt wurde, arrangiert AWS Data Pipeline die Eingabedaten ohne zusätzlichen Code des Benutzers als Schema in Pig.
Beispiel
Im folgenden Pipeline-Beispiel wird gezeigt, wie PigActivity
verwendet wird. Die Beispiel-Pipeline führt die folgenden Schritte aus:
-
MyPigActivity1 lädt Daten aus Amazon S3 und führt ein Pig-Skript aus, das einige Datenspalten auswählt und sie auf Amazon S3 hochlädt.
-
MyPigActivity2 lädt die erste Ausgabe, wählt einige Spalten und drei Datenzeilen aus und lädt sie als zweite Ausgabe auf Amazon S3 hoch.
-
MyPigActivity3 lädt die zweiten Ausgabedaten, fügt zwei Datenzeilen und nur die Spalte mit dem Namen „Fifth“ nach Amazon einRDS.
-
MyPigActivity4 lädt RDS Amazon-Daten, wählt die erste Datenzeile aus und lädt sie auf Amazon S3 hoch.
{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://
example-bucket
/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path
/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket
/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket
/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair
", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket
/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance
:3306/example-database
", "selectQuery": "select * from #{table}", "table": "example-table-name
", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path
", "scriptUri": "s3://example-bucket
/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }
Der Inhalt von pigTestScript.q
ist wie folgt:
B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;
Syntax
Objektaufruf-Felder | Beschreibung | Slot-Typ |
---|---|---|
schedule | Dieses Objekt wird innerhalb der Ausführung eines Zeitplanintervalls aufgerufen. Benutzer müssen einen Zeitplanverweis auf ein anderes Objekt angeben, um die Abhängigkeitsausführungsreihenfolge für dieses Objekt festzulegen. Benutzer können diese Anforderung erfüllen, indem sie explizit einen Zeitplan für das Objekt festlegen, z. B. indem sie „schedule“: {"ref“: "DefaultSchedule„} angeben. In den meisten Fällen ist es besser, den Zeitplanverweis auf das Standard-Pipeline-Objekt zu setzen, damit alle Objekte diesen Zeitplan erben. Wenn die Pipeline über einen Baum mit Zeitplänen verfügt (Zeitpläne innerhalb des Hauptplans), können Benutzer ein übergeordnetes Objekt mit Zeitplänenreferenz erstellen. Weitere Informationen zu optionalen Beispiel-Zeitplankonfigurationen finden Sie unter https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html. | Referenzobjekt, zum Beispiel „schedule“: {"ref“:“ myScheduleId „} |
Erforderliche Gruppe (mindestens eine der folgenden ist erforderlich) | Beschreibung | Slot-Typ |
---|---|---|
script | Das auszuführende Pig-Skript. | String |
scriptUri | Der Speicherort des Pig-Skripts, das ausgeführt werden soll (z. B. s3://scriptLocation). | String |
Erforderliche Gruppe (mindestens eine der folgenden ist erforderlich) | Beschreibung | Slot-Typ |
---|---|---|
runsOn | EMRCluster, auf dem das PigActivity ausgeführt wird. | Referenzobjekt, zum Beispiel "runsOn„: {" ref“:“ myEmrCluster Id "} |
workerGroup | Die Auftragnehmergruppe. Dies wird für Routing-Aufgaben verwendet. Wenn Sie einen runsOn -Wert angeben und workerGroup vorhanden ist, wird ignoriert.workerGroup |
String |
Optionale Felder | Beschreibung | Slot-Typ |
---|---|---|
attemptStatus | Der zuletzt gemeldete Status von der Remote-Aktivität. | String |
attemptTimeout | Das Timeout für die Fertigstellung der Remote-Arbeit. Wenn diese Option aktiviert ist, kann eine Remote-Aktivität, die nicht innerhalb der festgelegten Startzeit abgeschlossen wird, wiederholt werden. | Intervall |
dependsOn | Gibt die Abhängigkeit von einem anderen ausführbaren Objekt an. | Referenzobjekt, zum Beispiel "dependsOn„: {" ref“:“ myActivityId „} |
failureAndRerunModus | Beschreibt das Verhalten des Konsumentenknotens, wenn Abhängigkeiten fehlschlagen oder erneut ausgeführt werden. | Aufzählung |
input | Die Eingangsdatenquelle. | Referenzobjekt, zum Beispiel „input“: {"ref“:“ myDataNode Id "} |
lateAfterTimeout | Die verstrichene Zeit nach dem Start der Pipeline, innerhalb derer das Objekt abgeschlossen werden muss. Sie wird nur ausgelöst, wenn der Zeitplantyp nicht auf eingestellt ist. ondemand |
Intervall |
maxActiveInstances | Die maximale Anzahl gleichzeitiger aktiver Instances einer Komponente. Wiederholungen zählen nicht zur Anzahl der aktiven Instances. | Ganzzahl |
maximumRetries | Die maximale Anzahl von Versuchen bei Ausfällen. | Ganzzahl |
onFail | Eine Aktion, die ausgeführt werden soll, wenn das aktuelle Objekt fehlschlägt. | Referenzobjekt, zum Beispiel "onFail„: {" ref“:“ myActionId „} |
onLateAction | Aktionen, die ausgelöst werden sollen, wenn ein Objekt noch nicht geplant oder noch nicht abgeschlossen wurde. | Referenzobjekt, zum Beispiel "onLateAction„: {" ref“:“ myActionId „} |
onSuccess | Eine Aktion, die ausgeführt wird, wenn das aktuelle Objekt erfolgreich ist. | Referenzobjekt, zum Beispiel "onSuccess„: {" ref“:“ myActionId „} |
output | Die Eingangsdatenquelle. | Referenzobjekt, zum Beispiel „output“: {"ref“:“ myDataNode Id "} |
übergeordneter | Übergeordnetes Objekt des aktuellen Objekts, aus dem Slots übernommen werden. | Referenzobjekt, zum Beispiel „parent“: {"ref“:“ myBaseObject Id "} |
pipelineLogUri | Amazon S3 URI (z. B. 's3://BucketName/Key/ ') zum Hochladen von Protokollen für die Pipeline. | String |
postActivityTaskConfig | Post-Activity-Konfigurationsskript, das ausgeführt werden soll. Dies besteht URI aus einem Shell-Skript in Amazon S33 und einer Liste von Argumenten. | Referenzobjekt, zum Beispiel "postActivityTaskConfig“: {"ref“:“ myShellScript ConfigId „} |
preActivityTaskConfig | Pre-Activity-Konfigurationsskript, das ausgeführt werden soll. Dies besteht URI aus einem Shell-Skript in Amazon S3 und einer Liste von Argumenten. | Referenzobjekt, zum Beispiel "preActivityTaskConfig“: {"ref“:“ myShellScript ConfigId „} |
precondition | Legen Sie optional eine Vorbedingung fest. Ein Datenknoten wird erst mit "READY" markiert, wenn alle Voraussetzungen erfüllt sind. | Referenzobjekt, zum Beispiel „precondition“: {"ref“:“ myPreconditionId „} |
reportProgressTimeout | Das Timeout für aufeinanderfolgende Aufrufe von reportProgress durch Remote-Arbeit. Wenn diese Option aktiviert ist, werden Remote-Aktivitäten, die den Fortschritt für den angegebenen Zeitraum nicht melden, als fehlgeschlagen angesehen und es wird erneut versucht. |
Intervall |
resizeClusterBeforeWird ausgeführt | Ändern Sie die Größe des Clusters, bevor Sie diese Aktivität ausführen, um DynamoDB-Datenknoten aufzunehmen, die als Eingaben oder Ausgaben angegeben sind.AnmerkungWenn Ihre Aktivität a entweder |
Boolesch |
resizeClusterMaxInstanzen | Ein Limit für die maximale Anzahl von Instances, die vom Resize-Algorithmus angefordert werden können. | Ganzzahl |
retryDelay | Die Zeitüberschreitungsdauer zwischen zwei Wiederholungsversuchen. | Intervall |
scheduleType | Mit dem Zeitplantyp können Sie angeben, ob die Objekte in Ihrer Pipeline-Definition am Anfang des Intervalls oder am Ende des Intervalls geplant werden sollen. Zeitreihenstilplanung bedeutet, dass Instances am Ende jedes Intervalls geplant werden und Cron-Stil-Planung bedeutet, dass Instances zu Beginn jedes Intervalls geplant werden. Ein On-Demand-Zeitplan ermöglicht es Ihnen, eine Pipeline einmal pro Aktivierung auszuführen. Dies bedeutet, dass Sie die Pipeline nicht klonen oder neu erstellen müssen, um sie erneut auszuführen. Wenn Sie einen On-Demand-Zeitplan verwenden, muss dieser im Standardobjekt angegeben werden und darf das einzige Objekt sein, das für Objekte in der Pipeline scheduleType angegeben wird. Um On-Demand-Pipelines zu verwenden, rufen Sie den ActivatePipeline Vorgang einfach für jeden nachfolgenden Lauf auf. Die Werte sind: cron, ondemand und timeseries. | Aufzählung |
scriptVariable | Die Argumente, die an das Pig-Skript übergeben werden sollen. Sie können scriptVariable mit Skript oder scriptUri verwenden. | String |
stage | Bestimmt, ob das Staging aktiviert ist, und ermöglicht Ihrem Pig-Skript den Zugriff auf die Staging-Datentabellen wie $ {INPUT1} und $ {}. OUTPUT1 | Boolesch |
Laufzeitfelder | Beschreibung | Slot-Typ |
---|---|---|
@activeInstances | Liste der aktuell geplanten aktiven Instance-Objekte. | Referenzobjekt, zum Beispiel "activeInstances„: {" ref“:“ Id "} myRunnableObject |
@actualEndTime | Zeitpunkt, zu dem die Ausführung dieses Objekts abgeschlossen wurde. | DateTime |
@actualStartTime | Zeitpunkt, zu dem die Ausführung dieses Objekts gestartet wurde. | DateTime |
cancellationReason | cancellationReason Wenn dieses Objekt storniert wurde. | String |
@cascadeFailedOn | Beschreibung der Abhängigkeitskette, bei der das Objekt fehlgeschlagen ist. | Referenzobjekt, zum Beispiel "cascadeFailedOn„: {" ref“:“ myRunnableObject Id "} |
emrStepLog | EMRAmazon-Schrittprotokolle sind nur bei EMR Aktivitätsversuchen verfügbar. | String |
errorId | errorId Wenn dieses Objekt fehlgeschlagen ist. | String |
errorMessage | Das ist errorMessage , wenn dieses Objekt fehlgeschlagen ist. | String |
errorStackTrace | Die Fehler-Stack-Ablaufverfolgung., wenn dieses Objekt fehlgeschlagen ist. | String |
@finishedTime | Der Zeitpunkt, zu der dieses Objekt seine Ausführung beendet hat. | DateTime |
hadoopJobLog | Hadoop-Jobprotokolle sind bei Versuchen für EMR basierte Aktivitäten verfügbar. | String |
@healthStatus | Der Integritätsstatus des Objekts, der Erfolg oder Misserfolg der letzten Objekt-Instance widerspiegelt, die einen beendeten Zustand erreicht hat. | String |
@healthStatusFromInstanceId | Id des Objekts der letzten Instance, das einen beendeten Zustand erreicht hat. | String |
@ Zeit healthStatusUpdated | Zeitpunkt, zu dem der Servicestatus beim letzten Mal aktualisiert wurde. | DateTime |
hostname | Der Hostname des Clients, der den Aufgabenversuch aufnimmt. | String |
@lastDeactivatedTime | Zeitpunkt, zu dem dieses Objekt zuletzt deaktiviert wurde. | DateTime |
@ latestCompletedRun Zeit | Zeitpunkt des letzten Laufs, für den die Ausführung abgeschlossen wurde. | DateTime |
@latestRunTime | Zeitpunkt des letzten Laufs, für den die Ausführung geplant war. | DateTime |
@nextRunTime | Zeitpunkt des Laufs, der als nächstes geplant werden soll | DateTime |
reportProgressTime | Der letzte Zeitpunkt, an dem die Remote-Aktivität einen Fortschritt gemeldet hat. | DateTime |
@scheduledEndTime | Endzeit für das Objekt einplanen. | DateTime |
@scheduledStartTime | Startzeit für das Objekt einplanen. | DateTime |
@Status | Der Status des Objekts. | String |
@Version | Pipeline-Version, mit der das Objekt erstellt wurde | String |
@waitingOn | Beschreibung der Liste der Abhängigkeiten, auf die dieses Objekt wartet. | Referenzobjekt, zum Beispiel "waitingOn„: {" ref“:“ myRunnableObject Id "} |
Systemfelder | Beschreibung | Slot-Typ |
---|---|---|
@error | Fehler mit einer Beschreibung des falsch formatierten Objekts. | String |
@pipelineId | ID der Pipeline, zu der dieses Objekt gehört. | String |
@sphere | Die Kugel eines Objekts bezeichnet seinen Platz im Lebenszyklus: Komponentenobjekte ergeben Instance-Objekte, die Versuchsobjekte ausführen. | String |