PigActivity - AWS Data Pipeline

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

PigActivity

PigActivity fournit un support natif pour les scripts Pig AWS Data Pipeline sans qu'il soit nécessaire d'utiliser ShellCommandActivity ouEmrActivity. En outre, PigActivity prend en charge le transfert des données. Lorsque le champ « stage » est défini sur true, AWS Data Pipeline prépare les données d'entrée en tant que schéma dans Pig, sans code supplémentaire de l'utilisateur.

Exemple

L'exemple de pipeline suivant montre comment utiliser PigActivity. L'exemple de pipeline effectue les étapes suivantes :

  • MyPigActivity1 charge des données depuis Amazon S3 et exécute un script Pig qui sélectionne quelques colonnes de données et les télécharge sur Amazon S3.

  • MyPigActivity2 charge la première sortie, sélectionne quelques colonnes et trois lignes de données, puis la télécharge sur Amazon S3 en tant que deuxième sortie.

  • MyPigActivity3 charge les deuxièmes données de sortie, insère deux lignes de données et uniquement la colonne nommée « cinquième » sur Amazon RDS.

  • MyPigActivity4 charge les données Amazon RDS, sélectionne la première ligne de données et les télécharge sur Amazon S3.

{ "objects": [ { "id": "MyInputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/pigTestInput", "name": "MyInputData1", "dataFormat": { "ref": "MyInputDataType1" }, "type": "S3DataNode" }, { "id": "MyPigActivity4", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData3" }, "pipelineLogUri": "s3://example-bucket/path/", "name": "MyPigActivity4", "runsOn": { "ref": "MyEmrResource" }, "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity3" }, "output": { "ref": "MyOutputData4" }, "script": "B = LIMIT ${input1} 1; ${output1} = FOREACH B GENERATE one;", "stage": "true" }, { "id": "MyPigActivity3", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData2" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity3", "runsOn": { "ref": "MyEmrResource" }, "script": "B = LIMIT ${input1} 2; ${output1} = FOREACH B GENERATE Fifth;", "type": "PigActivity", "dependsOn": { "ref": "MyPigActivity2" }, "output": { "ref": "MyOutputData3" }, "stage": "true" }, { "id": "MyOutputData2", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData2", "directoryPath": "s3://example-bucket/PigActivityOutput2", "dataFormat": { "ref": "MyOutputDataType2" }, "type": "S3DataNode" }, { "id": "MyOutputData1", "schedule": { "ref": "MyEmrResourcePeriod" }, "name": "MyOutputData1", "directoryPath": "s3://example-bucket/PigActivityOutput1", "dataFormat": { "ref": "MyOutputDataType1" }, "type": "S3DataNode" }, { "id": "MyInputDataType1", "name": "MyInputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING", "Ninth STRING", "Tenth STRING" ], "inputRegEx": "^(\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+) (\\\\S+)", "type": "RegEx" }, { "id": "MyEmrResource", "region": "us-east-1", "schedule": { "ref": "MyEmrResourcePeriod" }, "keyPair": "example-keypair", "masterInstanceType": "m1.small", "enableDebugging": "true", "name": "MyEmrResource", "actionOnTaskFailure": "continue", "type": "EmrCluster" }, { "id": "MyOutputDataType4", "name": "MyOutputDataType4", "column": "one STRING", "type": "CSV" }, { "id": "MyOutputData4", "schedule": { "ref": "MyEmrResourcePeriod" }, "directoryPath": "s3://example-bucket/PigActivityOutput3", "name": "MyOutputData4", "dataFormat": { "ref": "MyOutputDataType4" }, "type": "S3DataNode" }, { "id": "MyOutputDataType1", "name": "MyOutputDataType1", "column": [ "First STRING", "Second STRING", "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "columnSeparator": "*", "type": "Custom" }, { "id": "MyOutputData3", "username": "___", "schedule": { "ref": "MyEmrResourcePeriod" }, "insertQuery": "insert into #{table} (one) values (?)", "name": "MyOutputData3", "*password": "___", "runsOn": { "ref": "MyEmrResource" }, "connectionString": "jdbc:mysql://example-database-instance:3306/example-database", "selectQuery": "select * from #{table}", "table": "example-table-name", "type": "MySqlDataNode" }, { "id": "MyOutputDataType2", "name": "MyOutputDataType2", "column": [ "Third STRING", "Fourth STRING", "Fifth STRING", "Sixth STRING", "Seventh STRING", "Eighth STRING" ], "type": "TSV" }, { "id": "MyPigActivity2", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyOutputData1" }, "pipelineLogUri": "s3://example-bucket/path", "name": "MyPigActivity2", "runsOn": { "ref": "MyEmrResource" }, "dependsOn": { "ref": "MyPigActivity1" }, "type": "PigActivity", "script": "B = LIMIT ${input1} 3; ${output1} = FOREACH B GENERATE Third, Fourth, Fifth, Sixth, Seventh, Eighth;", "output": { "ref": "MyOutputData2" }, "stage": "true" }, { "id": "MyEmrResourcePeriod", "startDateTime": "2013-05-20T00:00:00", "name": "MyEmrResourcePeriod", "period": "1 day", "type": "Schedule", "endDateTime": "2013-05-21T00:00:00" }, { "id": "MyPigActivity1", "scheduleType": "CRON", "schedule": { "ref": "MyEmrResourcePeriod" }, "input": { "ref": "MyInputData1" }, "pipelineLogUri": "s3://example-bucket/path", "scriptUri": "s3://example-bucket/script/pigTestScipt.q", "name": "MyPigActivity1", "runsOn": { "ref": "MyEmrResource" }, "scriptVariable": [ "column1=First", "column2=Second", "three=3" ], "type": "PigActivity", "output": { "ref": "MyOutputData1" }, "stage": "true" } ] }

Le contenu de pigTestScript.q est le suivant.

B = LIMIT ${input1} $three; ${output1} = FOREACH B GENERATE $column1, $column2, Third, Fourth, Fifth, Sixth, Seventh, Eighth;

Syntaxe

Champs d'invocation de l'objet Description Type d'option
schedule Cet objet est appelé dans le cadre de l'exécution d'un intervalle de planification. Les utilisateurs doivent spécifier une référence de planification à un autre objet pour définir l'ordre d'exécution des dépendances de l'objet. Les utilisateurs peuvent satisfaire à cette exigence en définissant explicitement un calendrier sur l'objet, par exemple en spécifiant « schedule » : {"ref » : "DefaultSchedule«}. Dans la plupart des cas, il est préférable de placer la planification de référence sur l'objet de pipeline par défaut de manière à ce que tous les objets héritent cette planification. Ou, si le pipeline dispose d'une arborescence de planifications (planifications au sein de la planification maître), les utilisateurs peuvent créer un objet parent ayant une référence de planification. Pour plus d'informations sur les exemples de configurations de planification facultatives, consultez https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html. Objet de référence, par exemple, « schedule » : {"ref » : » myScheduleId «}

Groupe obligatoire (l'un des groupes suivants est obligatoire) Description Type d'option
script Script Pig à exécuter. Chaîne
scriptUri Emplacement du script Pig à exécuter (par exemple, s3://scriptLocation). Chaîne

Groupe obligatoire (l'un des groupes suivants est obligatoire) Description Type d'option
runsOn Cluster EMR sur lequel cela s' PigActivity exécute. Objet de référence, par exemple, « RunSon » : {"ref » : » myEmrCluster Id "}
workerGroup Groupe de travail. Utilisé pour les tâches d'acheminement. Si vous fournissez une valeur runsOn et que workerGroup existe, workerGroup est ignoré. Chaîne

Champs facultatifs Description Type d'option
attemptStatus État de l'activité à distance le plus récemment rapporté. Chaîne
attemptTimeout Délai d'achèvement de la tâche à distance. Si une valeur est définie, une activité à distance qui n'est pas exécutée dans le cadre de la période de départ définie peut être retentée. Période
dependsOn Spécifie la dépendance sur un autre objet exécutable. Objet de référence, par exemple, « DependsOn » : {"ref » : » myActivityId «}
failureAndRerunMode Décrit le comportement du nœud de consommateurs lorsque les dépendances échouent ou sont à nouveau exécutées. Énumération
input Source de données d'entrée. Objet de référence, par exemple, « input » : {"ref » : » myDataNode Id "}
lateAfterTimeout Temps écoulé après le début du pipeline pendant lequel l'objet doit être terminé. Il est déclenché uniquement lorsque le type de planification n'est pas défini surondemand. Période
maxActiveInstances Nombre maximal d'instances actives simultanées d'un composant. Les réexécutions ne sont pas comptabilisées dans le nombre d'instances actives. Entier
maximumRetries Nombre maximal de nouvelles tentatives en cas d'échec. Entier
onFail Action à exécuter en cas d'échec de l'objet actuel. Objet de référence, par exemple, « onFail » : {"ref » : » myActionId «}
onLateAction Actions à déclencher si un objet n'a pas encore été planifié ou n'est toujours pas terminé. Objet de référence, par exemple, "onLateAction« : {" ref » : » myActionId «}
onSuccess Action à exécuter en cas de réussite de l'objet actuel. Objet de référence, par exemple, « onSuccess » : {"ref » : » myActionId «}
output Source de données de sortie. Objet de référence, par exemple, « output » : {"ref » : » myDataNode Id "}
parent Parent de l'objet actuel à partir duquel les emplacements sont hérités. Objet de référence, par exemple, « parent » : {"ref » : » myBaseObject Id "}
pipelineLogUri L'URI Amazon S3 (tel que 's3 ://BucketName/Key/ ') pour le téléchargement des journaux pour le pipeline. Chaîne
postActivityTaskConfig Script de configuration de post-activité à exécuter. Il s'agit d'un URI du script shell dans Amazon S33 et d'une liste d'arguments. Objet de référence, par exemple, "postActivityTaskConfig » : {"ref » : » myShellScript ConfigId «}
preActivityTaskConfig Script de configuration de pré-activité à exécuter. Se compose d'un URI du script shell dans Amazon S3 et d'une liste d'arguments. Objet de référence, par exemple, "preActivityTaskConfig » : {"ref » : » myShellScript ConfigId «}
precondition Définit une condition préalable facultative. Un nœud de données n'est pas marqué « READY » tant que toutes les conditions préalables ne sont pas remplies. Objet de référence, par exemple, « précondition » : {"ref » : » myPreconditionId «}
reportProgressTimeout Délai pour les appels successifs de travail à distance adressés à reportProgress. Si une valeur est définie, les activités à distance qui ne font pas état d'avancement pour la période spécifiée doivent être considérées comme bloquées et, par conséquent, retentées. Période
resizeClusterBeforeCourir Redimensionnez le cluster avant d'effectuer cette activité pour prendre en charge les nœuds de données DynamoDB spécifiés en entrée ou en sortie.
Note

Si votre activité utilise un DynamoDBDataNode comme nœud de données d'entrée ou de sortie, et si vous définissez le resizeClusterBeforeRunning toTRUE, AWS Data Pipeline commence à utiliser des types d'm3.xlargeinstance. Vos choix de type d'instance sont alors remplacés par m3.xlarge, ce qui peut accroître vos coûts mensuels.

Booléen
resizeClusterMaxInstances Limite du nombre maximal d'instances qui peuvent être demandées par l'algorithme de redimensionnement. Entier
retryDelay Délai entre deux nouvelles tentatives. Période
scheduleType Le type de planification vous permet de spécifier si les objets de votre définition de pipeline doivent être planifiés au début ou à la fin de l'intervalle. Dans la planification de type séries chronologiques, les instances sont planifiées à la fin de chaque intervalle et dans la planification de type cron, les instances sont planifiées au début de chaque intervalle. Une planification à la demande vous permet d'exécuter un pipeline une fois par activation. Cela signifie que vous n'avez pas à cloner ou à recréer le pipeline pour l'exécuter à nouveau. Si vous utilisez une planification à la demande, elle doit être spécifiée dans l'objet par défaut et être le seul scheduleType pour les objets du pipeline. Pour utiliser des pipelines à la demande, il suffit d'appeler l' ActivatePipeline opération pour chaque exécution suivante. Les valeurs sont : cron, ondemand et timeseries (cron, à la demande et séries chronologiques). Énumération
scriptVariable Arguments à transmettre au script Pig. Vous pouvez utiliser scriptVariable avec script ou scriptUri. Chaîne
stage Détermine si la gestion intermédiaire est activée et permet à votre script Pig d'avoir accès aux tables de données mises en lots, telles que ${INPUT1} et ${OUTPUT1}. Booléen

Champs liés à l'exécution Description Type d'option
@activeInstances Liste des objets d'instances actives actuellement planifiés. Objet de référence, par exemple, « ActiveInstances » : {"ref » : » myRunnableObject Id "}
@actualEndTime Heure à laquelle l'exécution de l'objet s'est terminée. DateTime
@actualStartTime Heure à laquelle l'exécution de l'objet a démarré. DateTime
cancellationReason Motif de l'annulation si l'objet a été annulé. Chaîne
@cascadeFailedOn Description de la chaîne de dépendances sur laquelle l'objet a échoué. Objet de référence, par exemple, "cascadeFailedOn« : {" ref » : » myRunnableObject Id "}
emrStepLog Les journaux d'étapes Amazon EMR sont disponibles uniquement pour les tentatives d'activité EMR. Chaîne
errorId ID de l'erreur si l'objet a échoué. Chaîne
errorMessage errorMessage si l'objet a échoué. Chaîne
errorStackTrace Suivi de la pile d'erreurs si l'objet a échoué. Chaîne
@finishedTime Heure à laquelle l'objet a terminé son exécution. DateTime
hadoopJobLog Journaux de travail Hadoop disponibles sur les tentatives pour les activités EMR. Chaîne
@healthStatus État de santé de l'objet qui reflète la réussite ou l'échec de la dernière instance qui a atteint un état résilié. Chaîne
@healthStatusFromInstanceId ID du dernier objet d'instance qui atteint un état résilié. Chaîne
@ healthStatusUpdated Heure Heure à laquelle l'état de santé a été mis à jour pour la dernière fois. DateTime
hostname Nom d'hôte du client qui a sélectionné la tentative de tâche. Chaîne
@lastDeactivatedTime Heure à laquelle l'objet a été désactivé pour la dernière fois. DateTime
@ latestCompletedRun Heure Heure de la dernière exécution pour laquelle l'exécution s'est terminée. DateTime
@latestRunTime Heure de la dernière exécution pour laquelle l'exécution a été planifiée. DateTime
@nextRunTime Prochaine heure d'exécution planifiée. DateTime
reportProgressTime Heure la plus récente pour laquelle l'activité distante a signalé une progression. DateTime
@scheduledEndTime Heure de fin planifiée pour l'objet. DateTime
@scheduledStartTime Heure de début planifiée pour l'objet. DateTime
@État État de l'objet. Chaîne
@Version Version du pipeline avec laquelle l'objet a été créé. Chaîne
@waitingOn Description de la liste des dépendances sur laquelle l'objet est en attente. Objet de référence, par exemple, « WaitingOn » : {"ref » : » myRunnableObject Id "}

Champs système Description Type d'option
@error Erreur décrivant l'objet mal formé. Chaîne
@pipelineId ID du pipeline auquel l'objet appartient. Chaîne
@sphere La sphère d'un objet désigne sa place dans le cycle de vie : les objets « composant » entraînent les objets « instance » qui exécutent les objets « tentative ». Chaîne

consultez aussi