AWS Data Pipeline n'est plus disponible pour les nouveaux clients. Les clients existants de AWS Data Pipeline peut continuer à utiliser le service normalement. En savoir plus
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Copie intermédiaire des données et des tables avec les activités de pipeline
AWS Data Pipeline peut effectuer une copie intermédiaire des données entrantes et sortantes dans vos pipelines pour faciliter l'utilisation de certaines activités, telles que ShellCommandActivity
et HiveActivity
.
Une copie intermédiaire vous permet de copier des données du nœud d'entrée vers la ressource exécutant l'activité et, de la même manière, de la ressource vers le nœud de sortie.
Les données intermédiaires sur la ressource Amazon EMR ou Amazon EC2 sont disponibles à l'aide de variables spéciales dans les commandes shell ou les scripts Hive de l'activité.
La copie intermédiaire de tables est similaire au déploiement de données, à la différence près que les données ayant fait l'objet d'une copie intermédiaire prennent la forme de tables de base de données, tout spécialement.
AWS Data Pipeline prend en charge les scénarios de copie intermédiaire suivants :
-
Copie intermédiaire de données avec
ShellCommandActivity
-
Copie intermédiaire de tables avec Hive et nœuds de données pris en charge par la copie intermédiaire
-
Copie intermédiaire de tables avec Hive et nœuds de données non pris en charge par la copie intermédiaire
Note
La copie intermédiaire fonctionne uniquement lorsque le champ stage
a la valeur true
sur une activité, telle que ShellCommandActivity
. Pour plus d'informations, veuillez consulter ShellCommandActivity.
En outre, les nœuds de données et les activités peuvent avoir des liens de quatre manières :
- Copie intermédiaire de données localement sur une ressource
-
Les données d'entrée sont automatiquement copiées dans le système de fichiers local des ressources. Les données de sortie sont automatiquement copiées du système de fichiers local des ressources vers le nœud de données de sortie. Par exemple, lorsque vous configurez des entrées et des sorties
ShellCommandActivity
avec la copie intermédiaire = true, les données d'entrée sont disponibles en tant qu'INPUTx_STAGING_DIR et les données de sortie sont disponibles en tant qu'OUTPUTx_STAGING_DIR, où x correspond au nombre d'entrée ou de sortie. - Copie intermédiaire des définitions d'entrée et de sortie pour une activité
-
Le format des données d'entrée (noms des colonnes et noms des tables) est automatiquement copié dans la ressource de l'activité. Par exemple, lorsque vous configurez
HiveActivity
avec la copie intermédiaire = true. Le format de données spécifié sur l'entréeS3DataNode
est utilisé pour effectuer une copie intermédiaire de la définition de table à partir de la table Hive. - Copie intermédiaire non activée
-
Les objets d'entrée et de sortie et leurs champs sont disponibles pour l'activité, mais les données elles-mêmes ne le sont pas. Par exemple,
EmrActivity
par défaut ou lorsque vous configurez d'autres activités avec copie intermédiaire = false. Dans cette configuration, les champs de données sont disponibles pour l'activité afin d'y faire référence à l'aide de la syntaxe des expressions AWS Data Pipeline, et ceci se produit uniquement lorsque la dépendance est satisfaite. Ceci sert à vérifier la dépendance uniquement. Le code de l'activité est responsable de la copie des données de l'entrée vers la ressource exécutant l'activité. - Relation de dépendance entre les objets
-
Il existe une relation de dépendance entre deux objets, ce qui donne lieu à une situation similaire à celle lorsque la copie intermédiaire n'est pas activée. Ceci entraîne un nœud de données ou une activité à agir en tant que condition préalable pour l'exécution d'une autre activité.
Stationnage des données avec ShellCommandActivity
Prenons l'exemple d'un scénario qui utilise une ShellCommandActivity
avec des objets S3DataNode
en tant qu'entrée et sortie de données. AWS Data Pipeline effectue automatiquement une copie intermédiaire des nœuds de données pour les rendre accessibles à la commande shell comme s'ils étaient des dossiers de fichiers locaux en utilisant les variables d'environnement ${INPUT1_STAGING_DIR}
et ${OUTPUT1_STAGING_DIR}
, comme indiqué dans l'exemple suivant. La partie numérique des variables nommées INPUT1_STAGING_DIR
et OUTPUT1_STAGING_DIR
s'incrémente en fonction du nombre de nœuds de données référencés par votre activité.
Note
Ce scénario fonctionne uniquement comme indiqué si vos entrées et sorties de données sont des objets S3DataNode
. De plus, la copie intermédiaire des données de sortie est autorisée uniquement lorsque le directoryPath
est défini sur l'objet S3DataNode
de sortie.
{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...
Copie intermédiaire de tables avec Hive et nœuds de données pris en charge par la copie intermédiaire
Prenons l'exemple d'un scénario qui utilise une HiveActivity
avec des objets S3DataNode
en tant qu'entrée et sortie de données. AWS Data Pipeline effectue automatiquement une copie intermédiaire des nœuds de données pour les rendre accessibles au script Hive comme s'ils étaient des tables Hive en utilisant les variables ${input1}
et ${output1}
, comme indiqué dans l'exemple suivant pour HiveActivity
. La partie numérique des variables nommées input
et output
s'incrémente en fonction du nombre de nœuds de données référencés par votre activité.
Note
Ce scénario fonctionne uniquement comme indiqué si vos entrées et sorties de données sont des objets S3DataNode
ou MySqlDataNode
. La copie intermédiaire de tables n'est pas prise en charge pour DynamoDBDataNode
.
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
Copie intermédiaire de tables avec Hive et nœuds de données non pris en charge par la copie intermédiaire
Prenons l'exemple d'un scénario qui utilise une HiveActivity
avec DynamoDBDataNode
en tant qu'entrée de données et un objet S3DataNode
en tant que sortie. Aucun transfert de données n'est disponible. Vous devez donc d'abord créer la table manuellement dans votre script Hive, en utilisant le nom de la variable #{input.tableName}
pour faire référence à la table DynamoDB. DynamoDBDataNode
Une nomenclature similaire s'applique si la table DynamoDB est la sortie, sauf que vous utilisez une variable. #{output.tableName}
La copie intermédiaire est disponible pour l'objet S3DataNode
de sortie dans cet exemple, vous pouvez donc faire référence au nœud de données de sortie en tant que ${output1}
.
Note
Dans cet exemple, la variable du nom de la table comporte un préfixe # (hachage), car AWS Data Pipeline utilise des expressions pour accéder au tableName
ou au directoryPath
. Pour plus d'informations sur le fonctionnement de l'évaluation des expressions dans AWS Data Pipeline, consultez Evaluation d'expression.
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...