AWS Data Pipeline ya no está disponible para nuevos clientes. Clientes actuales de AWS Data Pipeline pueden seguir utilizando el servicio con normalidad. Más información
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
RedshiftCopyActivity
Copia datos de DynamoDB o Amazon S3 en Amazon Redshift. Puede cargar datos en una nueva tabla o combinar datos fácilmente en una tabla existente.
A continuación, se muestra información general de un caso de uso en el que utilizar RedshiftCopyActivity
:
-
Comience por usar AWS Data Pipeline para organizar sus datos en Amazon S3.
-
Se utiliza
RedshiftCopyActivity
para mover los datos de Amazon RDS y Amazon EMR a Amazon Redshift.Esto le permite cargar sus datos en Amazon Redshift para poder analizarlos.
-
Se utiliza SqlActivity para realizar SQL consultas sobre los datos que ha cargado en Amazon Redshift.
Además, RedshiftCopyActivity
le permite trabajar con un S3DataNode
, dado que admite un archivo de manifiesto. Para obtener más información, consulte S3 DataNode.
Ejemplo
A continuación se muestra un ejemplo de este tipo de objeto.
Para garantizar la conversión de formatos, en este ejemplo se utilizan EMPTYASNULLparámetros de conversión IGNOREBLANKLINESespeciales. commandOptions
Para obtener más información, consulte Parámetros de conversión de datos en la Guía de desarrollador de base de datos de Amazon Redshift.
{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }
En la siguiente definición de canalización de ejemplo se muestra una actividad que usa el modo de inserción APPEND
:
{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }
La operación APPEND
añade elementos a una tabla independientemente de las claves principales o de ordenación. Por ejemplo, si tiene la tabla siguiente, puede incluir un registro con el mismo valor de usuario e ID.
ID(PK) USER 1 aaa 2 bbb
Puede incluir un registro con el mismo valor de usuario e ID:
ID(PK) USER 1 aaa 2 bbb 1 aaa
nota
Si una operación APPEND
se interrumpe y reintenta, la nueva ejecución de la canalización resultante podría iniciar la operación desde el principio. Esto puede ocasionar una duplicación adicional, por lo que debe ser consciente de este comportamiento, especialmente si tiene cualquier lógica que cuente el número de filas.
Para ver un tutorial, consulte Copiar datos a Amazon Redshift utilizando AWS Data Pipeline.
Sintaxis
Campos obligatorios | Descripción | Tipo de slot |
---|---|---|
insertMode |
Determina qué AWS Data Pipeline ocurre con los datos preexistentes de la tabla de destino que se superponen con las filas de los datos que se van a cargar. Los valores válidos son:
|
Enumeración |
Campos de invocación de objetos | Descripción | Tipo de slot |
---|---|---|
schedule |
Este objeto se invoca dentro de la ejecución de un intervalo de programación. Especifique una referencia de programación a otro objeto para establecer el orden de ejecución de dependencia para este objeto. En la mayoría de los casos, recomendamos poner la referencia de programación en el objeto de la canalización predeterminado de modo que todos los objetos hereden ese programa. Por ejemplo, puede establecer un programa en el objeto de forma explícita especificando Si el programa maestro de la canalización contiene programas anidados, cree un objeto principal que tenga una referencia de programación. Para obtener más información acerca de las configuraciones de programación opcionales de ejemplo, consulte Programación. |
Objeto de referencia, como por ejemplo: "schedule":{"ref":"myScheduleId"} |
Grupo obligatorio (se requiere uno de los siguientes) | Descripción | Tipo de slot |
---|---|---|
runsOn | El recurso informático para ejecutar la actividad o comando. Por ejemplo, una EC2 instancia de Amazon o un EMR clúster de Amazon. | Objeto de referencia, por ejemplo, "runsOn«: {" ref»:» myResourceId «} |
workerGroup | El grupo de procesos de trabajo. Este se usa para dirigir tareas. Si proporciona un runsOn valor y workerGroup existe, workerGroup se ignora. |
Cadena |
Campos opcionales | Descripción | Tipo de slot |
---|---|---|
attemptStatus | Estado más reciente notificado por la actividad remota. | Cadena |
attemptTimeout | Tiempo de espera para que se complete el trabajo remoto. Si se establece, se puede reintentar una actividad remota que no se complete dentro del tiempo de inicio establecido. | Período |
commandOptions |
Toma parámetros para pasar al nodo de datos de Amazon Redshift durante la operación A medida que carga la tabla, Si un formato de datos está asociado al nodo de datos de entrada o salida, los parámetros proporcionados se omiten. Dado que la operación de copia utiliza primero Además, en algunos casos en que es necesario descargar datos del clúster de Amazon Redshift y crear archivos en Amazon S3, Para mejorar el rendimiento durante la copia y la descarga, especifique el parámetro |
Cadena |
dependsOn | Especificar la dependencia de otro objeto ejecutable. | Objeto de referencia: "dependsOn":{"ref":"myActivityId"} |
failureAndRerunModo | Describe el comportamiento del nodo del consumidor cuando las dependencias producen un error o se vuelven a ejecutar. | Enumeración |
input | El nodo de datos de entrada. El origen de datos puede ser Amazon S3, DynamoDB o Amazon Redshift. | Objeto de referencia: "input":{"ref":"myDataNodeId"} |
lateAfterTimeout | El tiempo transcurrido desde el inicio de la canalización dentro del cual el objeto debe completarse. Solo se activa cuando el tipo de programación no está establecido en ondemand . |
Período |
maxActiveInstances | El número máximo de instancias activas simultáneas de un componente. Las nuevas ejecuciones no cuentan para el número de instancias activas. | Entero |
maximumRetries | Número máximo de reintentos cuando se produce un error. | Entero |
onFail | Acción que se debe ejecutar cuando el objeto actual produzca un error. | Objeto de referencia: "onFail":{"ref":"myActionId"} |
onLateAction | Acciones que deben iniciarse si un objeto todavía no se ha programado o no se ha completado. | Objeto de referencia: "onLateAction":{"ref":"myActionId"} |
onSuccess | Acción que se debe ejecutar cuando el objeto actual se complete correctamente. | Objeto de referencia: "onSuccess":{"ref":"myActionId"} |
salida | El nodo de datos de salida. La ubicación de salida puede ser Amazon S3 o Amazon Redshift. | Objeto de referencia: "output":{"ref":"myDataNodeId"} |
parent | Elemento principal del objeto actual del que se heredarán los slots. | Objeto de referencia: "parent":{"ref":"myBaseObjectId"} |
pipelineLogUri | El S3 URI (como 's3://BucketName/Key/ ') para cargar los registros de la canalización. | Cadena |
precondition | Opcionalmente, defina una condición previa. Un nodo de datos no se marca como READY "hasta que se hayan cumplido todas las condiciones previas. | Objeto de referencia: "precondition":{"ref":"myPreconditionId"} |
cola |
Se corresponde a la configuración de Amazon Redshift limita el número de conexiones simultáneas a 15. Para obtener más información, consulte Asignación de consultas a colas en la Guía para desarrolladores de Amazon RDS Database. |
Cadena |
reportProgressTimeout |
Tiempo de espera para llamadas sucesivas del trabajo remoto a Si se establece, las actividades remotas que no informen de su progreso durante el período especificado pueden considerarse estancadas y, en consecuencia, reintentarse. |
Período |
retryDelay | Duración del tiempo de espera entre dos reintentos. | Período |
scheduleType |
Le permite especificar la programación de objetos en su canalización. Los valores son: La programación La programación Un programa Para usar canalizaciones Si usa un programa |
Enumeración |
transformSql |
La expresión Ejecute la expresión Cuando se copian datos desde DynamoDB o Amazon S3, AWS Data Pipeline crea una tabla denominada “staging” y carga los datos en ella inicialmente. Los datos de esta tabla se utilizan para actualizar la tabla de destino. El esquema de salida de Si especifica la |
Cadena |
Campos de tiempo de ejecución | Descripción | Tipo de slot |
---|---|---|
@activeInstances | Lista de los objetos de instancias activas programados actualmente. | Objeto de referencia: "activeInstances":{"ref":"myRunnableObjectId"} |
@actualEndTime | La hora a la que finalizó la ejecución de este objeto. | DateTime |
@actualStartTime | La hora a la que comenzó la ejecución de este objeto. | DateTime |
cancellationReason | cancellationReason Si este objeto se ha cancelado. | Cadena |
@cascadeFailedOn | Descripción de la cadena de dependencia en la que ha fallado el objeto. | Objeto de referencia: "cascadeFailedOn":{"ref":"myRunnableObjectId"} |
emrStepLog | EMRlos registros de pasos solo están disponibles en los intentos de EMR actividad | Cadena |
errorId | El errorId si este objeto falló. | Cadena |
errorMessage | El errorMessage si este objeto falló. | Cadena |
errorStackTrace | El seguimiento de la pila de error si este objeto ha fallado. | Cadena |
@finishedTime | La hora a la que este objeto finalizó su ejecución. | DateTime |
hadoopJobLog | Los registros de trabajos de Hadoop están disponibles cuando se intenta realizar actividades EMR basadas. | Cadena |
@healthStatus | El estado de salud del objeto que refleja el éxito o el fracaso de la última instancia de objeto que alcanzó un estado terminado. | Cadena |
@healthStatusFromInstanceId | ID del último objeto de instancia que alcanzó un estado terminado. | Cadena |
@ Hora healthStatusUpdated | Hora a la que el estado de salud se actualizó la última vez. | DateTime |
hostname | El nombre de host del cliente que recogió el intento de tarea. | Cadena |
@lastDeactivatedTime | La hora a la que este objeto se desactivó la última vez. | DateTime |
@ latestCompletedRun Hora | Hora de la última ejecución para la que se completó la ejecución. | DateTime |
@latestRunTime | Hora de la última ejecución para la que se programó la ejecución. | DateTime |
@nextRunTime | Hora de ejecución que se va a programar a continuación. | DateTime |
reportProgressTime | La hora más reciente a la que la actividad remota notificó algún progreso. | DateTime |
@scheduledEndTime | Hora de finalización programada para el objeto. | DateTime |
@scheduledStartTime | Hora de comienzo programada para el objeto. | DateTime |
@status | El estado de este objeto. | Cadena |
@version | Versión de la canalización con la que se creó el objeto. | Cadena |
@waitingOn | Descripción de la lista de dependencias de la que este objeto está a la espera. | Objeto de referencia: "waitingOn":{"ref":"myRunnableObjectId"} |
Campos del sistema | Descripción | Tipo de slot |
---|---|---|
@error | Error al describir el objeto mal estructurado. | Cadena |
@pipelineId | ID de la canalización a la que pertenece este objeto. | Cadena |
@sphere | La esfera de un objeto. Denota su lugar en el ciclo de vida. Por ejemplo, los objetos de componente dan lugar a objetos de instancia, que ejecutan objetos de intento. | Cadena |