选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

将数据和表与管道活动一起暂存 - AWS Data Pipeline

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据和表与管道活动一起暂存

AWS Data Pipeline 可以暂存您的管道中的输入和输出数据,以便更容易使用特定活动,例如 ShellCommandActivityHiveActivity

数据暂存让您能够将数据从输入数据节点复制到执行活动的资源,从资源到输出数据节点与此类似。

通过在活动的 shell 命令或 Hive 脚本中使用特殊变量,可使用 Amazon EMR 或 Amazon EC2 资源上的暂存数据。

表暂存类似于数据暂存,具体而言,不同之处在于暂存的数据采用数据库表的形式。

AWS Data Pipeline 支持以下暂存场景:

  • 使用 ShellCommandActivity 的数据暂存

  • 使用 Hive 的表暂存和支持暂存的数据节点

  • 使用 Hive 的表暂存和不支持暂存的数据节点

注意

仅当活动上的 stage 字段设置为 true 时暂存才生效,例如 ShellCommandActivity。有关更多信息,请参阅ShellCommandActivity

此外,数据节点和活动可以通过四种方式关联:

在资源上本地暂存数据

输入数据自动复制到资源本地文件系统。输出数据自动从资源本地文件系统复制到输出数据节点。例如,当您使用 staging = true 配置 ShellCommandActivity 输入和输出时,输入数据作为 INPUTx_STAGING_DIR 可用,输出数据作为 OUTPUTx_STAGING_DIR 可用,其中 x 是输入或输出的编号。

暂存活动的输入和输出定义

输入数据格式 (列名和表名) 自动复制到活动的资源中。例如,当您使用 staging = true 配置 HiveActivity 时。在输入 S3DataNode 上指定的数据格式用于从 Hive 表暂存表定义。

暂存未启用

输入和输出对象及其字段可用于活动,但数据本身不行。例如,EmrActivity 默认情况下或在您使用 staging = false 配置其他活动时。在此配置中,数据字段可供活动使用 AWS Data Pipeline 表达式语法引用它们,这仅在满足依赖关系时发生。这仅用作依赖关系检查。活动中的代码负责将数据从输入复制到运行活动的资源。

对象之间的依赖关系

两个对象之间存在依赖关系,这会导致类似于未启用暂存的情况。这导致数据节点或活动用作执行另一个活动的先决条件。

使用 ShellCommandActivity 的数据暂存

请考虑使用 ShellCommandActivityS3DataNode 对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 shell 命令使用环境变量 ${INPUT1_STAGING_DIR}${OUTPUT1_STAGING_DIR} 访问,就像在本地文件夹中一样,如下例中所示。名为 INPUT1_STAGING_DIROUTPUT1_STAGING_DIR 的变量的数字部分根据您的活动引用的数据节点数递增。

注意

只有在您的输入和输出为 S3DataNode 对象时,此场景才按所述工作。此外,只有当 directoryPath 设置在输出 S3DataNode 对象上时,才允许输出数据暂存。

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

使用 Hive 的表暂存和支持暂存的数据节点

请考虑使用 HiveActivityS3DataNode 对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 Hive 脚本使用变量 ${input1}${output1} 访问,就像它们是 Hive 表一样,如下例 HiveActivity 中所示。名为 inputoutput 的变量的数字部分根据您的活动引用的数据节点数递增。

注意

只有在您的输入和输出为 S3DataNodeMySqlDataNode 对象时,此场景才按所述方式工作。DynamoDBDataNode 不支持表暂存。

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

使用 Hive 的表暂存和不支持暂存的数据节点

请考虑使用 HiveActivityDynamoDBDataNode 作为数据输入并将 S3DataNode 对象作为输出的场景。没有数据暂存可用于 DynamoDBDataNode,因此您必须先手动在 hive 脚本中创建表,使用变量名 #{input.tableName} 引用 DynamoDB 表。如果 DynamoDB 表是输出,类似术语也适用,除非您使用变量 #{output.tableName}。在本示例中,暂存可用于输出 S3DataNode 对象,因此您可以将输出数据节点作为 ${output1} 引用。

注意

在本示例中,表名变量具有 #(井号)字符前缀,因为 AWS Data Pipeline 使用表达式访问 tableNamedirectoryPath。有关表达式求值在 AWS Data Pipeline 中工作方式的详细信息,请参阅 表达式计算

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。