RedshiftCopyActivity - AWS Data Pipeline

RedshiftCopyActivity

Copia datos directamente de DynamoDB o Amazon S3 en Amazon Redshift. Puede cargar datos en una nueva tabla o combinar datos fácilmente en una tabla existente.

A continuación, se muestra información general de un caso de uso en el que utilizar RedshiftCopyActivity:

  1. Empiece por utilizar AWS Data Pipeline para almacenar sus datos en Amazon S3.

  2. Se utiliza RedshiftCopyActivity para mover los datos de Amazon RDS y Amazon EMR a Amazon Redshift.

    Esto le permite cargar sus datos en Amazon Redshift para poder analizarlos.

  3. Utilice SqlActivity para realizar consultas SQL en los datos que ha cargado en Amazon Redshift.

Además, RedshiftCopyActivity le permite trabajar con un S3DataNode, dado que admite un archivo de manifiesto. Para obtener más información, consulte S3DataNode.

Ejemplo

A continuación se muestra un ejemplo de este tipo de objeto.

Para garantizar la conversión de formatos, este ejemplo utiliza los parámetros de conversión especiales EMPTYASNULL e IGNOREBLANKLINES en commandOptions. Para obtener más información, consulte Parámetros de conversión de datos en la Guía de desarrollador de base de datos de Amazon Redshift.

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

En la siguiente definición de canalización de ejemplo se muestra una actividad que usa el modo de inserción APPEND:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

La operación APPEND añade elementos a una tabla independientemente de las claves principales o de ordenación. Por ejemplo, si tiene la tabla siguiente, puede incluir un registro con el mismo valor de usuario e ID.

ID(PK) USER 1 aaa 2 bbb

Puede incluir un registro con el mismo valor de usuario e ID:

ID(PK) USER 1 aaa 2 bbb 1 aaa
nota

Si una operación APPEND se interrumpe y reintenta, la nueva ejecución de la canalización resultante podría iniciar la operación desde el principio. Esto puede ocasionar una duplicación adicional, por lo que debe ser consciente de este comportamiento, especialmente si tiene cualquier lógica que cuente el número de filas.

Para ver un tutorial, consulte Copiar datos a Amazon Redshift utilizando AWS Data Pipeline.

Sintaxis

Campos obligatorios Descripción Tipo de slot
insertMode

Determina qué hace AWS Data Pipeline con los datos preexistentes en la tabla de destino que se superpone con las filas de datos que se van a cargar.

Los valores válidos son: KEEP_EXISTING, OVERWRITE_EXISTING, TRUNCATE y APPEND.

KEEP_EXISTING añade nuevas filas a la tabla y deja sin modificar las filas existentes.

KEEP_EXISTING y OVERWRITE_EXISTING utilizan la clave principal, de ordenación y las claves de distribución para identificar qué filas entrantes se corresponden con filas existentes. Consulte Actualización e inserción de datos nuevos en la Guía de desarrollador de base de datos de Amazon Redshift.

TRUNCATE elimina todos los datos de la tabla de destino antes de escribir los nuevos datos.

APPEND añade todos los registros al final de la tabla de Redshift. APPEND no requiere una clave principal, de distribución o de ordenación, por lo que se podrían agregar elementos que pueden ser duplicados.

Enumeración

Campos de invocación de objetos Descripción Tipo de slot
schedule

Este objeto se invoca dentro de la ejecución de un intervalo de programación.

Especifique una referencia de programación a otro objeto para establecer el orden de ejecución de dependencia para este objeto.

En la mayoría de los casos, recomendamos poner la referencia de programación en el objeto de la canalización predeterminado de modo que todos los objetos hereden ese programa. Por ejemplo, puede establecer un programa en el objeto de forma explícita especificando "schedule": {"ref": "DefaultSchedule"}.

Si el programa maestro de la canalización contiene programas anidados, cree un objeto principal que tenga una referencia de programación.

Para obtener más información acerca de las configuraciones de programación opcionales de ejemplo, consulte Programación.

Objeto de referencia, como por ejemplo: "schedule":{"ref":"myScheduleId"}

Grupo obligatorio (se requiere uno de los siguientes) Descripción Tipo de slot
runsOn El recurso informático para ejecutar la actividad o comando. Por ejemplo, una instancia de Amazon EC2 o un clúster de Amazon EMR. Objeto de referencia, por ejemplo, "runsOn":{"ref":"myResourceId"}
workerGroup El grupo de procesos de trabajo. Este se usa para dirigir tareas. Si proporciona un valor runsOn y existe workerGroup, workerGroup se ignora. Cadena

Campos opcionales Descripción Tipo de slot
attemptStatus Estado más reciente notificado por la actividad remota. Cadena
attemptTimeout Tiempo de espera para que se complete el trabajo remoto. Si se establece, se puede reintentar una actividad remota que no se complete dentro del tiempo de inicio establecido. Período
commandOptions

Toma parámetros para pasar al nodo de datos de Amazon Redshift durante la operación COPY. Para más información, consulte COPY en la Guía para desarrolladores de bases de datos de Amazon Redshift.

A medida que carga la tabla, COPY intenta convertir de forma implícita las cadenas al tipo de datos de la columna de destino. Además de las conversiones de datos predeterminadas que ocurren automáticamente, si recibe errores o tiene otras necesidades de conversión, puede especificar parámetros de conversión adicionales. Para obtener más información, consulte Parámetros de conversión de datos en la Guía de desarrollador de base de datos de Amazon Redshift.

Si un formato de datos está asociado al nodo de datos de entrada o salida, los parámetros proporcionados se omiten.

Dado que la operación de copia utiliza primero COPY para insertar los datos en una tabla provisional y, a continuación, utiliza un comando INSERT para copiar los datos desde la tabla provisional a la tabla de destino, algunos parámetros COPY no se aplican, como la capacidad del comando COPY para permitir la compresión automática de la tabla. Si la compresión es necesaria, añada los detalles de codificación de columna a la instrucción CREATE TABLE.

Además, en algunos casos en que es necesario descargar datos del clúster de Amazon Redshift y crear archivos en Amazon S3, RedshiftCopyActivity se basa en la operación UNLOAD de Amazon Redshift.

Para mejorar el rendimiento durante la copia y la descarga, especifique el parámetro PARALLEL OFF del comando UNLOAD. Para obtener más información sobre los parámetros, consulte DESCARGAR en la Guía de desarrollador de base de datos de Amazon Redshift.

Cadena
dependsOn Especificar la dependencia de otro objeto ejecutable. Objeto de referencia: "dependsOn":{"ref":"myActivityId"}
failureAndRerunMode Describe el comportamiento del nodo del consumidor cuando las dependencias producen un error o se vuelven a ejecutar. Enumeración
input El nodo de datos de entrada. El origen de datos puede ser Amazon S3, DynamoDB o Amazon Redshift. Objeto de referencia: "input":{"ref":"myDataNodeId"}
lateAfterTimeout El tiempo transcurrido desde el inicio de la canalización dentro del cual el objeto debe completarse. Solo se activa cuando el tipo de programación no está establecido en ondemand. Período
maxActiveInstances El número máximo de instancias activas simultáneas de un componente. Las nuevas ejecuciones no cuentan para el número de instancias activas. Entero
maximumRetries Número máximo de reintentos cuando se produce un error. Entero
onFail Acción que se debe ejecutar cuando el objeto actual produzca un error. Objeto de referencia: "onFail":{"ref":"myActionId"}
onLateAction Acciones que deben iniciarse si un objeto todavía no se ha programado o no se ha completado. Objeto de referencia: "onLateAction":{"ref":"myActionId"}
onSuccess Acción que se debe ejecutar cuando el objeto actual se complete correctamente. Objeto de referencia: "onSuccess":{"ref":"myActionId"}
salida El nodo de datos de salida. La ubicación de salida puede ser Amazon S3 o Amazon Redshift. Objeto de referencia: "output":{"ref":"myDataNodeId"}
parent Elemento principal del objeto actual del que se heredarán los slots. Objeto de referencia: "parent":{"ref":"myBaseObjectId"}
pipelineLogUri El URI de S3 (como 's3://BucketName/Key/') para cargar registros para la canalización. Cadena
precondition Opcionalmente, defina una condición previa. Un nodo de datos no se marca como "READY" hasta que se han cumplido todas las condiciones previas. Objeto de referencia: "precondition":{"ref":"myPreconditionId"}
cola

Se corresponde a la configuración de query_group en Amazon Redshift que le permite asignar y priorizar actividades simultáneas en función de su ubicación en las colas.

Amazon Redshift limita el número de conexiones simultáneas a 15. Para obtener más información, consulte Asignación de consultas a las colas en la Guía de desarrollador de base de datos de Amazon Redshift.

Cadena
reportProgressTimeout

Tiempo de espera para llamadas sucesivas del trabajo remoto a reportProgress.

Si se establece, las actividades remotas que no informen de su progreso durante el período especificado pueden considerarse estancadas y, en consecuencia, reintentarse.

Período
retryDelay Duración del tiempo de espera entre dos reintentos. Período
scheduleType

Le permite especificar la programación de objetos en su canalización. Los valores son: cron, ondemand y timeseries.

La programación timeseries significa que las instancias se programan al final de cada intervalo.

La programación Cron significa que las instancias se programan al principio de cada intervalo.

Un programa ondemand le permite ejecutar una canalización una vez por activación. Esto significa que no tiene que clonar o recrear la canalización para ejecutarla de nuevo.

Para usar canalizaciones ondemand, solo tiene que llamar a la operación ActivatePipeline para cada ejecución posterior.

Si usa un programa ondemand, debe especificarlo en el objeto predeterminado y debe ser el único scheduleType especificado para los objetos de la canalización.

Enumeración
transformSql

La expresión SQL SELECT que se utiliza para transformar los datos de entrada.

Ejecute la expresión transformSql en la tabla denominada staging.

Cuando se copian datos desde DynamoDB o Amazon S3, AWS Data Pipeline crea una tabla denominada “staging” y carga los datos en ella inicialmente. Los datos de esta tabla se utilizan para actualizar la tabla de destino.

El esquema de salida de transformSql debe coincidir con el esquema de la tabla de destino final.

Si especifica la opción transformSql, se crea una segunda tabla provisional a partir de la instrucción SQL especificada. Los datos de esta segunda tabla staging se actualizan en la tabla de destino final.

Cadena

Campos de tiempo de ejecución Descripción Tipo de slot
@activeInstances Lista de los objetos de instancias activas programados actualmente. Objeto de referencia: "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime La hora a la que finalizó la ejecución de este objeto. DateTime
@actualStartTime La hora a la que comenzó la ejecución de este objeto. DateTime
cancellationReason El valor de cancellationReason si este objeto se ha cancelado. Cadena
@cascadeFailedOn Descripción de la cadena de dependencia en la que ha fallado el objeto. Objeto de referencia: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog Registros de pasos de EMR disponibles únicamente sobre intentos de actividad de EMR. Cadena
errorId El valor de errorId si este objeto ha fallado. Cadena
errorMessage El valor de errorMessage si este objeto ha fallado. Cadena
errorStackTrace El seguimiento de la pila de error si este objeto ha fallado. Cadena
@finishedTime La hora a la que este objeto finalizó su ejecución. DateTime
hadoopJobLog Los registros de trabajo de Hadoop disponibles sobre intentos de actividades basadas en EMR. Cadena
@healthStatus El estado de salud del objeto que refleja el éxito o el fracaso de la última instancia de objeto que alcanzó un estado terminado. Cadena
@healthStatusFromInstanceId ID del último objeto de instancia que alcanzó un estado terminado. Cadena
@healthStatusUpdatedTime Hora a la que el estado de salud se actualizó la última vez. DateTime
hostname El nombre de host del cliente que recogió el intento de tarea. Cadena
@lastDeactivatedTime La hora a la que este objeto se desactivó la última vez. DateTime
@latestCompletedRunTime Hora de la última ejecución para la que se completó la ejecución. DateTime
@latestRunTime Hora de la última ejecución para la que se programó la ejecución. DateTime
@nextRunTime Hora de ejecución que se va a programar a continuación. DateTime
reportProgressTime La hora más reciente a la que la actividad remota notificó algún progreso. DateTime
@scheduledEndTime Hora de finalización programada para el objeto. DateTime
@scheduledStartTime Hora de comienzo programada para el objeto. DateTime
@status El estado de este objeto. Cadena
@version Versión de la canalización con la que se creó el objeto. Cadena
@waitingOn Descripción de la lista de dependencias de la que este objeto está a la espera. Objeto de referencia: "waitingOn":{"ref":"myRunnableObjectId"}

Campos del sistema Descripción Tipo de slot
@error Error al describir el objeto mal estructurado. Cadena
@pipelineId ID de la canalización a la que pertenece este objeto. Cadena
@sphere La esfera de un objeto. Denota su lugar en el ciclo de vida. Por ejemplo, los objetos de componente dan lugar a objetos de instancia, que ejecutan objetos de intento. Cadena