将数据和表与管道活动一起暂存 - AWS Data Pipeline

将数据和表与管道活动一起暂存

AWS Data Pipeline 可以暂存您的管道中的输入和输出数据,以便更容易使用特定活动,例如 ShellCommandActivityHiveActivity

数据暂存让您能够将数据从输入数据节点复制到执行活动的资源,从资源到输出数据节点与此类似。

通过在活动的 shell 命令或 Hive 脚本中使用特殊变量,可使用 Amazon EMR 或 Amazon EC2 资源上的暂存数据。

表暂存类似于数据暂存,具体而言,不同之处在于暂存的数据采用数据库表的形式。

AWS Data Pipeline 支持以下暂存场景:

  • 使用 ShellCommandActivity 的数据暂存

  • 使用 Hive 的表暂存和支持暂存的数据节点

  • 使用 Hive 的表暂存和不支持暂存的数据节点

注意

仅当活动上的 stage 字段设置为 true 时暂存才生效,例如 ShellCommandActivity。有关更多信息,请参阅ShellCommandActivity

此外,数据节点和活动可以通过四种方式关联:

在资源上本地暂存数据

输入数据自动复制到资源本地文件系统。输出数据自动从资源本地文件系统复制到输出数据节点。例如,当您使用 staging = true 配置 ShellCommandActivity 输入和输出时,输入数据作为 INPUTx_STAGING_DIR 可用,输出数据作为 OUTPUTx_STAGING_DIR 可用,其中 x 是输入或输出的编号。

暂存活动的输入和输出定义

输入数据格式 (列名和表名) 自动复制到活动的资源中。例如,当您使用 staging = true 配置 HiveActivity 时。在输入 S3DataNode 上指定的数据格式用于从 Hive 表暂存表定义。

暂存未启用

输入和输出对象及其字段可用于活动,但数据本身不行。例如,EmrActivity 默认情况下或在您使用 staging = false 配置其他活动时。在此配置中,数据字段可供活动使用 AWS Data Pipeline 表达式语法引用它们,这仅在满足依赖关系时发生。这仅用作依赖关系检查。活动中的代码负责将数据从输入复制到运行活动的资源。

对象之间的依赖关系

两个对象之间存在依赖关系,这会导致类似于未启用暂存的情况。这导致数据节点或活动用作执行另一个活动的先决条件。

使用 ShellCommandActivity 的数据暂存

请考虑使用 ShellCommandActivityS3DataNode 对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 shell 命令使用环境变量 ${INPUT1_STAGING_DIR}${OUTPUT1_STAGING_DIR} 访问,就像在本地文件夹中一样,如下例中所示。名为 INPUT1_STAGING_DIROUTPUT1_STAGING_DIR 的变量的数字部分根据您的活动引用的数据节点数递增。

注意

只有在您的输入和输出为 S3DataNode 对象时,此场景才按所述工作。此外,只有当 directoryPath 设置在输出 S3DataNode 对象上时,才允许输出数据暂存。

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

使用 Hive 的表暂存和支持暂存的数据节点

请考虑使用 HiveActivityS3DataNode 对象作为数据输入和输出的场景。AWS Data Pipeline 自动暂存数据节点,使其可通过 Hive 脚本使用变量 ${input1}${output1} 访问,就像它们是 Hive 表一样,如下例 HiveActivity 中所示。名为 inputoutput 的变量的数字部分根据您的活动引用的数据节点数递增。

注意

只有在您的输入和输出为 S3DataNodeMySqlDataNode 对象时,此场景才按所述方式工作。DynamoDBDataNode 不支持表暂存。

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

使用 Hive 的表暂存和不支持暂存的数据节点

请考虑使用 HiveActivityDynamoDBDataNode 作为数据输入并将 S3DataNode 对象作为输出的场景。没有数据暂存可用于 DynamoDBDataNode,因此您必须先手动在 hive 脚本中创建表,使用变量名 #{input.tableName} 引用 DynamoDB 表。如果 DynamoDB 表是输出,类似术语也适用,除非您使用变量 #{output.tableName}。在本示例中,暂存可用于输出 S3DataNode 对象,因此您可以将输出数据节点作为 ${output1} 引用。

注意

在本示例中,表名变量具有 #(井号)字符前缀,因为 AWS Data Pipeline 使用表达式访问 tableNamedirectoryPath。有关表达式求值在 AWS Data Pipeline 中工作方式的详细信息,请参阅 表达式计算

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...