AWS Glue Scala DynamicFrame 类
程序包:com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
是自描述的 DynamicRecord 对象的分布式集合。
DynamicFrame
旨在为 ETL(提取、转换和加载)操作提供灵活的数据模型。它们不需要创建架构,可用于读取和转换具有杂乱或不一致的值和类型的数据。可以按需为需要架构的那些操作计算架构。
DynamicFrame
为数据清理和 ETL 提供了一系列转换。它们还支持转换为 SparkSQL DataFrame 和从其转换以与现有代码和 DataFrame 提供的许多分析操作集成。
跨构造 DynamicFrame
的许多 AWS Glue 转换共享以下参数:
transformationContext
— 此DynamicFrame
的标识符。transformationContext
用作跨运行保存的作业书签状态的密钥。callSite
– 为错误报告提供上下文信息。在从 Python 调用时,会自动设置这些值。stageThreshold
– 在引发异常之前允许的来自此DynamicFrame
计算的最大错误记录数,不包括以前的DynamicFrame
中存在的记录。totalThreshold
– 引发异常之前的最大错误记录总数,包括以前的帧中的记录。
Val errorsCount
val errorsCount
此 DynamicFrame
中的错误记录数。这包括来自以前的操作的错误。
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
– 用于构造新DynamicFrame
的映射序列。caseSensitive
– 是否将源列视为区分大小写。在与不区分大小写的存储(如 AWS Glue 数据目录)集成时,将此项设置为 false 可能很有帮助。
基于一系列的映射选择、投影和转换列。
每个映射由源列和类型以及目标列和类型构成。映射可指定为四元组 (source_path
、source_type
、 target_path
、target_type
) 或包含相同信息的 MappingSpec 对象。
除了将映射用于简单的投影和转换外,还可以通过使用“.
”(句点)分隔路径的组件来用于嵌套或取消嵌套字段。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
您可以进行以下调用来取消嵌套 state
和 zip
字段。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
生成的架构如下所示。
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
您还可以使用 applyMapping
来重新嵌套列。例如,以下代码反转以前的转换,并在目标中创建一个名为 address
的结构。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
包含“.
”(句点)字符的字段名称可以使用反引号 (``
) 括起来。
注意
目前,您不能使用 applyMapping
方法映射嵌套在数组下的列。
Def assertErrorThreshold
def assertErrorThreshold : Unit
强制计算并验证错误记录数是否低于 stageThreshold
和 totalThreshold
的操作。如果任一条件失败,则引发异常。
Def count
lazy
def count
返回 DynamicFrame
中的元素数量。
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回已删除指定列的新 DynamicFrame
。
您可以使用此方法删除嵌套列(包括数组中的列),但不能用于删除特定数组元素。
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
返回已删除所有空列的新 DynamicFrame
。
注意
这只删除类型为 NullType
的列。不删除或修改其他列中的单个空值。
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
返回包含此 DynamicFrame
中的错误记录的新 DynamicFrame
。
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
构造只包含函数“f
”为其返回 true
的那些记录的新 DynamicFrame
。筛选器函数“f
”不应转变输入记录。
Def getName
def getName : String
返回此 DynamicFrame
的名称。
Def getNumPartitions
def getNumPartitions
返回 DynamicFrame
中的分区数量。
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
如果架构已经过计算,则返回该架构。如果架构尚未经过计算,则不扫描数据。
Def isSchemaComputed
def isSchemaComputed : Boolean
如果已为此 DynamicFrame
计算架构,则返回 true
;否则返回 false
。如果此方法返回 false,则调用 schema
方法将需要再次扫描此 DynamicFrame
中的记录。
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
– 此DynamicFrame
中用于联接的列。keys2
–frame2
中用于联接的列。必须与keys1
的长度相同。frame2
– 要联接的DynamicFrame
。
返回使用指定的键对 frame2
执行 equijoin 的结果。
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回通过对此 DynamicFrame
中的每个记录应用指定函数“f
”构造的新 DynamicFrame
。
此方法先复制每个记录,然后再应用指定函数,因此可以安全地转变记录。如果映射函数在给定记录上引发异常,则该记录将标记为错误,并且堆栈跟踪将另存为错误记录中的一个列。
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
– 要合并的暂存DynamicFrame
。primaryKeys
– 要匹配源和暂存DynamicFrame
中的记录的主键字段列表。transformationContext
– 用于检索有关当前转换的元数据的唯一字符串(可选)。options
– 为此转换提供其他信息的 JSON 名称-值对的字符串。callSite
– 用于为错误报告提供上下文信息。stageThreshold
— 一个Long
。给定转换中处理需要排除的错误的数目。totalThreshold
— 一个Long
。此转换中处理需要排除的错误的总数。
基于指定主键的将此 DynamicFrame
与暂存 DynamicFrame
合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。
在以下情况下,返回的 DynamicFrame
将包含记录 A:
如果
A
在源帧和暂存帧中都存在,则返回暂存帧中的A
。如果
A
在源表中,且A.primaryKeys
不在stagingDynamicFrame
中(这意味着,未在暂存表中更新A
)。
源帧和暂存帧不需要具有相同的架构。
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
以人类可读的格式将此 DynamicFrame
的架构输出到 stdout
。
Def recomputeSchema
def recomputeSchema : Schema
强制重新计算架构。这需要扫描数据,但如果数据中不存在当前架构中的一些字段,则可能会“压缩”架构。
返回经过重新计算的架构。
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
– 在输出中用于基本DynamicFrame
的名称。通过透视数组创建的DynamicFrame
以此作为前缀开头。stagingPath
– 用于写入中间数据的 Amazon Simple Storage Service(Amazon S3)路径。options
– 关系化选项和配置。目前未使用。
展平所有嵌套的结构并将数组透视为单独的表。
您可以使用此操作准备深度嵌套的数据以提取到关系数据库中。使用与 Unnest 转换相同的方式展平嵌套结构。此外,将数组透视为单独的表,其中每个数组元素成为一行。例如,假设您有一个包含以下数据的 DynamicFrame
。
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
运行以下代码。
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
这会生成两个表。第一个表名为“people”,其中包含以下内容。
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
在这里,friends 数组已被替换为自动生成的联接键。创建了名为 people.friends
的单独表,其中包含以下内容。
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
在此表中,“id
”是标识数组元素来自哪个记录的联接键,“index
”是指在原始数组中的位置,“val
”是实际的数组条目。
relationalize
方法返回通过以递归方式对所有数组应用此过程创建的 DynamicFrame
序列。
注意
AWS Glue 库自动为新表生成联接键。为确保联接键在作业运行中是唯一的,您必须启用作业书签。
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
– 列的原始名称。newName
– 列的新名称。
返回新 DynamicFrame
,其中的指定字段进行了重命名。
您可以使用此方法重命名嵌套字段。例如,以下代码在 address 结构中将 state
重命名为 state_code
。
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回具有 numPartitions
分区的新 DynamicFrame
。
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
– 应用于 specs 序列中未列出的所有ChoiceType
列的操作。database
– 与match_catalog
操作一起使用的数据目录数据库。tableName
– 与match_catalog
操作一起使用的数据目录表。
通过将一个或多个 ChoiceType
替换为更具体的类型来返回新 DynamicFrame
。
可通过两种方式使用 resolveChoice
。第一种是指定一系列特定列以及如何解析它们。这些指定为由 (列, 操作) 对组成的元组。
可能的操作如下:
cast:type
– 尝试将所有值转换为指定的类型。make_cols
– 将每个不同的类型转换为名为columnName_type
的列。make_struct
– 将列转换为具有每个不同类型的键的结构。project:type
– 仅保留指定类型的值。
resolveChoice
的另一种模式是为所有 ChoiceType
指定单个解析方法。这可以在执行前不知道 ChoiceType
的完整列表的情况下使用。除了上面列出的操作外,此模式还支持以下操作:
match_catalog
ChoiceType
– 尝试将每个 转换为指定目录表中的对应类型。
示例:
通过转换为 int 来解析 user.id
列,并使 address
字段仅保留结构。
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
通过将每个选择转换为单独的列来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
通过转换为指定目录表中的类型来解析所有 ChoiceType
。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
返回此 DynamicFrame
的架构。
保证返回的架构包含此 DynamicFrame
中的记录中存在的每个字段。但在少数情况下,它可能还包含其他字段。您可以使用 Unnest 方法基于此 DynamicFrame
中的记录来“压缩”架构。
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回作为 DynamicFrame
的单个字段。
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
– 要选择的列名称序列。
返回包含指定列的新 DynamicFrame
。
注意
您只能使用 selectFields
方法选择顶级列。您可以使用 applyMapping 方法选择嵌套列。
Def show
def show( numRows : Int = 20 ) : Unit
numRows
– 要输出的行数。
以 JSON 格式输出此 DynamicFrame
中的行。
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回相同记录但写出一部分记录作为副作用的传递转换。
path
– 将输出写入的 Amazon S3 中的路径,格式为s3://bucket//path
。options
– 描述取样行为的可选的JsonOptions
映射。
返回包含与这一个相同的记录的 DynamicFrame
。
默认情况下,将 100 个任意记录写入通过 path
指定的位置。您可以使用 options
映射自定义此行为。有效键包括:
topk
– 指定写出的记录的总数。默认值为 100。prob
– 指定包含单个记录的概率(小数形式)。默认值为 1。
例如,以下调用将对数据集进行取样,方法是以 20% 的概率选择每个记录并在写入 200 个记录后停止。
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
— 包括在第一个DynamicFrame
中的路径。
返回两个 DynamicFrame
的序列。第一个 DynamicFrame
包含指定路径,第二个包含所有其他列。
示例
此示例采用从 AWS Glue 数据目录中的 legislators
数据库中的 persons
表创建的 DynamicFrame,并将 DynamicFrame 拆分为两个,其中指定的字段进入第一个 DynamicFrame,其余字段进入第二个 DynamicFrame。然后,示例将从结果中选择第一个 DynamicFrame。
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
基于将列与常量比较的谓词来拆分行。
paths
– 用于比较的列。values
– 用于比较的常量值。operators
– 用于比较的运算符。
返回两个 DynamicFrame
的序列。第一个包含其谓词为 true 的行,第二个包含其谓词为 false 的行。
使用三个序列指定谓词:“paths
”包含(可能嵌套的)列名称,“values
”包含要与其进行比较的常量值,“operators
”包含用于比较的运算符。所有这三个序列必须长度相同:第 n
个运算符将用于将第 n
个列与第 n
个值进行比较。
每个运算符必须是以下运算符之一:“!=
”、“=
”、“<=
”、“<
”、“>=
”或“>
”。
例如,以下调用将拆分 DynamicFrame
,以便第一个输出帧将包含美国 65 岁以上的人员记录,第二个将包含所有其他记录。
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
返回在计算此 DynamicFrame
时创建的错误记录数。这不包括作为输入传递给此 DynamicFrame
的以前操作中的错误。
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
将此 DynamicFrame
转换为具有相同架构和记录的 Apache Spark SQL DataFrame
。
注意
由于 DataFrame
不支持 ChoiceType
,因此此方法自动将 ChoiceType
列转换为 StructType
。有关更多信息和用于解析选择的选项,请参阅 resolveChoice。
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
– 要分析的列。必须是字符串或二进制。format
– 用于分析的格式。optionString
– 传递到格式的选项,如 CSV 分隔符。
根据指定的格式分析嵌入式字符串或二进制列。已分析的列嵌套在具有原始列名称的结构下。
例如,假设您具有包含一个嵌入式 JSON 列的 CSV 文件。
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
在初始分析后,您将获取具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: string }}}
您可以对地址列调用 unbox
来分析特定组件。
{{{ df.unbox("address", "json") }}}
这为我们提供了具有以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
返回平展了所有嵌套结构的新 DynamicFrame
。构造名称时使用“.
”(句点)字符。
例如,假设您有一个包含以下架构的 DynamicFrame
。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
以下调用取消嵌套 address 结构。
{{{ df.unnest() }}}
生成的架构如下所示。
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
此方法还取消嵌套数组中的嵌套结构。但由于历史原因,此类字段的名称前附加了括起来的数组的名称和“.val
”。
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
解除 DynamicFrame
中的嵌套列,具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame
。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest
转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON。
例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnestDDBJson()
转换会将此转换为:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
以下代码示例演示了如何使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
– 返回要使用的架构的函数。指定为零参数函数来推迟可能昂贵的计算。
将此 DynamicFrame
的架构设置为指定值。这主要在内部使用以避免成本高昂的架构重新计算。传入的架构必须包含数据中存在的所有列。
Def withName
def withName( name : String ) : DynamicFrame
name
– 要使用的新名称。
返回使用新名称的此 DynamicFrame
的副本。
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
返回具有指定转换上下文的此 DynamicFrame
的副本。