RedshiftCopyActivity - AWS Data Pipeline

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

RedshiftCopyActivity

将数据从 DynamoDB 或 Amazon S3 复制到 Amazon Redshift。您可以将数据加载到新表中,或轻松地将数据并入现有表中。

下面概括了使用 RedshiftCopyActivity 的使用案例:

  1. 首先使用 AWS Data Pipeline 在 Amazon S3 中暂存您的数据。

  2. 使用 RedshiftCopyActivity 将数据从 Amazon RDS 和 Amazon EMR 移动到 Amazon Redshift。

    这可让您将数据加载到 Amazon Redshift 中,您可以在此处对数据进行分析。

  3. 使用 SqlActivity 可以对已加载到 Amazon Redshift 中的数据执行 SQL 查询。

此外,借助 RedshiftCopyActivity,您可以使用 S3DataNode,因为它支持清单文件。有关更多信息,请参阅 S3 DataNode

示例

以下是该对象类型的示例。

为了确保格式转换,本示例在 commandOptions 中使用 EMPTYASNULLIGNOREBLANKLINES 特殊转换参数。有关信息,请参阅 Amazon Redshift 数据库开发人员指南中的数据转换参数

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

以下示例管道定义说明了一个使用 APPEND 插入模式的活动:

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPEND 操作向表中添加项,无论主键或排序键如何。例如,如果您有以下表,则可追加具有相同的 ID 和用户值的记录。

ID(PK) USER 1 aaa 2 bbb

您可以追加具有相同的 ID 和用户值的记录:

ID(PK) USER 1 aaa 2 bbb 1 aaa
注意

如果 APPEND 操作中断并重试,生成的重新运行管道可能会从开始位置追加。这可能会导致进一步复制,因此,您应了解此行为,尤其是当您有任何计算行数的逻辑时。

有关教程,请参阅 使用 AWS Data Pipeline 复制数据到 Amazon Redshift

语法

必填字段 描述 槽类型
insertMode

确定 AWS Data Pipeline 如何处理目标表中与要加载的数据中的行重叠的预先存在的数据。

有效值包括:KEEP_EXISTINGOVERWRITE_EXISTINGTRUNCATEAPPEND

KEEP_EXISTING 添加新行到表中,同时保留任何现有的行不变。

KEEP_EXISTING OVERWRITE_EXISTING 使用主键、排序键和分配键来识别哪些传入行与现有行匹配。请参阅 Amazon Redshift 数据库开发人员指南中的更新和插入新数据

TRUNCATE 先删除目标表中的所有数据,然后写入新数据。

APPEND 会将所有记录添加到 Redshift 表的结尾。APPEND 不需要主键、分配键或排序键,因此会附加可能存在重复的项。

枚举

对象调用字段 描述 槽类型
schedule

该对象在计划间隔的执行中调用。

指定对另一个对象的计划引用,以便设置该对象的依赖项执行顺序。

在大多数情况下,我们建议将计划引用放在默认管道对象上,以便所有对象继承该计划。例如,您可以通过指定 "schedule": {"ref": "DefaultSchedule"},明确地针对该对象设置计划。

如果您的管道中的主计划包含嵌套计划,则可以创建具有计划引用的父对象。

有关示例可选计划配置的更多信息,请参阅计划

引用对象,例如: "schedule":{"ref":"myScheduleId"}

所需的组 (下列选项之一是必需的) 描述 槽类型
runsOn 运行活动或命令的计算资源。例如,Amazon EC2 实例或 Amazon EMR 集群。 参考对象,例如 “runson”:{“ref”:” myResourceId “}
workerGroup 工作线程组。这可用于路由任务。如果您提供 runsOn 值并且存在 workerGroup,则将忽略 workerGroup。 String

可选字段 描述 槽类型
attemptStatus 来自远程活动的最近报告的状态。 String
attemptTimeout 远程工作完成的超时时间。如果设置此字段,则可能会重试未在设定的开始时间内完成的远程活动。 周期
commandOptions

获取在 COPY 操作期间传递到 Amazon Redshift 数据节点的参数。有关更多信息,请参阅 Amazon Redshift 数据库开发人员指南中的 COPY

在加载表时,COPY 会尝试将字符串隐式转换为目标列的数据类型。如果您收到错误或有其他转换需求,则除了自动发生的默认数据转换之外,您还可以指定其他转换参数。有关信息,请参阅 Amazon Redshift 数据库开发人员指南中的数据转换参数

如果数据格式与输入或输出数据节点关联,则忽略提供的参数。

由于复制操作首先使用 COPY 将数据插入暂存表,然后使用 INSERT 命令将数据从暂存表复制到目标表中,一些 COPY 参数不适用,例如 COPY 命令启用表上自动压缩的功能。如果需要压缩,请向 CREATE TABLE 语句添加列编码详细信息。

此外,在某些需要从 Amazon Redshift 集群卸载数据和在 Amazon S3 中创建文件的情况下,RedshiftCopyActivity 依赖 Amazon Redshift 的 UNLOAD 操作。

为提高复制和卸载过程中的性能,请从 UNLOAD 命令指定 PARALLEL OFF 参数。有关更多信息,请参阅 Amazon Redshift 数据库开发人员指南中的 UNLOAD

String
dependsOn 指定与另一个可运行对象的依赖关系。 引用对象:"dependsOn":{"ref":"myActivityId"}
failureAndRerun模式 描述依赖项失败或重新运行时的使用者节点行为。 枚举
input 输入数据节点。数据源可以是 Amazon S3、DynamoDB 或 Amazon Redshift。 引用对象: "input":{"ref":"myDataNodeId"}
lateAfterTimeout 管道启动后经过的时间,在此时间内,对象必须完成。仅当计划类型未设置为 ondemand 时才会触发。 周期
maxActiveInstances 组件的并发活动实例的最大数量。重新运行不计入活动实例数中。 整数
maximumRetries 失败后的最大重试次数 整数
onFail 当前对象失败时要运行的操作。 引用对象:"onFail":{"ref":"myActionId"}
onLateAction 在尚未计划对象或对象仍未完成的情况下将触发的操作。 引用对象: "onLateAction":{"ref":"myActionId"}
onSuccess 当前对象成功时要运行的操作。 引用对象: "onSuccess":{"ref":"myActionId"}
output 输出数据节点。输出位置可以是 Amazon S3 或 Amazon Redshift。 引用对象: "output":{"ref":"myDataNodeId"}
parent 槽将继承自的当前对象的父级。 引用对象:"parent":{"ref":"myBaseObjectId"}
pipelineLogUri 用于上传管道日志的 S3 URI(例如 's3: BucketName ///Key/ ')。 String
precondition (可选) 定义先决条件。在满足所有先决条件之前,数据节点不会标记为“READY”。 引用对象:"precondition":{"ref":"myPreconditionId"}
队列

对应于 Amazon Redshift 中的 query_group 设置,这允许您根据放置在队列中的位置分配并优先处理并发活动。

Amazon Redshift 将同时连接的数量限制为 15。有关更多信息,请参阅 Amazon RDS 数据库开发人员指南中的向队列分配查询

String
reportProgressTimeout

远程工作对 reportProgress 的连续调用的超时时间。

如果设置此字段,则未报告指定时段的进度的远程活动可能会被视为停滞且已重试。

周期
retryDelay 两次重试之间的超时时间。 周期
scheduleType

允许您指定是否计划管道中的对象。值包括:cronondemandtimeseries

timeseries 计划表示在每个间隔结束时计划实例。

Cron 计划表示在每个间隔开始时计划实例。

ondemand 计划让您可以在每次激活时运行一次管道。这意味着,您不需要克隆或重新创建管道以再次运行它。

要使用 ondemand 管道,请为每个后续运行调用 ActivatePipeline 操作。

如果您使用 ondemand 计划,您必须在默认对象中指定它,并且该计划必须是在管道中为对象指定的唯一 scheduleType

枚举
transformSql

用于转换输入数据的 SQL SELECT 表达式。

在名为 staging 的表上运行 transformSql 表达式。

当您从 DynamoDB 或 Amazon S3 复制数据时, AWS Data Pipeline 会创建一个名为“staging”的表,并且最初在该表中加载数据。此表中的数据用于更新目标表。

transformSql 的输出架构必须与最终目标表的架构匹配。

如果您指定了 transformSql 选项,则会从指定的 SQL 语句创建第二个暂存表。然后,来自这第二个暂存表的数据更新到最终的目标表中。

String

运行时字段 描述 槽类型
@activeInstances 当前计划的有效实例对象的列表。 引用对象:"activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime 该对象的执行完成时间。 DateTime
@actualStartTime 该对象的执行开始时间。 DateTime
cancellationReason 该对象被取消时显示的 cancellationReason。 String
@cascadeFailedOn 对象在其上失败的依赖项链的描述。 引用对象: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog 仅在尝试 EMR 活动时可用的 EMR 步骤日志 String
errorId 该对象失败时显示的 errorId。 String
errorMessage 该对象失败时显示的 errorMessage。 String
errorStackTrace 该对象失败时显示的错误堆栈跟踪。 String
@finishedTime 该对象完成其执行的时间。 DateTime
hadoopJobLog 在尝试基于 EMR 的活动时可用的 Hadoop 任务日志。 String
@healthStatus 对象的运行状况,反映进入终止状态的上个对象实例成功还是失败。 String
@healthStatusFromInstanceId 进入终止状态的上个实例对象的 ID。 String
@ T healthStatusUpdated ime 上次更新运行状况的时间。 DateTime
hostname 已执行任务尝试的客户端的主机名。 String
@lastDeactivatedTime 上次停用该对象的时间。 DateTime
@ T latestCompletedRun ime 已完成执行的最新运行的时间。 DateTime
@latestRunTime 已计划执行的最新运行的时间。 DateTime
@nextRunTime 计划下次运行的时间。 DateTime
reportProgressTime 远程活动报告进度的最近时间。 DateTime
@scheduledEndTime 对象的计划结束时间。 DateTime
@scheduledStartTime 对象的计划开始时间。 DateTime
@status 该对象的状态。 String
@version 用来创建对象的管道版本。 String
@waitingOn 该对象在其上处于等待状态的依赖项列表的描述。 引用对象: "waitingOn":{"ref":"myRunnableObjectId"}

系统字段 描述 槽类型
@error 用于描述格式不正确的对象的错误消息。 String
@pipelineId 该对象所属的管道的 ID。 String
@sphere 对象的球体。表示对象在生命周期中的位置。例如,组件对象产生实例对象,后者执行尝试对象。 String