RedshiftCopyActivity - AWS Data Pipeline

RedshiftCopyActivity

Copia uma tabela do DynamoDB ou Amazon S3 para o Amazon Redshift. Você pode carregar dados em uma nova tabela ou mesclar dados em uma tabela existente de maneira fácil.

Esta é uma visão geral de um caso de uso no qual usar RedshiftCopyActivity:

  1. Comece usando o AWS Data Pipeline para preparar seus dados no Amazon S3.

  2. Use o RedshiftCopyActivity para mover os dados do Amazon RDS e do Amazon EMR para o Amazon Redshift.

    Isso permite que você carregue seus dados no Amazon Redshift, onde pode analisá-los.

  3. Use o SqlActivity para realizar consultas SQL nos dados que você carregou no Amazon Redshift.

Além disso, RedshiftCopyActivity permite que você trabalhe com um S3DataNode, já que ele oferece suporte a um arquivo manifesto. Para obter mais informações, consulte S3DataNode.

Exemplo

Veja a seguir um exemplo deste tipo de objeto.

Para garantir a conversão de formatos, este exemplo usa os parâmetros de conversão especiais EMPTYASNULL e IGNOREBLANKLINES em commandOptions. Para obter informações, consulte Parâmetros de conversão de dados no Guia do desenvolvedor de banco de dados do Amazon Redshift.

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

A definição de pipeline de exemplo a seguir mostra uma atividade que usa o modo de inserção APPEND:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPEND A operação adiciona itens a uma tabela, independentemente das chaves principais ou de classificação. Por exemplo, se você tiver a tabela a seguir, poderá anexar um registro com o mesmo ID e o valor de usuário.

ID(PK) USER 1 aaa 2 bbb

Você pode anexar um registro com o mesmo ID e valor de usuário:

ID(PK) USER 1 aaa 2 bbb 1 aaa
nota

Se uma operação APPEND é interrompida e realizada novamente, a nova execução resultante do pipeline pode acrescentar linhas desde o início. Isso pode causar uma duplicação. Por isso, você deve estar ciente desse comportamento, especialmente se houver alguma lógica que conta o número de linhas.

Para ver um tutorial, consulte Copiar dados para o Amazon Redshift usando AWS Data Pipeline.

Sintaxe

Campos obrigatórios Descrição Tipo de slot
insertMode

Determina o que o AWS Data Pipeline faz com os dados preexistentes na tabela de destino que se sobrepõem com linhas aos dados a serem carregados.

Os valores válidos são: KEEP_EXISTING, OVERWRITE_EXISTING, TRUNCATE e APPEND.

KEEP_EXISTING adiciona novas linhas à tabela deixando quaisquer linhas existentes sem modificações.

KEEP_EXISTING e OVERWRITE_EXISTING usam as chaves primária, de classificação e de distribuição para identificar quais linhas de entrada correspondem a linhas existentes. Consulte Atualizar e inserir novos dados no Guia do desenvolvedor de banco de dados do Amazon Redshift.

TRUNCATE exclui todos os dados na tabela de destino antes de gravar os novos dados.

APPEND adiciona todos os registros ao final da tabela do Redshift. APPEND não requer uma chave de distribuição primária ou uma chave de classificação de modo que itens que podem ser possíveis duplicatas podem ser anexados.

Enumeração

Campos de invocação de objetos Descrição Tipo de slot
schedule

Esse objeto é invocado durante a execução de um intervalo de programação.

Especifique uma referência de programação para outro objeto para definir a ordem de execução de dependência desse objeto.

Na maioria dos casos, recomendamos colocar a referência de programação no objeto de pipeline padrão para que todos os objetos herdem essa programação. Por exemplo, você pode definir uma programação explicitamente no objeto especificando "schedule": {"ref": "DefaultSchedule"}.

Se a programação principal do seu pipeline contiver programações aninhadas, crie um objeto pai que tenha uma referência de programação.

Para obter mais informações sobre configurações opcionais de programação de exemplo, consulte Programação.

Objeto de referência, como: "schedule":{"ref":"myScheduleId"}

Grupo obrigatório (um dos seguintes é obrigatório) Descrição Tipo de slot
runsOn O recurso computacional para executar a atividade ou o comando. Por exemplo, uma instância do Amazon EC2 ou um cluster do Amazon EMR. Objeto de referência, como “runsOn”:{"ref":"myResourceId"}
workerGroup O grupo de operadores. Isso é usado para tarefas de roteamento. Se você fornecer um valor de runsOn e workerGroup existir, workerGroup será ignorado. String

Campos opcionais Descrição Tipo de slot
attemptStatus Status mais recente da atividade remota. String
attemptTimeout Tempo limite para conclusão do trabalho remoto. Se definida, uma atividade remota não concluída dentro do prazo definido poderá ser executada novamente. Período
commandOptions

Pega parâmetros para passar para o nó de dados do Amazon Redshift durante a operação COPY. Para obter mais informações sobre parâmetros, consulte COPIAR no Guia do desenvolvedor de banco de dados do Amazon Redshift.

À medida que carrega a tabela, COPY tenta converter implicitamente as strings no tipo de dados da coluna de destino. Além das conversões de dados padrão que são realizadas automaticamente, se você receber erros ou tiver outras necessidades de conversão, especifique parâmetros de conversão adicionais. Para obter informações, consulte Parâmetros de conversão de dados no Guia do desenvolvedor de banco de dados do Amazon Redshift.

Se um formato de dados é associado ao nó de dados de entrada ou saída, os parâmetros fornecidos são ignorados.

Como a operação de cópia usa COPY para inserir dados em uma tabela de preparação e, em seguida, usa um comando INSERT para copiar os dados da tabela de preparação para a tabela de destino, alguns parâmetros COPY não se aplicam, como a capacidade do comando COPY para permitir a compactação automática da tabela. Se a compactação for necessária, adicione detalhes de codificação de coluna na instrução CREATE TABLE.

Além disso, em alguns casos, quando é preciso descarregar os dados do cluster do Amazon Redshift e criar arquivos no Amazon S3, a RedshiftCopyActivity depende da operação UNLOAD do Amazon Redshift.

Para melhorar o desempenho ao copiar e descarregar, especifique o parâmetro PARALLEL OFF do comando UNLOAD. Para obter informações sobre parâmetros, consulte DESCARREGAR no Guia do desenvolvedor de banco de dados do Amazon Redshift.

String
dependsOn Especifique a dependência em outro objeto executável. Objeto de referência: "dependsOn":{"ref":"myActivityId"}
failureAndRerunMode Descreve o comportamento do nó do consumidor quando as dependências apresentam falhas ou são executadas novamente. Enumeração
input O nó de dados de entrada. A fonte de dados pode ser o Amazon S3, o DynamoDB ou o Amazon Redshift. Objeto de referência: "input":{"ref":"myDataNodeId"}
lateAfterTimeout O tempo decorrido após o início do pipeline no qual o objeto deve ser concluído. Ele é acionado somente quando o tipo de programação não está definido como ondemand. Período
maxActiveInstances O número máximo de instâncias ativas simultâneas de um componente. Novas execuções não contam para o número de instâncias ativas. Inteiro
maximumRetries Quantidade máxima de novas tentativas com falha. Inteiro
onFail Uma ação a ser executada quando há falha no objeto atual. Objeto de referência: "onFail":{"ref":"myActionId"}
onLateAction Ações que devem ser acionadas se um objeto ainda não foi agendado ou não foi concluído. Objeto de referência: "onLateAction":{"ref":"myActionId"}
onSuccess Uma ação a ser executada quando o objeto atual é executado com êxito. Objeto de referência: "onSuccess":{"ref":"myActionId"}
output O nó de dados de saída. A localização de saída pode ser o Amazon S3 ou o Amazon Redshift. Objeto de referência: "output":{"ref":"myDataNodeId"}
parent Pai do objeto atual a partir do qual os slots serão herdados. Objeto de referência: "parent":{"ref":"myBaseObjectId"}
pipelineLogUri O URI do S3 (por exemplo, "s3://BucketName/Key/") para fazer upload de logs para o pipeline. String
precondition Se desejar, você pode definir uma precondição. Um nó de dados não fica marcado como "READY" até que todas as precondições tenham sido atendidas. Objeto de referência: "precondition":{"ref":"myPreconditionId"}
queue (fila)

Corresponde à configuração query_group no Amazon Redshift, que permite atribuir e priorizar atividades simultâneas com base em sua colocação em filas.

O Amazon Redshift limita o número de conexões simultâneas a 15. Para obter mais informações, consulte Atribuir consultas a filas no Guia do desenvolvedor de banco de dados do Amazon RDS.

String
reportProgressTimeout

Tempo limite para as chamadas sucessivas de trabalho remoto para reportProgress.

Se definidas, as atividades remotas sem progresso para o período especificado podem ser consideradas como interrompidas e executadas novamente.

Período
retryDelay A duração do tempo limite entre duas novas tentativas. Período
scheduleType

Permite que você especifique a programação para objetos no pipeline. Os valores são: cron, ondemand e timeseries.

A programação timeseries significa que as instâncias são programadas no final de cada intervalo.

A programação Cron significa que as instâncias são programadas no início de cada intervalo.

Uma programação ondemand permite que você execute um pipeline uma vez por ativação. Isso significa que você não precisa clonar nem recriar o pipeline para executá-lo novamente.

Para usar pipelines ondemand, chame a operação ActivatePipeline para cada execução subsequente.

Se você usar uma programação ondemand, deverá especificá-la no objeto padrão, e este deverá ser o único scheduleType especificado para objetos no pipeline.

Enumeração
transformSql

A expressão SQL SELECT usada para transformar os dados de entrada.

Execute a expressão transformSql na tabela chamada staging.

Ao copiar dados do DynamoDB ou do Amazon S3, o AWS Data Pipeline cria uma tabela chamada “staging” e, inicialmente, carrega dados nesta tabela. Os dados dessa tabela são usados para atualizar a tabela de destino.

O esquema de saída de transformSql deve corresponder ao esquema da tabela de destinos finais.

Se você especificar a opção transformSql, uma segunda tabela de preparação será criada a partir da instrução SQL especificada. Os dados na segunda tabela de preparação são, então, atualizados na tabela de destino final.

String

Campos de tempo de execução Descrição Tipo de slot
@activeInstances Lista dos objetos da instância ativa agendados no momento. Objeto de referência: "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime Hora em que a execução deste objeto foi concluída. DateTime
@actualStartTime Hora em que a execução deste objeto foi iniciada. DateTime
cancellationReason O motivo do cancelamento, se esse objeto foi cancelado. String
@cascadeFailedOn Descrição da cadeia de dependência na qual o objeto apresentou falha. Objeto de referência: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog Registros da etapa do EMR disponíveis somente nas tentativas de atividade do EMR. String
errorId O ID do erro se esse objeto apresentou falha. String
errorMessage A mensagem de erro se esse objeto apresentou falha. String
errorStackTrace O rastreamento de pilha com erro se esse objeto apresentou falha. String
@finishedTime A hora em que esse objeto terminou a execução. DateTime
hadoopJobLog Registos de trabalho do Hadoop disponíveis nas tentativas de atividades baseadas em EMR. String
@healthStatus O status de integridade do objeto que indica se houve sucesso ou falha na última instância concluída do objeto. String
@healthStatusFromInstanceId ID do último objeto da instância concluído. String
@healthStatusUpdatedTime Hora em que o status de integridade foi atualizado pela última vez. DateTime
hostname O nome do host do cliente que capturou a tentativa da tarefa. String
@lastDeactivatedTime A hora em que esse objeto foi desativado pela última vez. DateTime
@latestCompletedRunTime Hora da última execução concluída. DateTime
@latestRunTime Hora da última execução programada. DateTime
@nextRunTime Hora da próxima execução a ser programada. DateTime
reportProgressTime A última vez que a atividade remota relatou progresso. DateTime
@scheduledEndTime Horário de término da programação para o objeto. DateTime
@scheduledStartTime Horário de início da programação para o objeto. DateTime
@status O status deste objeto. String
@version A versão do pipeline com que o objeto foi criado. String
@waitingOn Descrição da lista de dependências em que este objeto está aguardando. Objeto de referência: "waitingOn":{"ref":"myRunnableObjectId"}

Campos do sistema Descrição Tipo de slot
@error Erro ao descrever o objeto malformado. String
@pipelineId ID do pipeline ao qual este objeto pertence. String
@sphere A esfera de um objeto. Denota seu lugar no ciclo de vida. Por exemplo, objetos de componentes dão origem a objetos de instância, que executam objetos de tentativa. String