RedshiftCopyActivity - AWS Data Pipeline

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

RedshiftCopyActivity

Copia i dati da DynamoDB o Amazon S3 su Amazon Redshift. È possibile caricare i dati in una nuova tabella, oppure unirli facilmente alla tabella esistente.

Questa è una panoramica di un caso d'uso in cui utilizzare RedshiftCopyActivity:

  1. Inizia a AWS Data Pipeline utilizzarlo per lo staging dei dati in Amazon S3.

  2. RedshiftCopyActivityUtilizzalo per spostare i dati da Amazon RDS e Amazon EMR ad Amazon Redshift.

    In questo modo puoi caricare i tuoi dati in Amazon Redshift dove puoi analizzarli.

  3. SqlActivityUtilizzalo per eseguire query SQL sui dati che hai caricato in Amazon Redshift.

Inoltre, RedshiftCopyActivity consente di lavorare con un S3DataNode, poiché supporta un file manifest. Per ulteriori informazioni, consulta S3 DataNode.

Esempio

Di seguito è illustrato un esempio di questo tipo di oggetto.

Per garantire la conversione dei formati, questo esempio utilizza i parametri speciali di conversione EMPTYASNULL e IGNOREBLANKLINES in commandOptions. Per informazioni, consulta i parametri di conversione dei dati nella Amazon Redshift Database Developer Guide.

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

L'esempio seguente di definizione di pipeline mostra un'attività che utilizza la modalità di inserimento APPEND:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

L'operazione APPEND aggiunge gli elementi a una tabella indipendentemente dalle chiavi di ordinamento o primarie. Ad esempio, se si ha la seguente tabella, è possibile aggiungere un record con lo stesso ID e valore utente.

ID(PK) USER 1 aaa 2 bbb

È possibile aggiungere un record con lo stesso ID e valore utente:

ID(PK) USER 1 aaa 2 bbb 1 aaa
Nota

Se un'operazione APPEND viene interrotta e riprovata, la risultante pipeline rieseguita viene potenzialmente aggiunta dall'inizio. L'operazione potrebbe causare un ulteriore doppione, quindi è necessario essere a conoscenza di questo comportamento, soprattutto se si ha una logica che conteggia il numero di righe.

Per un tutorial, vedere Copia i dati su Amazon Redshift utilizzando AWS Data Pipeline.

Sintassi

Campi obbligatori Descrizione Tipo di slot
insertMode

Determina AWS Data Pipeline cosa fare con i dati preesistenti nella tabella di destinazione che si sovrappongono alle righe dei dati da caricare.

I valori validi sono: KEEP_EXISTING, OVERWRITE_EXISTING, TRUNCATE e APPEND.

KEEP_EXISTING aggiunge nuove righe alla tabella, mentre non modifica le righe esistenti.

KEEP_EXISTING e OVERWRITE_EXISTING utilizzano la chiave primaria, l'ordinamento e le chiavi di distribuzione per identificare quali righe in entrata abbinare alle righe esistenti. Consulta Aggiornamento e inserimento di nuovi dati nella Amazon Redshift Database Developer Guide.

TRUNCATE elimina tutti i dati della tabella di destinazione prima di scrivere i nuovi dati.

APPEND aggiunge tutti i record alla fine della tabella Redshift. APPEND non richiede una chiave primaria e di distribuzione o la chiave di ordinamento, in modo da poter aggiungere gli elementi che potrebbero essere potenziali duplicati.

Enumerazione

Campi Object Invocation Descrizione Tipo di slot
schedule

Questo oggetto viene richiamato entro l'esecuzione di un intervallo di pianificazione.

Specificare un riferimento alla pianificazione di un altro oggetto per impostare l'ordine di esecuzione delle dipendenze per questo oggetto.

Nella maggior parte dei casi, è preferibile inserire il riferimento alla pianificazione nell'oggetto pipeline di default, in modo che tutti gli oggetti possano ereditare tale pianificazione. Ad esempio, è possibile impostare una pianificazione esplicitamente sull'oggetto, specificando "schedule": {"ref": "DefaultSchedule"}.

Se la pianificazione master nella pipeline contiene pianificazioni nidificate, è possibile creare un oggetto padre che dispone di un riferimento alla pianificazione.

Per ulteriori informazioni sulle configurazioni di pianificazione opzionali di esempio, consulta Pianificazione.

Reference Object, ad esempio: "schedule":{"ref":"myScheduleId"}

Gruppo richiesto (uno dei seguenti è obbligatorio) Descrizione Tipo di slot
runsOn Le risorse di calcolo per eseguire l'attività o il comando. Ad esempio, un'istanza Amazon EC2 o un cluster Amazon EMR. Oggetto di riferimento, ad esempio «runSon»: {"ref»:» myResourceId «}
workerGroup Il gruppo di lavoro. Utilizzato per le attività di routing. Se si fornisce un valore runsOn ed esiste workerGroup, workerGroup verrà ignorato. Stringa

Campi opzionali Descrizione Tipo di slot
attemptStatus Lo stato segnalato più di recente dall'attività remota. Stringa
attemptTimeout Timeout per il completamento del lavoro in remoto. Se questo campo è impostato, un'attività remota che non viene completata entro il tempo impostato di avvio viene tentata di nuovo. Periodo
commandOptions

Richiede i parametri da passare al nodo di dati Amazon Redshift durante l'COPYoperazione. Per informazioni sui parametri, consulta COPY nella Amazon Redshift Database Developer Guide.

Mentre carica la tabella, COPY tenta di convertire in modo implicito le stringhe al tipo di dati della colonna di destinazione. Oltre alle conversioni predefinite dei dati che si verificano automaticamente, se si ricevono errori o altre esigenze di conversione, è possibile specificare i parametri di conversione aggiuntivi. Per informazioni, consulta i parametri di conversione dei dati nella Amazon Redshift Database Developer Guide.

Se un formato di dati è associato al nodo di dati in ingresso o in uscita, allora i parametri forniti vengono ignorati.

Poiché l'operazione di copia utilizza per prima cosa COPY per inserire i dati in una tabella intermedia, quindi usa un comando INSERT per copiare i dati dalla tabella intermedia nella tabella di destinazione, alcuni parametri COPY non sono applicabili, ad esempio la capacità del comando COPY di abilitare la compressione automatica della tabella. Se la compressione è obbligatoria, aggiungi i dettagli di codifica della colonna all'istruzione CREATE TABLE.

Inoltre, in alcuni casi, quando deve scaricare dati dal cluster Amazon Redshift e creare file in Amazon S3, si affida RedshiftCopyActivity al funzionamento UNLOAD di Amazon Redshift.

Per migliorare le prestazioni durante la copia e lo scaricamento, specificare il parametro PARALLEL OFF del comando UNLOAD. Per informazioni sui parametri, consulta UNLOAD nella Amazon Redshift Database Developer Guide.

Stringa
dependsOn Specifica una dipendenza su un altro oggetto eseguibile. Reference Object: "dependsOn":{"ref":"myActivityId"}
failureAndRerunModalità Descrive il comportamento del nodo consumer quando le dipendenze presentano un errore o vengono di nuovo eseguite Enumerazione
input Nodo dei dati di input. L'origine dati può essere Amazon S3, DynamoDB o Amazon Redshift. Reference Object: "input":{"ref":"myDataNodeId"}
lateAfterTimeout Il tempo trascorso dopo l'inizio della pipeline entro il quale l'oggetto deve essere completato. Viene attivato solo quando il tipo di pianificazione non è impostato su. ondemand Periodo
maxActiveInstances Il numero massimo di istanze attive simultanee di un componente. Le riesecuzioni non contano ai fini del numero di istanze attive. Numero intero
maximumRetries Numero massimo di tentativi in caso di errore Numero intero
onFail Un'azione da eseguire quando l'oggetto corrente ha esito negativo. Reference Object: "onFail":{"ref":"myActionId"}
onLateAction Azioni che devono essere attivate se un oggetto non è stato ancora pianificato o non è ancora completo. Reference Object: "onLateAction":{"ref":"myActionId"}
onSuccess Un'operazione da eseguire quando l'oggetto corrente ha esito positivo. Reference Object: "onSuccess":{"ref":"myActionId"}
output Nodo dei dati di output. Il percorso di output può essere Amazon S3 o Amazon Redshift. Reference Object: "output":{"ref":"myDataNodeId"}
parent Padre dell'oggetto corrente da cui saranno ereditati gli slot. Reference Object: "parent":{"ref":"myBaseObjectId"}
pipelineLogUri L'URI S3 (ad esempio 's3://BucketName/Key/ ') per caricare i log per la pipeline. Stringa
precondizione Definisce eventualmente una precondizione. Un nodo dati non è contrassegnato come "READY" finché tutte le precondizioni non siano state soddisfatte. Reference Object: "precondition":{"ref":"myPreconditionId"}
coda

Corrisponde all'query_group impostazione di Amazon Redshift, che consente di assegnare e dare priorità alle attività simultanee in base alla loro collocazione nelle code.

Amazon Redshift limita il numero di connessioni simultanee a 15. Per ulteriori informazioni, consulta Assigning Queries to Queues nella Amazon RDS Database Developer Guide.

Stringa
reportProgressTimeout

Timeout per chiamate successive di attività in remoto a reportProgress.

Se impostato, le attività in remoto che non presentano avanzamenti nel periodo specificato potrebbero essere considerate bloccate e sono quindi oggetto di un altro tentativo.

Periodo
retryDelay La durata del timeout tra due tentativi. Periodo
scheduleType

Consente di specificare se la pianificazione per gli oggetti è nella pipeline. I valori sono cron, ondemand e timeseries.

La pianificazione timeseries significa che le istanze sono programmate al termine di ogni intervallo.

La pianificazione Cron significa che le istanze sono programmate all'inizio di ogni intervallo.

Una pianificazione ondemandconsente di eseguire una pipeline una sola volta, per attivazione. Questo significa che non è necessario clonare o ricreare la pipeline per eseguirla di nuovo.

Per utilizzare le pipeline ondemand, chiama l'operazione ActivatePipeline per ogni esecuzione successiva.

Se utilizzi una pianificazione ondemand, devi specificarlo nell'oggetto predefinito e deve essere l'unica scheduleType specificata per gli oggetti della pipeline.

Enumerazione
transformSql

L'espressione SQL SELECT utilizzata per trasformare i dati di input.

Esegui l'espressione transformSql nella tabella denominata staging.

Quando copi dati da DynamoDB o Amazon S3 AWS Data Pipeline , crea una tabella chiamata «staging» e inizialmente carica i dati al suo interno. I dati di questa tabella vengono utilizzati per aggiornare la tabella di destinazione.

Lo schema di output di transformSql deve corrispondere allo schema della tabella di destinazione finale.

Se si specifica l'opzione transformSql, viene creata una seconda tabella intermedia dall'istruzione SQL specificata. I dati di questa seconda tabella intermedia vengono quindi aggiornati nella tabella di destinazione finale.

Stringa

Campi Runtime Descrizione Tipo di slot
@activeInstances Elenco di oggetti di istanze attive attualmente programmate. Reference Object: "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime L'ora in cui è terminata l'esecuzione di questo oggetto. DateTime
@actualStartTime L'ora in cui è stata avviata l'esecuzione di questo oggetto. DateTime
cancellationReason CancellationReason se questo oggetto è stato annullato. Stringa
@cascadeFailedOn Descrizione della catena di dipendenza che ha generato l'errore dell'oggetto. Reference Object: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog Log della fase EMR disponibili solo sui tentativi delle attività EMR Stringa
errorId ErrorId se l'oggetto non è riuscito. Stringa
errorMessage ErrorMessage se l'oggetto non è riuscito. Stringa
errorStackTrace Traccia dello stack di errore se l'oggetto non è riuscito. Stringa
@finishedTime L'ora in cui è terminata l'esecuzione di questo oggetto. DateTime
hadoopJobLog Log delle attività Hadoop disponibili per le attività basate su EMR. Stringa
@healthStatus Lo stato di integrità dell'oggetto che riflette l'esito positivo o negativo dell'ultima istanza dell'oggetto che ha raggiunto lo stato di un'istanza terminata. Stringa
@healthStatusFromInstanceId Id dell'ultimo oggetto dell'istanza che ha raggiunto lo stato terminato. Stringa
@ Ora healthStatusUpdated L'ora in cui lo stato di integrità è stato aggiornato l'ultima volta. DateTime
hostname Il nome host del client che si è aggiudicato il tentativo dell'attività. Stringa
@lastDeactivatedTime L'ora in cui l'oggetto è stato disattivato. DateTime
@ latestCompletedRun Ora L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata completata. DateTime
@latestRunTime L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata pianificata. DateTime
@nextRunTime L'orario dell'esecuzione da programmare come successiva. DateTime
reportProgressTime Il periodo di tempo più recente in cui l'attività remota ha segnalato un progresso. DateTime
@scheduledEndTime L'orario di termine della pianificazione per un oggetto. DateTime
@scheduledStartTime L'orario di inizio della pianificazione per l'oggetto. DateTime
@status Lo stato di questo oggetto. Stringa
@version Versione della pipeline con cui l'oggetto è stato creato. Stringa
@waitingOn Descrizione dell'elenco di dipendenze per cui questo oggetto è in attesa. Reference Object: "waitingOn":{"ref":"myRunnableObjectId"}

Campi di sistema Descrizione Tipo di slot
@error Errore che descrive il formato oggetto errato. Stringa
@pipelineId L'id della pipeline a cui appartiene questo oggetto. Stringa
@sphere La sfera di un oggetto. Indica la propria posizione nel ciclo di vita. Ad esempio, i Component Objects generano Instance Objects che eseguono Attempt Objects. Stringa