AWS Data Pipeline 不再向新客户提供。的现有客户 AWS Data Pipeline 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将数据和表与管道活动一起暂存
AWS Data Pipeline 可以暂存您的管道中的输入和输出数据,以便更容易使用特定活动,例如 ShellCommandActivity
和 HiveActivity
。
数据暂存让您能够将数据从输入数据节点复制到执行活动的资源,从资源到输出数据节点与此类似。
通过在活动的 shell 命令或 Hive 脚本中使用特殊变量,可使用 Amazon EMR 或 Amazon EC2 资源上的暂存数据。
表暂存类似于数据暂存,具体而言,不同之处在于暂存的数据采用数据库表的形式。
AWS Data Pipeline 支持以下暂存场景:
-
使用
ShellCommandActivity
的数据暂存 -
使用 Hive 的表暂存和支持暂存的数据节点
-
使用 Hive 的表暂存和不支持暂存的数据节点
注意
仅当活动上的 stage
字段设置为 true
时暂存才生效,例如 ShellCommandActivity
。有关更多信息,请参阅ShellCommandActivity。
此外,数据节点和活动可以通过四种方式关联:
- 在资源上本地暂存数据
-
输入数据自动复制到资源本地文件系统。输出数据自动从资源本地文件系统复制到输出数据节点。例如,当您使用 staging = true 配置
ShellCommandActivity
输入和输出时,输入数据作为 INPUTx_STAGING_DIR 可用,输出数据作为 OUTPUTx_STAGING_DIR 可用,其中 x 是输入或输出的编号。 - 暂存活动的输入和输出定义
-
输入数据格式 (列名和表名) 自动复制到活动的资源中。例如,当您使用 staging = true 配置
HiveActivity
时。在输入S3DataNode
上指定的数据格式用于从 Hive 表暂存表定义。 - 暂存未启用
-
输入和输出对象及其字段可用于活动,但数据本身不行。例如,
EmrActivity
默认情况下或在您使用 staging = false 配置其他活动时。在此配置中,数据字段可供活动使用 AWS Data Pipeline 表达式语法引用它们,这仅在满足依赖关系时发生。这仅用作依赖关系检查。活动中的代码负责将数据从输入复制到运行活动的资源。 - 对象之间的依赖关系
-
两个对象之间存在依赖关系,这会导致类似于未启用暂存的情况。这导致数据节点或活动用作执行另一个活动的先决条件。
使用 ShellCommandActivity 的数据暂存
请考虑使用 ShellCommandActivity
和 S3DataNode
对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 shell 命令使用环境变量 ${INPUT1_STAGING_DIR}
和 ${OUTPUT1_STAGING_DIR}
访问,就像在本地文件夹中一样,如下例中所示。名为 INPUT1_STAGING_DIR
和 OUTPUT1_STAGING_DIR
的变量的数字部分根据您的活动引用的数据节点数递增。
注意
只有在您的输入和输出为 S3DataNode
对象时,此场景才按所述工作。此外,只有当 directoryPath
设置在输出 S3DataNode
对象上时,才允许输出数据暂存。
{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...
使用 Hive 的表暂存和支持暂存的数据节点
请考虑使用 HiveActivity
和 S3DataNode
对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 Hive 脚本使用变量 ${input1}
和 ${output1}
访问,就像它们是 Hive 表一样,如下例 HiveActivity
中所示。名为 input
和 output
的变量的数字部分根据您的活动引用的数据节点数递增。
注意
只有在您的输入和输出为 S3DataNode
或 MySqlDataNode
对象时,此场景才按所述方式工作。DynamoDBDataNode
不支持表暂存。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
使用 Hive 的表暂存和不支持暂存的数据节点
请考虑使用 HiveActivity
和 DynamoDBDataNode
作为数据输入并将 S3DataNode
对象作为输出的场景。没有数据暂存可用于 DynamoDBDataNode
,因此您必须先手动在 hive 脚本中创建表,使用变量名 #{input.tableName}
引用 DynamoDB 表。如果 DynamoDB 表是输出,类似术语也适用,除非您使用变量 #{output.tableName}
。在本示例中,暂存可用于输出 S3DataNode
对象,因此您可以将输出数据节点作为 ${output1}
引用。
注意
在本示例中,表名变量具有 #(井号)字符前缀,因为 AWS Data Pipeline 使用表达式访问 tableName
或 directoryPath
。有关表达式求值在 AWS Data Pipeline 中工作方式的详细信息,请参阅 表达式计算。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...