本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
DynamicFrame 类
Apache Spark 中的主要抽象之一是 SparkSQL DataFrame
,它类似于在 R 和 Pandas 中找到的 DataFrame
构造。DataFrame
类似于表格,支持函数风格(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。
尽管 DataFrames
功能强大且运用广泛,但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是,它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次扫描解决了这一问题 – 第一次为了推断架构,第二次为了加载数据。然而,此推断仍有局限性,且不能解决数据混乱的实际问题。例如,同样的字段在不同记录中可能属于不同类型。对此,Apache Spark 通常没有好的办法,只能使用原始字段文本将类型报告为 string
。这或许不正确,同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言,每扫描一次源数据都会付出高昂代价。
为了解决这些局限性,AWS Glue 推出了 DynamicFrame
。DynamicFrame
类似于 DataFrame
,不同之处在于每个记录都是自描述的,因此初始并不需要架构。相反,AWS Glue 会在需要时实时计算一个架构,并使用选择(或联合)类型显式编码架构不一致之处。您可以解决这些不一致之处,以使您的数据集兼容需要固定架构的数据存储。
同样,一个 DynamicRecord
表示 DynamicFrame
中的一条逻辑记录。它类似于 Spark DataFrame
中的一行,只不过它是自描述的,可用于不符合固定架构的数据。将 AWS Glue 与 PySpark 一起使用时,通常不需要处理独立 DynamicRecords
。相反,您需要通过 DynamicFrame
一起转换数据集。
在解决任何架构不一致问题后,就可以在 DynamicFrames
和 DataFrames
之间来回转换。
– 构造 –
__init__
__init__(jdf, glue_ctx, name)
-
jdf
– 对 Java 虚拟机 (JVM) 中数据帧的引用。 -
glue_ctx
– 一个 GlueContext 班级 对象。 -
name
– 可选的名称字符串,默认为空。
fromDF
fromDF(dataframe, glue_ctx, name)
通过将 DataFrame
字段转换为 DynamicRecord
字段将 DataFrame
转换为 DynamicFrame
。返回新的 DynamicFrame
。
一个 DynamicRecord
表示 DynamicFrame
中的一条逻辑记录。它类似于 Spark DataFrame
中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
此函数期望您 DataFrame
中名称重复的列已经被解析。
-
dataframe
– 要转换的 Apache Spark SQLDataFrame
(必需)。 -
glue_ctx
– 为此转换指定上下文的 GlueContext 班级 对象 (必需)。 -
name
– 生成的DynamicFrame
的名称(从 AWS Glue 3.0 起是可选的)。
toDF
toDF(options)
通过将 DynamicRecords
转换为 DataFrame
字段将 DynamicFrame
转换为 Apache Spark DataFrame
。返回新的 DataFrame
。
一个 DynamicRecord
表示 DynamicFrame
中的一条逻辑记录。它类似于 Spark DataFrame
中的一行,只不过它是自描述的,可用于不符合固定架构的数据。
-
options
– 一个选项列表。如果您选择Project
和Cast
操作类型,则指定目标类型。示例包括以下内容。>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
– 信息 –
count
count( )
– 返回底层 DataFrame
中的行数。
架构
schema( )
– 返回此 DynamicFrame
的架构,如果此架构不可用,则返回底层 DataFrame
的架构。
有关构成此架构的 DynamicFrame
类型的更多信息,请参阅 PySpark 扩展类型。
printSchema
printSchema( )
– 打印底层 DataFrame
的架构。
show
show(num_rows)
– 打印底层 DataFrame
中的行数。
repartition
repartition(numPartitions)
– 返回具有 numPartitions
个分区的新 DynamicFrame
。
coalesce
coalesce(numPartitions)
– 返回具有 numPartitions
个分区的新 DynamicFrame
。
– 转换 –
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
将声明映射应用于此 DynamicFrame
并返回已将那些映射应用于所指定字段的新 DynamicFrame
。新 DynamicFrame
中省略了未指定的字段。
-
mappings
– 映射元组列表(必需)。每个元组包括:(源列,源类型,目标列,目标类型)。如果源列的名称有一个点“
.
”,则必须在名称外加上反引号“``
”。例如,要将this.old.name
(字符串)映射到thisNewName
,可以使用以下元组:("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 apply_mapping 重命名字段并更改字段类型
以下代码示例展示了如何使用 apply_mapping
方法重命名选定字段并更改字段类型。
注意
要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
调用 FlatMap 类 转换以从 DynamicFrame
中删除字段。返回指定字段已删除的新的 DynamicFrame
。
-
paths
– 字符串列表。每个字符串都包含要删除的字段节点的完整路径。您可以使用点表示法来指定嵌套字段。例如,如果字段first
在树结构中是字段name
的子字段,则为路径指定"name.first"
。如果字段节点的名称有文字
.
,则必须用反引号(`
)将名称括起来。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 drop_fields 从 DynamicFrame
中删除字段
此代码示例使用 drop_fields
方法从 DynamicFrame
中移除选定的顶级字段和嵌套字段。
示例数据集
该示例使用以下数据集,其在代码中由 EXAMPLE-FRIENDS-DATA
表表示:
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
示例代码
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
filter
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回新 DynamicFrame
,其中包含输入 DynamicFrame
中满足指定谓词函数 f
的所有 DynamicRecords
。
-
f
– 应用到DynamicFrame
的谓词函数。该函数必须采用DynamicRecord
作为参数,如果DynamicRecord
满足筛选要求,则返回 True,否则返回 False (必需)。一个
DynamicRecord
表示DynamicFrame
中的一条逻辑记录。其类似于 SparkDataFrame
中的一行,但其具有自描述性,可用于不符合固定架构的数据。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用筛选条件获取筛选后的字段选择
此示例用 filter
方法创建新 DynamicFrame
,其中包括筛选后的其他 DynamicFrame
字段选择。
类似于 map
方法,filter
采用一个函数作为实际参数,以将其应用于原 DynamicFrame
中的每个记录。该函数将记录作为输入并返回布尔值。如果返回值为 true,则该记录将包含在生成的 DynamicFrame
中。如果返回值为 false,则该记录将被排除在外。
注意
要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
执行与另一个 DynamicFrame
的等式联接并返回生成的 DynamicFrame
。
-
paths1
– 此帧中要联接的键列表。 -
paths2
– 另一帧中要联接的键列表。 -
frame2
– 要联接的另一个DynamicFrame
。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用联接来组合 DynamicFrames
此示例使用 join
方法对三个 DynamicFrames
执行联接。AWSGlue 根据您提供的字段键执行联接。由此生成的 DynamicFrame
包含来自指定密钥相匹配的两个原始帧中的行。
请注意,join
转换会让所有字段保持不变。这意味着您指定要匹配的字段会出现在生成的 DynamicFrame 中,即使这些字段是多余的并且包含相同的密钥。在此示例中,我们在联接后使用 drop_fields
移除这些冗余密钥。
注意
要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
映射
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回由于将指定映射函数应用到原始 DynamicFrame
中的所有记录而产生的新的 DynamicFrame
。
-
f
– 应用到DynamicFrame
中所有记录的映射函数。该函数必须采用DynamicRecord
作为参数并返回一个新的DynamicRecord
(必需)。一个
DynamicRecord
表示DynamicFrame
中的一条逻辑记录。它类似于 Apache SparkDataFrame
中的一行,但其具有自描述性,可用于不符合固定架构的数据。 transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。info
– 与转换中的错误关联的字符串(可选)。stageThreshold
– 在转换出错之前可能在其中发生的最大错误数(可选)。默认值为 0。totalThreshold
– 在处理出错之前可能全面发生的最大错误数(可选)。默认值为 0。
示例:使用映射将函数应用于 DynamicFrame
中的每个记录
此示例展示了如何使用 map
方法将函数应用于 DynamicFrame
的每个记录。具体而言,此示例将名为 MergeAddress
的函数应用于每个记录,以便将多个地址字段合并为一个 struct
类型。
注意
要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
基于指定主键的将此 DynamicFrame
与暂存 DynamicFrame
合并以标识记录。不会对重复记录(具有相同主键的记录)去除重复。如果暂存帧中没有匹配的记录,则从源中保留所有记录(包括重复记录)。如果暂存帧具有匹配的记录,则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。
-
stage_dynamic_frame
– 要合并的暂存DynamicFrame
。 -
primary_keys
– 要匹配源和暂存动态帧中的记录的主键字段列表。 -
transformation_ctx
– 用于检索有关当前转换的元数据的唯一字符串(可选)。 -
options
– 为此转换提供其他信息的 JSON 名称-值对的字符串。当前未使用此参数。 -
info
– 一个String
。要与此转换中的错误关联的任何字符串。 -
stageThreshold
– 一个Long
。给定转换中处理需要排除的错误的数目。 -
totalThreshold
– 一个Long
。此转换中处理需要排除的错误的总数。
该方法将返回通过将此 DynamicFrame
与暂存 DynamicFrame
合并获取的新 DynamicFrame
。
在这些情况下,返回的 DynamicFrame
将包含记录 A:
-
如果
A
在源帧和暂存帧中都存在,则返回暂存帧中的A
。 -
如果
A
在源表中,且A.primaryKeys
不在stagingDynamicFrame
中,这意味着未在暂存表中更新A
。
源帧和暂存帧不需要具有相同的架构。
示例:根据主键使用 mergeDynamicFrame 合并两个 DynamicFrames
以下代码示例显示了如何使用 mergeDynamicFrame
方法根据主键 id
将 DynamicFrame
与“staging”DynamicFrame
合并。
示例数据集
该示例使用了名为 split_rows_collection
的 DynamicFrameCollection
中的两个 DynamicFrames
。以下是 split_rows_collection
中的键列表。
dict_keys(['high', 'low'])
示例代码
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
将 DynamicFrame
转换为适用于关系数据库的形式。将数据从 DynamoDB 等 NoSQL 环境移动到 MySQL 等关系数据库时,对 DynamicFrame
进行关系化尤其有用。
该转换将通过取消嵌套列的嵌套并透视数组列来生成帧列表。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。
root_table_name
– 根表的名称。staging_path
– 要将 CSV 格式的透视表分区存储到的路径(可选)。从该路径读取透视表。options
– 可选参数的词典。-
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 relatialize 在 DynamicFrame
中展平嵌套架构
此代码示例使用 relationalize
方法将嵌套架构展平为适用于关系数据库的形式。
示例数据集
该示例通过以下架构使用了名为 legislators_combined
的 DynamicFrame
。legislators_combined
有多个嵌套字段,例如 links
、images
和 contact_details
,这些字段将通过 relationalize
转换进行展平。
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
示例代码
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
以下输出可让您将名为 contact_details
的嵌套字段的架构与 relationalize
转换创建的表进行比较。请注意,表记录可使用名为 id
的外键和表示数组位置的 index
列链接回主表。
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
重命名此 DynamicFrame
中的一个字段并返回包含该重命名字段的新的 DynamicFrame
。
-
oldName
– 要重命名的节点的完整路径。如果旧名称中包含点,则
RenameField
将不起作用,除非使用反引号(`
)将其引起来。例如,要将this.old.name
替换为thisNewName
,应按如下方式调用 rename_field。newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
– 新名称,作为完整路径。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 rename_field 重命名 DynamicFrame
中的字段
此代码示例可使用 rename_field
方法重命名 DynamicFrame
中的字段。请注意,该示例可使用方法链同时重命名多个字段。
注意
要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
示例代码
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
解析此 DynamicFrame
内的一个选择类型并返回新的 DynamicFrame
。
-
specs
– 要解析的特定歧义列表,每个歧义均采用元组形式:(field_path, action)
。可通过两种方式使用
resolveChoice
。第一种是使用specs
参数指定一系列特定字段以及如何解析它们。resolveChoice
的另一种模式是使用choice
参数为所有ChoiceTypes
指定单个解析方法。specs
的值被指定为由(field_path, action)
对组成的元组。field_path
值标识特定歧义元素,action
值标识相应解析。可能的操作如下:-
cast:
– 尝试将所有值转换为指定的类型。例如:type
cast:int
。 -
make_cols
– 将每个不同的类型转换为名为
的列。通过展平数据来解析潜在的歧义。例如,如果columnName
_type
columnA
是int
或string
,则解析就是在结果DynamicFrame
中生成名为columnA_int
和columnA_string
的两个列。 -
make_struct
– 使用struct
表示数据,解析潜在的歧义。例如,如果某个列中的数据是int
或string
,则使用make_struct
操作会在结果DynamicFrame
中生成结构列。每个结构都包含int
和string
。 -
project:
– 将所有数据投影到可能的数据类型之一,解析潜在的歧义。例如,如果某个列中的数据是type
int
或string
,则使用project:string
操作会在结果DynamicFrame
中生成一个列,其中所有int
值都已转换为字符串。
如果
field_path
识别到数组,则在数组名称后放置一个空的方括号可避免歧义。例如,假设您正在使用结构如下的数据:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
可以通过将
field_path
设置为"myList[].price"
、将action
设置为"cast:double"
来选择价格的数字而非字符串版本。注意
只能使用
specs
和choice
参数之一。如果specs
参数不为None
,则choice
参数必须为空字符串。反过来,如果choice
不为空字符串,则specs
参数必须为None
。 -
choice
– 为所有ChoiceTypes
指定单个解析方法。这可以在运行前不知道ChoiceTypes
的完整列表的情况下使用。除了之前为specs
列出的操作外,此参数还支持以下操作:-
match_catalog
– 尝试将每个ChoiceType
转换为指定数据目录表中的对应类型。
-
-
database
– 与match_catalog
操作一起使用的数据目录数据库。 -
table_name
– 与match_catalog
操作一起使用的数据目录表。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认为零,表示此过程应该不会出错。 -
catalog_id
– 正在访问的数据目录的目录 ID(数据目录的账户 ID)。如果设置为None
(默认值),它使用调用账户的目录 ID。
示例:使用 resolveChoice 处理包含多种类型的列
此代码示例使用 resolveChoice
方法来指定如何处理包含多种类型值的 DynamicFrame
列。该示例演示了处理不同类型的列的两种常用方法:
将该列转换为单一数据类型。
在单独的列中保留所有类型。
示例数据集
注意
要访问本示例中使用的数据集,请参阅 代码示例:使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
该示例通过以下架构使用了名为 medicare
的 DynamicFrame
:
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
示例代码
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回包含选定字段的新 DynamicFrame
。
-
paths
– 字符串列表。每个字符串就是一个指向您要选择的顶层节点的路径。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 select_fields 以利用选定字段创建新 DynamicFrame
以下代码示例展示了如何使用 select_fields
方法以利用从现有 DynamicFrame
中选定的字段列表来创建新 DynamicFrame
。
注意
要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
simplify_ddb_json
simplify_ddb_json(): DynamicFrame
简化 DynamicFrame
中的嵌套列,其具体位于 DynamoDB JSON 结构中,并返回一个新的简化 DynamicFrame
。如果 List 类型中有多种类型或 Map 类型,则不会简化 List 中的元素。请注意,这是一种特定类型的转换,其行为与常规 unnest
转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON。
例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
simplify_ddb_json()
转换会将此转换为:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
示例:使用 simplify_ddb_json 调用 DynamoDB JSON simplify 命令
以下代码示例使用 simplify_ddb_json
方法来使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON simplify 命令,以及打印分区数量。
示例代码
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
将示例记录写入指定的目标,以帮助您验证作业执行的转换。
-
path
– 要写入的目标的路径(必需)。 -
options
– 指定选项的键值对(可选)。"topk"
选项指定应写入第一条k
记录。"prob"
选项指定选择任何给定记录的可能性(以十进制数字形式表示)。您可以在选择要写入的记录时使用它。 transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。
示例:使用 spigot 将 DynamicFrame
中的示例字段写入 Amazon S3
此代码示例使用 spigot
方法在应用了 select_fields
转换后将示例记录写入 Amazon S3 存储桶。
示例数据集
注意
要访问本示例中使用的数据集,请参阅 代码示例:对数据进行联接和关系化 并按照 步骤 1:爬取 Amazon S3 存储桶中的数据 中的说明进行操作。
该示例通过以下架构使用了名为 persons
的 DynamicFrame
:
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
示例代码
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
以下是 spigot
写入 Amazon S3 的数据示例。由于指定了示例代码 options={"topk": 10}
,因此示例数据包含前 10 条记录。
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
返回一个新的 DynamicFrameCollection
,其中包含两个 DynamicFrames
。第一个 DynamicFrame
包含所有已拆分的节点,第二个包含其余节点。
-
paths
– 字符串列表,每个字符串是到一个您要拆分为新的DynamicFrame
的节点的完整路径。 -
name1
– 拆分的DynamicFrame
的名称字符串。 -
name2
– 指定节点拆分后留存的DynamicFrame
的名称字符串。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 split_fields 将选定字段拆分为单独的 DynamicFrame
此代码示例使用 split_fields
方法将指定字段列表拆分为单独的 DynamicFrame
。
示例数据集
该示例使用了集合 legislators_relationalized
中名为 l_root_contact_details
的 DynamicFrame
。
l_root_contact_details
有以下架构和条目。
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
示例代码
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
将 DynamicFrame
中的一个或多个行拆分到新的 DynamicFrame
中。
该方法可返回一个新的 DynamicFrameCollection
,其中包含两个 DynamicFrames
。第一个 DynamicFrame
包含所有已拆分的行,第二个包含其余行。
-
comparison_dict
– 一个词典,其中的键是到一个列的路径,值是另一个词典,用于将比较器映射到与该列值所比较的值。例如,{"age": {">": 10, "<": 20}}
拆分其年龄列中的值大于 10 但小于 20 的行。 -
name1
– 拆分的DynamicFrame
的名称字符串。 -
name2
– 指定节点拆分后留存的DynamicFrame
的名称字符串。 -
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 split_rows 拆分 DynamicFrame
中的行
此代码示例使用 split_rows
方法根据 id
字段值拆分 DynamicFrame
中的行。
示例数据集
该示例使用了从集合 legislators_relationalized
中选择的名为 l_root_contact_details
的 DynamicFrame
。
l_root_contact_details
有以下架构和条目。
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
示例代码
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
取消装箱(重新格式化)DynamicFrame
中的一个字符串字段并返回包含已取消装箱 DynamicRecords
的新的 DynamicFrame
。
一个 DynamicRecord
表示 DynamicFrame
中的一条逻辑记录。它类似于 Apache Spark DataFrame
中的一行,但其具有自描述性,可用于不符合固定架构的数据。
-
path
– 要取消装箱的字符串节点的完整路径。 format
– 格式规范(可选)。您可将其用于 Amazon S3 或支持多种格式的 AWS Glue 连接。相关受支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。-
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
options
– 下列一个或多个:separator
– 包含分隔符字符的字符串。escaper
– 包含转义字符的字符串。skipFirst
– 指示是否跳过第一个实例的布尔值。-
withSchema
- 包含节点架构的 JSON 表示形式的字符串。架构的 JSON 表示格式由StructType.json()
的输出定义。 withHeader
– 指示是否包括标头的布尔值。
示例:使用 unbox 将字符串字段拆开为结构
此代码示例使用 unbox
方法将 DynamicFrame
中的字符串字段拆开或重新格式化为结构类型的字段。
示例数据集
该示例通过以下架构和条目使用了名为 DynamicFrame
的 mapped_with_string
。
请注意名为 AddressString
的字段。这是拆开为结构示例中的字段。
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
示例代码
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
联合
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
联合两个 DynamicFrame。返回 DynamicFrame,其中包含来自两个输入 DynamicFrame 的所有记录。这种转换可能会从两个 DataFrame 与等效数据的合并中返回不同的结果。如果您需要 Spark DataFrame 联合行为,可以考虑使用 toDF
。
-
frame1
– 第一个要联合的 DynamicFrame。 -
frame2
– 第二个要联合的 DynamicFrame。 -
transformation_ctx
–(可选)用于标识统计信息/状态信息的唯一字符串 -
info
–(可选)与转换中的错误关联的任何字符串 -
stageThreshold
–(可选)在处理出错之前转换中出现的最大错误数 -
totalThreshold
–(可选)在处理出错之前出现的最大总错误数
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
取消嵌套 DynamicFrame
中的嵌套对象,使其成为顶级对象,并返回新的取消嵌套的 DynamicFrame
。
-
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数(可选)。默认值为零,则表示进程不应出错。
示例:使用 unnest 将嵌套字段转换为顶级字段
此代码示例使用 unnest
方法将 DynamicFrame
中的所有嵌套字段展平为顶级字段。
示例数据集
该示例通过以下架构使用了名为 mapped_medicare
的 DynamicFrame
。请注意,Address
字段是唯一包含嵌套数据的字段。
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
示例代码
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
解除 DynamicFrame
中的嵌套列,其具体位于 DynamoDB JSON 结构中,并返回一个新的非嵌套 DynamicFrame
。属于结构类型数组的列将不会被解除嵌套。请注意,这是一种特定类型的非嵌套转换,其行为与常规 unnest
转换不同,并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息,请参阅 DynamoDB JSON。
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
– 用于标识状态信息的唯一字符串 (可选)。 -
info
– 与此转换的错误报告关联的字符串 (可选)。 -
stageThreshold
– 此转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。 -
totalThreshold
– 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选,默认为零,表示此过程应该不会出错)。
例如,使用 DynamoDB JSON 结构读取导出的架构可能如下所示:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnest_ddb_json()
转换会将此转换为:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
以下代码示例演示了如何使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令,以及打印分区数量:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
write
write(connection_type, connection_options, format, format_options, accumulator_size)
从此 DynamicFrame
的 GlueContext 班级 获得指定连接类型的 DataSink(object) 并将其用于格式化和写入此 DynamicFrame
的内容。返回按指定进行格式化和写入的新的 DynamicFrame
。
-
connection_type
– 要使用的连接类型。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
和oracle
。 -
connection_options
– 要使用的连接选项(可选)。对于connection_type
的s3
,将会定义 Amazon S3 路径。connection_options = {"path": "
s3://aws-glue-target/temp
"}对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
警告
不建议在脚本中存储密码。请考虑使用
boto3
从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
– 格式规范(可选)。这用于 Amazon Simple Storage Service(Amazon S3)或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。format_options
– 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。accumulator_size
- 要使用的可累积大小(以字节为单位)(可选)。
– 错误 –
assertErrorThreshold
assertErrorThreshold( )
– 创建此 DynamicFrame
的转换中的错误的资产。从底层 DataFrame
返回 Exception
。
errorsAsDynamicFrame
errorsAsDynamicFrame( )
– 返回其中包含嵌套的错误记录的 DynamicFrame
。
示例:使用 errorsAsDynamicFrame 查看错误记录
以下代码示例展示了如何使用 errorsAsDynamicFrame
方法查看 DynamicFrame
的错误记录。
示例数据集
该示例使用以下数据集,您可以将以下数据集作为 JSON 上传到 Amazon S3。请注意,第二条记录的格式有误。当您使用 SparkSQL 时,格式错误的数据通常会中断文件解析。但是,DynamicFrame
会识别格式错误的问题,并将格式错误的行转换为您能单独处理的错误记录。
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
示例代码
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
– 返回 DynamicFrame
中的错误总数。
stageErrorsCount
stageErrorsCount
– 返回生成此 DynamicFrame
的过程中发生的错误数。