AWS Glue
Developer Guide

The AWS Glue Scala DynamicFrame Class

Package:   com.amazonaws.services.glue

class DynamicFrame(
  val glueContext : GlueContext,
  _records : RDD[DynamicRecord],
  val name : String = s"",
  val transformationContext : String = DynamicFrame.UNDEFINED,
  callSite : CallSite = CallSite("Not provided", ""),
  stageThreshold : Long = 0,
  totalThreshold : Long = 0,
  prevErrors : => Long = 0,
  errorExpr : => Unit = {}
) extends Serializable with Logging

A DynamicFrame is a distributed collection of self-describing DynamicRecord objects.

DynamicFrames are designed to provide a flexible data model for ETL operations. They do not require a schema to create and they can be used to read and transform data with messy or inconsistent values and types. A schema can be computed on-demand for those operations that need one.

DynamicFrames provide a range of transformations for data cleaning and ETL. They also support conversion to and from SparkSQL DataFrames to integrate with existing code and the many analytics operations that DataFrames provide.

The following parameters are shared across many of the Glue transformations that construct DynamicFrames:

  • transformationContext  —  Identifier for this DynamicFrame. The transformationContext is used as a key for job bookmark state that is persisted across runs.

  • callSite  —  Used to provide context information for error reporting. These values are automatically set when calling from Python.

  • stageThreshold  —  Maximum number of error records allowed from the computation of this DynamicFrame before throwing an exception, excluding records present in the previous DynamicFrame.

  • totalThreshold  —  Maximum number of total error records before an exception is thrown, including those from previous frames.
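
For example, these shared parameters can be passed to any transformation that accepts them. The following is a minimal sketch (assuming an existing DynamicFrame named df and illustrative threshold values):

{{{
// Fail this stage if more than 10 new error records are produced,
// or if more than 100 error records exist in total.
val cleaned = df.dropNulls(
  transformationContext = "cleaned",
  stageThreshold = 10,
  totalThreshold = 100)
}}}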

val errorsCount

val errorsCount

The number of error records in this DynamicFrame. This includes errors from previous operations.

def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings  —  Sequence of mappings to construct a new DynamicFrame.

  • caseSensitive  —  Whether to treat source column names as case sensitive. Setting this to false can help when integrating with case-insensitive stores such as the AWS Glue Data Catalog.

Selects, projects, and casts columns based on a sequence of mappings.

Each mapping consists of a source column and type and a target column and type. Mappings can be specified as either a four-tuple (source_path, source_type, target_path, target_type) or a [[MappingSpec]] object containing the same information.

In addition to simple projections and casts, mappings can be used to nest or unnest fields by separating components of the path with '.'. For example, suppose we have a DynamicFrame with the following schema:

{{{
root
|-- name: string
|-- age: int
|-- address: struct
|    |-- state: string
|    |-- zip: int
}}}

We can make the following call to unnest the state and zip fields:

{{{
df.applyMapping(
  Seq(("name", "string", "name", "string"),
      ("age", "int", "age", "int"),
      ("address.state", "string", "state", "string"),
      ("address.zip", "int", "zip", "int")))
}}}

The resulting schema is:

{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}

We can also use applyMapping to re-nest columns. For example, the following inverts the previous transformation and creates a struct named address in the target:

{{{
df.applyMapping(
  Seq(("name", "string", "name", "string"),
      ("age", "int", "age", "int"),
      ("state", "string", "address.state", "string"),
      ("zip", "int", "address.zip", "int")))
}}}

Field names containing '.' characters can be quoted using backticks (`).

Note

The applyMapping method currently cannot be used to map columns nested under arrays.

def assertErrorThreshold

def assertErrorThreshold : Unit

Action that forces computation and verifies that the number of error records does not exceed stageThreshold and totalThreshold. Throws an exception if either condition fails.

def count

lazy val count

Returns the number of elements in this DynamicFrame.

def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a new DynamicFrame with the specified column removed.

def dropFields

def dropFields( fieldNames : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • fieldNames  —  The column names to drop.

Returns a new DynamicFrame with the specified columns removed.

This method can be used to delete nested columns, including those inside of arrays, but it cannot be used to drop specific array elements.
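
For example, the following sketch (assuming an existing DynamicFrame df whose schema contains a top-level age column and a nested address.zip field) drops both in one call:

{{{
// Drop the top-level age column and the nested address.zip field.
val trimmed = df.dropFields(Seq("age", "address.zip"))
}}}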

def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a new DynamicFrame with all null columns removed.

Note

This only removes columns of type NullType. Individual null values in other columns are not removed or modified.

def errorsAsDynamicFrame

def errorsAsDynamicFrame

Returns a new DynamicFrame containing the error records from this DynamicFrame.
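
Since the result is an ordinary DynamicFrame, error records can be inspected or written out like any other data. A sketch, assuming df has accumulated error records:

{{{
val errors = df.errorsAsDynamicFrame
errors.printSchema
errors.show(10)
}}}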

def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Constructs a new DynamicFrame containing only those records for which the function 'f' returns true. The filter function 'f' should not mutate the input record.
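
For example, the following sketch keeps only adult records. It assumes an existing DynamicFrame df and uses DynamicRecord's getField method, which returns an Option:

{{{
// Keep only records whose age field is present and at least 18.
val adults = df.filter(rec =>
  rec.getField("age") match {
    case Some(age: Int) => age >= 18
    case _              => false
  })
}}}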

def getName

def getName : String

Returns the name of this DynamicFrame.

def getNumPartitions

def getNumPartitions

Returns the number of partitions in this DynamicFrame.

def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

Returns the schema if it has already been computed. Does not scan the data if the schema has not already been computed.

def isSchemaComputed

def isSchemaComputed : Boolean

Returns true if the schema has been computed for this DynamicFrame, or false if not. If this method returns false then calling the [[schema]] method will require another pass over the records in this DynamicFrame.

def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1  —  Columns in this DynamicFrame to use for the join.

  • keys2  —  Columns in frame2 to use for the join. Must be the same length as keys1.

  • frame2  —  DynamicFrame to join against.

Returns the result of performing an equijoin with frame2 using the specified keys.
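
For example, the following sketch joins a hypothetical persons frame to an orders frame on matching key columns:

{{{
// Equijoin on persons.id == orders.person_id.
val joined = persons.join(Seq("id"), Seq("person_id"), orders)
}}}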

def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a new DynamicFrame constructed by applying the specified function 'f' to each record in this DynamicFrame.

This method copies each record before applying the specified function, so it is safe to mutate the records. If the mapping function throws an exception on a given record, that record will be marked as an error and the stack trace will be saved as a column in the error record.
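
For example, the error-handling behavior can be used for lightweight validation. A sketch, assuming an existing DynamicFrame df and DynamicRecord's getField method, which returns an Option:

{{{
// Mark records with a missing name as errors; the thrown exception's
// stack trace is captured in the corresponding error record.
val validated = df.map(rec => {
  if (rec.getField("name").isEmpty) {
    throw new IllegalArgumentException("missing name")
  }
  rec
}, errorMsg = "name validation failed")
}}}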

def printSchema

def printSchema : Unit

Prints the schema of this DynamicFrame to stdout in a human-readable format.

def recomputeSchema

def recomputeSchema : Schema

Forces a schema recomputation. This requires a scan over the data but may "tighten" the schema if there are some fields in the current schema that are not present in the data.

Returns the recomputed schema.

def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName  —  The name to use for the base DynamicFrame in the output. DynamicFrames created by pivoting arrays start with this as a prefix.

  • stagingPath  —  S3 path for writing intermediate data.

  • options  —  Relationalize options and configuration. Currently unused.

Flattens all nested structures and pivots arrays into separate tables.

This operation can be used to prepare deeply nested data for ingestion into a relational database. Nested structs are flattened in the same manner as the [[unnest]] transform. Additionally, arrays are pivoted into separate tables, with each array element becoming a row. For example, take a DynamicFrame with the following data:

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Execute the following code:

{{{ df.relationalize("people", "s3://my_bucket/my_path", JsonOptions.empty) }}}

This produces two tables. The first is named "people" and contains:

{{{
{"name": "Nancy", "age": 47, "friends": 1}
{"name": "Stephanie", "age": 28, "friends": 2}
{"name": "Nathan", "age": 54, "friends": 3}
}}}

Here, the friends array has been replaced with an auto-generated join key. A separate table called people.friends is created with the following content:

{{{
{"id": 1, "index": 0, "val": "Fred"}
{"id": 1, "index": 1, "val": "Lakshmi"}
{"id": 2, "index": 0, "val": "Yao"}
{"id": 2, "index": 1, "val": "Phil"}
{"id": 2, "index": 2, "val": "Alvin"}
{"id": 3, "index": 0, "val": "Nicolai"}
{"id": 3, "index": 1, "val": "Karen"}
}}}

In this table, 'id' is a join key that identifies which record the array element came from, 'index' refers to the position in the original array, and 'val' is the actual array entry.

The relationalize method returns the sequence of DynamicFrames created by applying this process recursively to all arrays.

Note

The Glue library automatically generates join keys for new tables. In order to ensure that join keys are unique across job runs, job bookmarks must be enabled.

def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName  —  The original name of the column.

  • newName  —  The new name of the column.

Returns a new DynamicFrame with the specified field renamed.

This method can be used to rename nested fields. For example, the following code would rename state to state_code inside the address struct:

{{{ df.renameField("address.state", "address.state_code") }}}

def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a new DynamicFrame with numPartitions partitions.

def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption  —  Action to apply to all ChoiceType columns not listed in the specs sequence.

  • database  —  Data Catalog database to use with the match_catalog action.

  • tableName  —  Data Catalog table to use with the match_catalog action.

Returns a new DynamicFrame by replacing one or more ChoiceTypes with a more specific type.

There are two ways to use resolveChoice. The first is to specify a sequence of specific columns and how to resolve them. These are specified as tuples made up of (column, action) pairs.

The possible actions are:

  • cast:type  —  Attempts to cast all values to the specified type.

  • make_cols  —  Converts each distinct type to a column with the name columnName_type.

  • make_struct  —  Converts a column to a struct with keys for each distinct type.

  • project:type  —  Retains only values of the specified type.

The other mode for resolveChoice is to specify a single resolution for all ChoiceTypes. This can be used in cases where the complete list of ChoiceTypes is unknown before execution. In addition to the actions listed above, this mode also supports the following action:

  • match_catalog  —  Attempts to cast each ChoiceType to the corresponding type in the specified catalog table.

Examples:

Resolve the user.id column by casting to an int, and make the address field retain only structs:

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Resolve all ChoiceTypes by converting each choice to a separate column:

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Resolve all ChoiceTypes by casting to the types in the specified catalog table:

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

def schema

def schema : Schema

Returns the schema of this DynamicFrame.

The returned schema is guaranteed to contain every field that is present in a record in this DynamicFrame, but in a small number of cases it may contain additional fields as well. The [[recomputeSchema]] method can be used to "tighten" the schema based on the records in this DynamicFrame.

def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a single field as a DynamicFrame.

def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths  —  Sequence of column names to select.

Returns a new DynamicFrame containing the specified columns.

Note

The selectFields method can only be used to select top-level columns. The [[applyMapping]] method can be used to select nested columns.
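
For example, the following sketch (assuming an existing DynamicFrame df with name and age columns) projects down to just those two columns:

{{{
// Keep only the top-level name and age columns.
val projected = df.selectFields(Seq("name", "age"))
}}}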

def show

def show( numRows : Int = 20 ) : Unit
  • numRows  —  Number of rows to print.

Prints rows from this DynamicFrame in JSON format.

def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Passthrough transformation that returns the same records but writes out a subset of records as a side effect.

  • path  —  Path in S3 to which to write output, in the form s3://bucket/path.

  • options  —  Optional JsonOptions map describing the sampling behavior.

Returns a DynamicFrame containing the same records as this one.

By default, writes 100 arbitrary records to the location specified by path. This behavior can be customized using the options map. Valid keys include the following:

  • topk  —  Specifies the total number of records written out. The default is 100.

  • prob  —  Specifies the probability that an individual record is included. Default is 1.

For example, the following call samples the dataset by selecting each record with a 20 percent probability and stops after 200 records have been written:

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths  —  The paths to include in the first DynamicFrame.

Returns a sequence of two DynamicFrames. The first contains the specified paths and the second contains all other columns.
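
For example, the two resulting frames can be bound with a pattern match. A sketch, assuming an existing DynamicFrame df with name and age columns:

{{{
// ids holds the name and age columns; rest holds everything else.
val Seq(ids, rest) = df.splitFields(Seq("name", "age"))
}}}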

def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Splits rows based on predicates that compare columns to constants.

  • paths  —  Columns to use for comparison.

  • values  —  Constant values to use for comparison.

  • operators  —  Operators to use for comparison.

Returns a Sequence of two DynamicFrames. The first contains rows for which the predicate is true and the second contains those for which it is false.

Predicates are specified using three sequences: 'paths' contains the (possibly nested) column names, 'values' contains the constant values to compare to, and 'operators' contains the operators to use for comparison. All three sequences must be the same length: the nth operator is used to compare the nth column with the nth value.

Each operator must be one of "!=", "=", "<=", "<", ">=", or ">".

As an example, the following call splits a DynamicFrame so that the first output frame contains records of people aged 65 or older from the United States and the second contains all other records:

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}

def stageErrorsCount

def stageErrorsCount

Returns the number of error records created while computing this DynamicFrame. This excludes errors from previous operations that were passed into this DynamicFrame as input.

def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Converts this DynamicFrame to a SparkSQL DataFrame with the same schema and records.

Note

Since DataFrames do not support ChoiceTypes, this method will automatically convert ChoiceType columns into StructTypes. See [[resolveChoice]] for additional information and options for resolving choices.
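
Once converted, the full SparkSQL DataFrame API is available. A sketch, assuming an existing DynamicFrame df with a top-level state column:

{{{
// Use SparkSQL aggregations on the converted DataFrame.
val sparkDf = df.toDF()
sparkDf.groupBy("state").count().show()
}}}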

def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path  —  The column to parse. Must be string or binary.

  • format  —  The format to use for parsing.

  • optionString  —  Options to pass to the format, such as the CSV separator.

Parses an embedded string or binary column according to the specified format. Parsed columns are nested under a struct with the original column name.

For example, suppose we have a CSV file with an embedded JSON column:

name, age, address
Sally, 36, {"state": "NE", "city": "Omaha"}
...

After an initial parse we would get a DynamicFrame with the following schema:

{{{
root
|-- name: string
|-- age: int
|-- address: string
}}}

We can call unbox on the address column to parse the specific components:

{{{ df.unbox("address", "json") }}}

This gives us a DynamicFrame with the following schema:

{{{
root
|-- name: string
|-- age: int
|-- address: struct
|    |-- state: string
|    |-- city: string
}}}

def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Returns a new DynamicFrame with all nested structures flattened. Names are constructed using the '.' character.

For example, if we have a DynamicFrame with the following schema:

{{{
root
|-- name: string
|-- age: int
|-- address: struct
|    |-- state: string
|    |-- city: string
}}}

The following call unnests the address struct:

{{{ df.unnest() }}}

The resulting schema is:

{{{
root
|-- name: string
|-- age: int
|-- address.state: string
|-- address.city: string
}}}

This method also unnests nested structs inside arrays, but for historical reasons the names of such fields are prefixed with the name of the enclosing array and ".val".

def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema  —  Function that returns the schema to use. Specified as a zero-parameter function to defer potentially expensive computation.

Sets the schema of this DynamicFrame to the specified value. This is primarily used internally to avoid costly schema re-computation. The passed-in schema must contain all columns present in the data.

def withName

def withName( name : String ) : DynamicFrame
  • name  —  New name to use.

Returns a copy of this DynamicFrame with a new name.

def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Returns a copy of this DynamicFrame with the specified transformation context.