表达式 - AWS Data Pipeline

表达式

利用表达式,您可以跨相关对象共享值。表达式由 AWS Data Pipeline Web 服务在运行时处理,确保所有表达式都替代为表达式值。

表达式用“#{”和“}”分隔。您可以在字符串合法的任何管道定义对象中使用表达式。如果槽是一个引用,或者其类型为 ID、NAME、TYPE 或 SPHERE 之一,则不计算其值并且将按字面使用它。

以下表达式调用 AWS Data Pipeline 函数之一。有关更多信息,请参阅表达式计算

#{format(myDateTime,'YYYY-MM-dd hh:mm:ss')}

引用字段和对象

表达式既可以使用其所在的当前对象的字段,也可以使用通过引用链接的另一个对象的字段。

槽格式由创建时间后跟对象创建时间组成,例如 @S3BackupLocation_2018-01-31T11:05:33

您也可以引用管道定义中指定的确切的槽 ID,例如 Amazon S3 备份位置的槽 ID。要引用槽 ID,请使用 #{parent.@id}

在以下示例中,filePath 字段引用同一对象中的 id 字段来生成文件名。filePath 值的计算结果为“s3://mybucket/ExampleDataNode.csv”。

{ "id" : "ExampleDataNode", "type" : "S3DataNode", "schedule" : {"ref" : "ExampleSchedule"}, "filePath" : "s3://mybucket/#{parent.@id}.csv", "precondition" : {"ref" : "ExampleCondition"}, "onFail" : {"ref" : "FailureNotify"} }

要使用通过引用链接的另一个对象上存在的字段,请使用 node 关键字。此关键字仅适用于警报和先决条件对象。

继续前一个示例,SnsAlarm 中的表达式可引用 Schedule 中的日期和时间范围,因为 S3DataNode 将引用二者。

具体而言,FailureNotifymessage 字段可从 ExampleSchedule 中使用 @scheduledStartTime@scheduledEndTime 运行时字段,因为 ExampleDataNodeonFail 字段引用 FailureNotify,其 schedule 字段引用 ExampleSchedule

{ "id" : "FailureNotify", "type" : "SnsAlarm", "subject" : "Failed to run pipeline component", "message": "Error for interval #{node.@scheduledStartTime}..#{node.@scheduledEndTime}.", "topicArn":"arn:aws:sns:us-east-1:28619EXAMPLE:ExampleTopic" },
注意

您可以创建具有依赖项的管道,例如,管道中的任务,这些任务依赖于其他系统或任务的工作。如果管道需要某些资源,请使用与数据节点和任务关联的先决条件将这些依赖项添加到管道。这可使管道更易于调试且更具弹性。此外,由于难以跨管道排查问题,因此,请将依赖项保留在单个管道中 (如果可能)。

嵌套表达式

AWS Data Pipeline 使您能够嵌入值来创建更复杂的表达式。例如,要执行时间计算 (从 scheduledStartTime 中减去 30 分钟) 并设置结果的格式以在管道定义中使用,您可以在活动中使用以下表达式:

#{format(minusMinutes(@scheduledStartTime,30),'YYYY-MM-dd hh:mm:ss')}

并使用 node 前缀 (如果表达式是 SnsAlarm 或 Precondition 的一部分):

#{format(minusMinutes(node.@scheduledStartTime,30),'YYYY-MM-dd hh:mm:ss')}

列表

可基于列表和列表中的函数计算表达式。例如,假定按如下所示定义列表:"myList":["one","two"]。如果在表达式 #{'this is ' + myList} 中使用此列表,则该表达式的计算结果将为 ["this is one", "this is two"]。如果您有两个列表,Data Pipeline 最终将在计算中平展列表。例如,如果 myList1 定义为 [1,2]myList2 定义为 [3,4],则表达式 [#{myList1}, #{myList2}] 的计算结果将为 [1,2,3,4]

节点表达式

对于对管道组件的父对象的向后引用,AWS Data Pipeline 在 SnsAlarmPreCondition 中使用 #{node.*} 表达式。由于 SnsAlarmPreCondition 引用自一个活动或资源且未从其向后引用,因此,node 提供了对引用站点的引用方式。例如,以下管道定义演示故障通知如何使用 node 来引用其父级 (在此示例中,为 ShellCommandActivity),并在 SnsAlarm 消息中包含父级的计划的开始和结束时间。ShellCommandActivity 上的 scheduledStartTime 引用不需要 node 前缀,因为 scheduledStartTime 将引用自身。

注意

字段前面的 @ 符号指示这些字段是运行时字段。

{ "id" : "ShellOut", "type" : "ShellCommandActivity", "input" : {"ref" : "HourlyData"}, "command" : "/home/userName/xxx.sh #{@scheduledStartTime} #{@scheduledEndTime}", "schedule" : {"ref" : "HourlyPeriod"}, "stderr" : "/tmp/stderr:#{@scheduledStartTime}", "stdout" : "/tmp/stdout:#{@scheduledStartTime}", "onFail" : {"ref" : "FailureNotify"}, }, { "id" : "FailureNotify", "type" : "SnsAlarm", "subject" : "Failed to run pipeline component", "message": "Error for interval #{node.@scheduledStartTime}..#{node.@scheduledEndTime}.", "topicArn":"arn:aws:sns:us-east-1:28619EXAMPLE:ExampleTopic" },

AWS Data Pipeline 支持用户定义的字段而非运行时字段的可传递引用。可传递引用是两个管道组件之间的引用,该引用依靠另一个管道组件作为中介。以下示例显示了一个对可传递的用户定义的字段的引用和一个对不可传递的运行时字段的引用,二者均有效。有关更多信息,请参阅用户定义字段

{ "name": "DefaultActivity1", "type": "CopyActivity", "schedule": {"ref": "Once"}, "input": {"ref": "s3nodeOne"}, "onSuccess": {"ref": "action"}, "workerGroup": "test", "output": {"ref": "s3nodeTwo"} }, { "name": "action", "type": "SnsAlarm", "message": "S3 bucket '#{node.output.directoryPath}' succeeded at #{node.@actualEndTime}.", "subject": "Testing", "topicArn": "arn:aws:sns:us-east-1:28619EXAMPLE:ExampleTopic", "role": "DataPipelineDefaultRole" }

表达式计算

AWS Data Pipeline 提供了一组函数,可用于计算字段的值。以下示例使用 makeDate 函数将 Schedule 对象的 startDateTime 字段设置为 "2011-05-24T0:00:00" GMT/UTC。

"startDateTime" : "makeDate(2011,5,24)"