RedshiftCopyActivity - AWS Data Pipeline

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

RedshiftCopyActivity

Copie les données depuis DynamoDB ou Amazon S3 vers Amazon Redshift. Vous pouvez charger les données dans une nouvelle table ou les fusionner facilement dans une table existante.

Voici une présentation d'un cas d'utilisation dans lequel vous pouvez utiliser RedshiftCopyActivity :

  1. Commencez par utiliser AWS Data Pipeline pour stocker vos données dans Amazon S3.

  2. RedshiftCopyActivityÀ utiliser pour déplacer les données d'Amazon RDS et Amazon EMR vers Amazon Redshift.

    Cela vous permet de charger vos données dans Amazon Redshift où vous pouvez les analyser.

  3. SqlActivityÀ utiliser pour exécuter des requêtes SQL sur les données que vous avez chargées dans Amazon Redshift.

En outre, RedshiftCopyActivity prend en charge un fichier manifeste et vous permet donc d'utiliser un S3DataNode. Pour plus d’informations, consultez S3 DataNode.

Exemple

Voici un exemple de ce type d'objet.

Pour prendre en charge les formats de conversion, cet exemple utilise les paramètres de conversion spéciaux EMPTYASNULL et IGNOREBLANKLINES dans commandOptions. Pour plus d'informations, consultez la section Paramètres de conversion des données dans le manuel Amazon Redshift Database Developer Guide.

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

L'exemple de définition de pipeline suivant illustre une activité qui utilise le mode d'insertion APPEND :

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPENDL'opération ajoute des éléments à une table, quelles que soient les clés primaires ou les clés de tri. Par exemple, si vous avez le tableau suivant, vous pouvez ajouter un enregistrement avec les mêmes valeurs d'ID et d'utilisateur.

ID(PK) USER 1 aaa 2 bbb

Vous pouvez ajouter un enregistrement avec les mêmes valeurs d'ID et d'utilisateur.

ID(PK) USER 1 aaa 2 bbb 1 aaa
Note

Si une opération APPEND est interrompue et retentée, le pipeline de réexécution résultant ajoute potentiellement depuis le début. Comme cela peut entraîner de nouvelles duplications, soyez conscient de ce comportement, en particulier si vous avez une logique qui comptabilise le nombre de lignes.

Pour obtenir un didacticiel, consultez Copier des données vers Amazon Redshift à l'aide de AWS Data Pipeline.

Syntaxe

Champs obligatoires Description Type d'option
insertMode

Détermine AWS Data Pipeline le sort des données préexistantes de la table cible qui chevauchent les lignes des données à charger.

Les valeurs valides sont : KEEP_EXISTING, OVERWRITE_EXISTING, TRUNCATE et APPEND.

KEEP_EXISTING ajoute de nouvelles lignes à la table, en conservant toutes les lignes existantes non modifiées.

KEEP_EXISTING et OVERWRITE_EXISTING utilise les clés primaire, de tri et de distribution pour identifier les lignes entrantes à associer aux lignes existantes. Consultez la section Mise à jour et insertion de nouvelles données dans le manuel Amazon Redshift Database Developer Guide.

TRUNCATE supprime toutes les données de la table de destination avant d'écrire les nouvelles données.

APPEND ajoute tous les enregistrements à la fin de la table Redshift. APPEND ne nécessite aucune clé primaire, de distribution ou de tri, par conséquent, des doublons potentiels peuvent être ajoutés.

Énumération

Champs d'invocation de l'objet Description Type d'option
schedule

Cet objet est appelé dans le cadre de l'exécution d'un intervalle de planification.

Spécifiez une référence de planification à un autre objet pour définir l'ordre d'exécution des dépendances de l'objet.

Dans la plupart des cas, nous vous recommandons de placer la planification de référence sur l'objet de pipeline par défaut de manière à ce que tous les objets héritent cette planification. Vous pouvez, par exemple, définir explicitement une planification sur l'objet en spécifiant "schedule": {"ref": "DefaultSchedule"}.

Si la planification maître de votre pipeline contient des planifications imbriquées, créez un objet parent ayant une référence de planification.

Pour obtenir des exemples de configurations de planification facultatives, consultez la section Planification.

Objet de référence, tel que : "schedule":{"ref":"myScheduleId"}

Groupe obligatoire (l'un des groupes suivants est obligatoire) Description Type d'option
runsOn Ressource de calcul pour exécuter l'activité ou la commande. Par exemple, une instance Amazon EC2 ou un cluster Amazon EMR. Objet de référence, par exemple « RunSon » : {"ref » : » myResourceId «}
workerGroup Groupe de travail. Utilisé pour les tâches d'acheminement. Si vous fournissez une valeur runsOn et que workerGroup existe, workerGroup est ignoré. Chaîne

Champs facultatifs Description Type d'option
attemptStatus État de l'activité à distance le plus récemment rapporté. Chaîne
attemptTimeout Délai d'achèvement de la tâche à distance. Si une valeur est définie, une activité à distance qui n'est pas exécutée dans le cadre de la période de départ définie peut être retentée. Période
commandOptions

Prend des paramètres à transmettre au nœud de données Amazon Redshift pendant l'COPYopération. Pour plus d'informations sur les paramètres, consultez COPY dans le manuel Amazon Redshift Database Developer Guide.

Lorsqu'elle charge la table, la commande COPY tente implicitement de convertir les chaînes dans le type de données de la colonne cible. En plus des conversions de données par défaut qui s'exécutent de façon automatique, si vous rencontrez des erreurs ou si vous avez d'autres besoins de conversion, vous pouvez spécifier des paramètres de conversion supplémentaires. Pour plus d'informations, consultez la section Paramètres de conversion des données dans le manuel Amazon Redshift Database Developer Guide.

Si un format de données est associé au nœud de données d'entrée ou de sortie, les paramètres fournis sont ignorés.

Dans la mesure où l'opération de copie utilise d'abord COPY pour insérer des données dans une table intermédiaire, puis utilise une commande INSERT pour copier les données de la table intermédiaire dans la table de destination, certains paramètres de la commande COPY ne s'appliquent pas, comme la fonction de la commande COPY qui lui permet d'activer la compression automatique de la table. Si une compression est nécessaire, ajoutez les détails d'encodage de colonne à l'instruction CREATE TABLE.

De plus, dans certains cas, lorsqu'il doit décharger des données du cluster Amazon Redshift et créer des fichiers dans Amazon S3, il s'appuie sur RedshiftCopyActivity UNLOAD l'opération d'Amazon Redshift.

Pour améliorer les performances pendant la copie et le déchargement, spécifiez le paramètre PARALLEL OFF à partir de la commande UNLOAD. Pour plus d'informations sur les paramètres, consultez UNLOAD dans le manuel Amazon Redshift Database Developer Guide.

Chaîne
dependsOn Spécifie une dépendance sur un autre objet exécutable. Objet de référence : "dependsOn":{"ref":"myActivityId"}
failureAndRerunMode Décrit le comportement du nœud de consommateurs lorsque les dépendances échouent ou sont à nouveau exécutées. Énumération
input Nœud de données d'entrée. La source de données peut être Amazon S3, DynamoDB ou Amazon Redshift. Objet de référence : "input":{"ref":"myDataNodeId"}
lateAfterTimeout Temps écoulé après le début du pipeline pendant lequel l'objet doit être terminé. Il est déclenché uniquement lorsque le type de planification n'est pas défini surondemand. Période
maxActiveInstances Nombre maximal d'instances actives simultanées d'un composant. Les réexécutions ne sont pas comptabilisées dans le nombre d'instances actives. Entier
maximumRetries Nombre maximal de nouvelles tentatives en cas d'échec Entier
onFail Action à exécuter en cas d'échec de l'objet actuel. Objet de référence : "onFail":{"ref":"myActionId"}
onLateAction Actions à déclencher si un objet n'a pas encore été planifié ou n'est toujours pas terminé. Objet de référence : "onLateAction":{"ref":"myActionId"}
onSuccess Action à exécuter en cas de réussite de l'objet actuel. Objet de référence : "onSuccess":{"ref":"myActionId"}
output Nœud de données de sortie. L'emplacement de sortie peut être Amazon S3 ou Amazon Redshift. Objet de référence : "output":{"ref":"myDataNodeId"}
parent Parent de l'objet actuel à partir duquel les emplacements sont hérités. Objet de référence : "parent":{"ref":"myBaseObjectId"}
pipelineLogUri L'URI S3 (tel que 's3 ://BucketName/Key/ ') pour le téléchargement des journaux pour le pipeline. Chaîne
precondition Définit une condition préalable facultative. Un nœud de données n'est pas marqué « READY » tant que toutes les conditions préalables ne sont pas remplies. Objet de référence : "precondition":{"ref":"myPreconditionId"}
file d’attente

Correspond au query_group paramètre d'Amazon Redshift, qui vous permet d'attribuer et de prioriser les activités simultanées en fonction de leur placement dans les files d'attente.

Amazon Redshift limite le nombre de connexions simultanées à 15. Pour plus d'informations, consultez la section Affectation de requêtes à des files d'attente dans le manuel Amazon RDS Database Developer Guide.

Chaîne
reportProgressTimeout

Délai pour les appels successifs de travail à distance adressés à reportProgress.

Si une valeur est définie, les activités à distance qui ne font pas état d'avancement pour la période spécifiée doivent être considérées comme bloquées et, par conséquent, retentées.

Période
retryDelay Délai entre deux nouvelles tentatives. Période
scheduleType

Permet de spécifier si la planification s'applique aux objets de votre pipeline. Les valeurs sont : cron, ondemand et timeseries (cron, à la demande et séries chronologiques).

La planification timeseries signifie que les instances sont programmées à la fin de chaque intervalle.

La planification Cron signifie que les instances sont programmées au début de chaque intervalle.

Une planification ondemand vous permet d'exécuter un pipeline une fois par activation. Cela signifie que vous n'avez pas à cloner ou à recréer le pipeline pour l'exécuter à nouveau.

Pour utiliser des pipelines ondemand, vous devez appeler l'opération ActivatePipeline pour chaque exécution suivante.

Si vous utilisez une planification ondemand, vous devez la spécifier dans l'objet par défaut et faire en sorte qu'elle soit le seul scheduleType spécifié pour les objets du pipeline.

Énumération
transformSql

Expression SQL SELECT utilisée pour transformer les données d'entrée.

Exécutez l'expression transformSql sur la table nommée staging.

Lorsque vous copiez des données depuis DynamoDB ou Amazon S3 AWS Data Pipeline , vous créez une table appelée « staging » et y chargez initialement les données. Les données de cette table sont utilisées pour mettre à jour la table cible.

Le schéma de sortie de transformSql doit correspondre au schéma de la table cible finale.

Si vous spécifiez l'option transformSql, une seconde table intermédiaire est créée à partir de l'instruction SQL spécifiée. Les données de cette seconde table intermédiaire sont ensuite mises à jour dans la table cible finale.

Chaîne

Champs liés à l'exécution Description Type d'option
@activeInstances Liste des objets d'instances actives actuellement planifiés. Objet de référence : "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime Heure à laquelle l'exécution de l'objet s'est terminée. DateTime
@actualStartTime Heure à laquelle l'exécution de l'objet a démarré. DateTime
cancellationReason Motif de l'annulation si l'objet a été annulé. Chaîne
@cascadeFailedOn Description de la chaîne de dépendances sur laquelle l'objet a échoué. Objet de référence : "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog Journaux d'étapes EMR disponibles uniquement sur les tentatives d'activité EMR Chaîne
errorId ID de l'erreur si l'objet a échoué. Chaîne
errorMessage errorMessage si l'objet a échoué. Chaîne
errorStackTrace Suivi de la pile d'erreurs si l'objet a échoué. Chaîne
@finishedTime Heure à laquelle l'objet a terminé son exécution. DateTime
hadoopJobLog Journaux de travail Hadoop disponibles sur les tentatives pour les activités EMR. Chaîne
@healthStatus État de santé de l'objet qui reflète la réussite ou l'échec de la dernière instance qui a atteint un état résilié. Chaîne
@healthStatusFromInstanceId ID du dernier objet d'instance qui atteint un état résilié. Chaîne
@ healthStatusUpdated Heure Heure à laquelle l'état de santé a été mis à jour pour la dernière fois. DateTime
hostname Nom d'hôte du client qui a sélectionné la tentative de tâche. Chaîne
@lastDeactivatedTime Heure à laquelle l'objet a été désactivé pour la dernière fois. DateTime
@ latestCompletedRun Heure Heure de la dernière exécution pour laquelle l'exécution s'est terminée. DateTime
@latestRunTime Heure de la dernière exécution pour laquelle l'exécution a été planifiée. DateTime
@nextRunTime Prochaine heure d'exécution planifiée. DateTime
reportProgressTime Heure la plus récente pour laquelle l'activité distante a signalé une progression. DateTime
@scheduledEndTime Heure de fin planifiée pour l'objet. DateTime
@scheduledStartTime Heure de début planifiée pour l'objet. DateTime
@État État de l'objet. Chaîne
@Version Version du pipeline avec laquelle l'objet été créé. Chaîne
@waitingOn Description de la liste des dépendances sur laquelle l'objet est en attente. Objet de référence : "waitingOn":{"ref":"myRunnableObjectId"}

Champs système Description Type d'option
@error Erreur décrivant l'objet mal formé. Chaîne
@pipelineId Id du pipeline auquel l'objet appartient. Chaîne
@sphere Sphère d'un objet. Indique sa situation dans le cycle de vie. Par exemple, les objets de composant produisent des objets d'instance qui exécutent des objets « tentatives ». Chaîne