Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Clase DynamicFrame Scala de AWS Glue
Paquete: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
es una colección distribuida de objetos DynamicRecord autodescriptivos.
Los elementos DynamicFrame
s se han diseñado para proporcionar un modelo de datos flexible para operaciones de ETL (extracción, transformación y carga). No necesitan un esquema para crearse y se pueden usar para leer y transformar datos que contienen valores y tipos confusos o incoherentes. Un esquema se puede calcular bajo demanda para las operaciones que necesiten uno.
Los objetos DynamicFrame
proporcionan una serie de transformaciones para la limpieza de datos y ETL. También admiten la conversión a clases DataFrame de SparkSQL, y desde dichas clases, para integrarse con el código existente y las numerosas operaciones de análisis que proporcionan las clases DataFrame.
Los siguientes parámetros se comparten entre las numerosas transformaciones de AWS Glue que construyen objetos DynamicFrame
:
transformationContext
: identificador de esteDynamicFrame
. El parámetrotransformationContext
se utiliza como clave para el estado de marcador de flujo de trabajo que se conserva de una ejecución a otra.callSite
: proporciona información de contexto para los informes de error. Estos valores se establecen automáticamente cuando se realiza la llamada desde Python.stageThreshold
: número máximo de registros de error permitidos en el cálculo de este objetoDynamicFrame
antes de generar una excepción, sin incluir los registros presentes en el objetoDynamicFrame
anterior.totalThreshold
: número máximo registros de error totales antes de que se genere una excepción, incluidos los de marcos anteriores.
Val errorsCount
val errorsCount
El número de registros de error en este objeto DynamicFrame
. Se incluyen errores de las operaciones anteriores.
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
: secuencia de mapeos para construir un objetoDynamicFrame
nuevo.caseSensitive
: indica si hay que considerar que las columnas de origen distinguen entre mayúsculas y minúsculas. Configurar este valor como falso puede ser de ayuda al realizar integraciones con almacenes que no distinguen entre mayúsculas y minúsculas, como AWS Glue Data Catalog.
Selecciona, proyecta y convierte las columnas en función de una secuencia de mapeos.
Cada mapeo se compone de una columna y un tipo de origen y una columna y un tipo de destino. Los mapeos se pueden especificar como una tupla cuádruple (source_path
, source_type
, target_path
, target_type
) o un objeto MappingSpec que contiene la misma información.
Además de usar los mapeos para proyecciones y conversiones simples, también se pueden usar para anidar o aplanar campos separando los componentes de la ruta con ".
" (punto).
Por ejemplo, suponga que tiene una instancia de DynamicFrame
con el siguiente esquema.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Puede hacer la siguiente llamada para quitar el anidamiento en los campos state
y zip
:
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
El esquema resultante es el siguiente.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
También puede utilizar applyMapping
para volver a anidar columnas. Por ejemplo, el siguiente código invierte la transformación anterior y crea una estructura denominada address
en el destino.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Los nombres de los campos que contienen caracteres ".
" (punto) se pueden citar mediante acentos graves (``
).
nota
Actualmente no se puede usar el método applyMapping
para mapear columnas anidadas en matrices.
Def assertErrorThreshold
def assertErrorThreshold : Unit
Acción que fuerza el cómputo y verifica que el número de registros de error esté por debajo de stageThreshold
y totalThreshold
. Genera una excepción si no se cumple ninguna de las condiciones.
Def count
lazy
def count
Devuelve el número de elementos en este objeto DynamicFrame
.
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un objeto DynamicFrame
nuevo sin la columna especificada.
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un objeto DynamicFrame
nuevo sin las columnas especificadas.
Este método se puede utilizar para eliminar columnas anidadas, incluidas las que se encuentran en matrices, pero no para descartar elementos de matriz específicos.
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Devuelve un objeto DynamicFrame
nuevo sin las columnas nulas.
nota
Solo se eliminan las columnas del tipo NullType
. Los valores nulos individuales de otras columnas no se eliminan ni modifican.
def errorsAsDynamicFrame
def errorsAsDynamicFrame
Devuelve un objeto DynamicFrame
nuevo que contiene los registros de error de este DynamicFrame
.
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Crea un objeto DynamicFrame
nuevo que solo contiene los registros en los que la función "f
" devuelve true
. La función de filtro "f
" no debe mutar el registro de entrada.
Def getName
def getName : String
Devuelve el nombre de este objeto DynamicFrame
.
Def getNumPartitions
def getNumPartitions
Devuelve el número de particiones en este objeto DynamicFrame
.
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
Devuelve el esquema si ya se ha calculado. No analiza los datos si aún no se ha calculado el esquema.
Def isSchemaComputed
def isSchemaComputed : Boolean
Devuelve true
si el esquema se ha calculado para este objeto DynamicFrame
, de lo contrario devuelve false
. Si este método devuelve false, la llamada al método schema
necesitará otra pasada por los registros en este objeto DynamicFrame
.
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
: las columnas de este objetoDynamicFrame
que se utilizarán para la combinación.keys2
: las columnas deframe2
que se utilizarán para la combinación. Deben tener la misma longitud quekeys1
.frame2
: el elementoDynamicFrame
con el que debe realizarse la combinación.
Devuelve el resultado de realizar una combinación equijoin con frame2
utilizando las claves especificadas.
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un objeto DynamicFrame
nuevo creado aplicando la función "f
" especificada a cada registro de este objeto DynamicFrame
.
Este método copia cada registro antes de aplicar la función especificada, por lo que es seguro para mutar los registros. Si la función de mapeo genera una excepción en un registro determinado, dicho registro se marca como un error y la pila de seguimiento se guarda como una columna en el registro de errores.
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
:DynamicFrame
provisional que se fusionará.primaryKeys
: la lista de campos de clave principal para hacer coincidir los registros de losDynamicFrame
de origen y provisionales.transformationContext
: una cadena única que se utiliza para recuperar metadatos sobre la transformación actual (opcional).options
: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para esta transformación.callSite
: se utiliza para proporcionar información de contexto para los informes de error.stageThreshold
: un valorLong
. Número de errores de la transformación especificada que provocarán que el proceso se termine.totalThreshold
: un valorLong
. Número total de errores hasta esta transformación (incluida) que provocarán que el proceso se termine.
Combina este objeto DynamicFrame
con una instancia provisional de DynamicFrame
en función de las claves principales especificadas para identificar registros. Los registros duplicados (registros con las mismas claves principales) no se eliminan. Si no hay ningún registro que coincida en el marco provisional, se retienen todos los registros del origen (incluidos los duplicados). Si el marco provisional tiene registros coincidentes, estos sobrescriben a los registros del origen en AWS Glue.
La instancia de DynamicFrame
devuelta contiene el registro A en los siguientes casos:
Si
A
existe tanto en el marco de origen como en el marco provisional, se devuelveA
en el marco provisional.Si
A
está en la tabla de origen yA.primaryKeys
no está enstagingDynamicFrame
(significa queA
no se actualiza en la tabla provisional).
No es necesario que el marco de origen y el marco provisional tengan el mismo esquema.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
Imprime el esquema de este objeto DynamicFrame
en stdout
en un formato fácil de leer.
Def recomputeSchema
def recomputeSchema : Schema
Fuerza un nuevo cálculo del esquema. Es necesario realizar un análisis de los datos, lo que podría "endurecer" el esquema si hay algunos campos del esquema actual que no están en los datos.
Devuelve el esquema calculado de nuevo.
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
: el nombre que se va a utilizar para elDynamicFrame
base en la salida. LosDynamicFrame
que se crean mediante matrices dinámicas comienzan con este prefijo.stagingPath
: la ruta de Amazon Simple Storage Service (Amazon S3) para escribir datos intermedios.options
: opciones y configuración de Rationalize. En la actualidad no se utiliza.
Aplana todas las estructuras anidadas y convierte las matrices en tablas independientes.
Puede utilizar esta operación para preparar datos con anidamiento profundo para su incorporación a una base de datos relacional. Las estructuras anidadas se aplanan de la misma manera en que lo haría la transformación Unnest. Además, las matrices se dividen en tablas independientes en las que cada elemento de matriz se convierte en una fila. Por ejemplo, supongamos que tiene una instancia de DynamicFrame
con los siguientes datos.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Ejecute el siguiente código.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Se generan dos tablas. La primera tabla se denomina "people" y contiene lo siguiente.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Aquí, la matriz friends se ha reemplazado por una clave de combinación generada automáticamente. Se crea una tabla diferente llamada people.friends
con el siguiente contenido.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
En esta tabla, "id
" es una clave de combinación que identifica de qué registro procede el elemento de matriz, "index
" hace referencia a la posición en la matriz original y "val
" es la entrada de matriz real.
El método relationalize
devuelve la secuencia de objetos DynamicFrame
creados aplicando este proceso de forma recursiva a todas las matrices.
nota
La biblioteca de AWS Glue genera automáticamente las claves de combinación de las tablas nuevas. Para garantizar que las claves de combinación sean únicas en las ejecuciones de flujo de trabajo, debe habilitar los marcadores de flujo de trabajo.
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
: el nombre original de la columna.newName
: el nuevo nombre de la columna.
Devuelve un DynamicFrame
nuevo que incluye el campo especificado con un nuevo nombre.
Puede usar este método para cambiar el nombre de los campos anidados. Por ejemplo, el siguiente código cambiaría el nombre de state
a state_code
en la estructura de dirección.
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un objeto DynamicFrame
nuevo con numPartitions
particiones.
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
: acción que se aplica a todas las columnasChoiceType
que no están indicadas en la secuencia specs.database
: la base de datos de Data Catalog que se usará con la acciónmatch_catalog
.tableName
: la tabla de Data Catalog que se usará con la acciónmatch_catalog
.
Devuelve un objeto DynamicFrame
nuevo reemplazando uno o varios ChoiceType
por un tipo más específico.
Hay dos formas de usar resolveChoice
. La primera consiste en especificar una secuencia de columnas específicas y cómo resolverlas. Se especifican como tuplas compuestas de pares (columna, acción).
Los posibles valores son los siguientes:
cast:type
: intenta convertir todos los valores al tipo especificado.make_cols
: convierte cada tipo diferenciado en una columna con el nombrecolumnName_type
.make_struct
: convierte una columna en una estructura con claves para cada tipo diferenciado.project:type
: retiene solo valores del tipo especificado.
El otro modo de resolveChoice
es especificar una resolución única para todas las columnas ChoiceType
. Se puede utilizar en los casos en los que se desconozca la lista completa de ChoiceType
antes de la ejecución. Además de las acciones enumeradas anteriormente, este modo también admite la siguiente acción:
match_catalog
: intenta convertir cadaChoiceType
al tipo correspondiente en la tabla de catálogos especificada.
Ejemplos:
Resolver la columna user.id
mediante la conversión a int y hacer que el campo address
solo conserve estructuras.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Resolver todas las columnas ChoiceType
convirtiendo cada opción en una columna independiente.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Resolver todas las columnas ChoiceType
mediante la conversión a los tipos de la tabla de catálogos especificada.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
Devuelve el esquema de este objeto DynamicFrame
.
El esquema devuelto contiene todos los campos presentes en un registro en este objeto DynamicFrame
. Sin embargo, en un pequeño número de casos, es posible que también contenga campos adicionales. Puede usar el método Unnest para "ajustar" el esquema en función de los registros de este objeto DynamicFrame
.
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un campo individual como un objeto DynamicFrame
.
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
: secuencia de los nombres de columna que se seleccionarán.
Devuelve un nuevo objeto DynamicFrame
que contiene las columnas especificadas.
nota
Solo puede usar el método selectFields
para seleccionar columnas de nivel superior. Puede usar el método applyMapping para seleccionar columnas anidadas.
Def show
def show( numRows : Int = 20 ) : Unit
numRows
: número de filas que se imprimirán.
Imprime filas de este objeto DynamicFrame
en formato JSON.
Def simplifyDDBJson
Las exportaciones de DynamoDB con el conector de exportaciones de DynamoDB de AWS Glue generan archivos JSON con estructuras anidadas específicas. Para obtener más información, consulte Objetos de datos. simplifyDDBJson
Simplifica las columnas anidadas en un DynamicFrame de este tipo de datos y genera un DynamicFrame nuevo y simplificado. Si hay varios tipos y un tipo de mapa contenidos en tipo de lista, los elementos en la lista no se simplificarán. Este método solo es compatible con los datos exportados de DynamoDB con el formato JSON. Considere la acción de unnest
para realizar cambios similares en otros tipos de datos.
def simplifyDDBJson() : DynamicFrame
Este método no toma ningún parámetro.
Ejemplo de entrada
Considere el siguiente esquema generado por una exportación a DynamoDB:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Código de ejemplo
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
La transformación de simplifyDDBJson
simplificará esto de la siguiente forma:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Transformación de paso a través que devuelve los mismos registros, pero escribe un subconjunto de registros como efecto secundario.
path
: ruta de Amazon S3 en la que se escribirá la salida, con el formatos3://bucket//path
.options
: mapaJsonOptions
opcional que describe el comportamiento de muestreo.
Devuelve un objeto DynamicFrame
que contiene los mismos registros que este.
De forma predeterminada, escribe 100 registros arbitrarios en la ubicación especificada por path
. Puede personalizar este comportamiento mediante el mapa options
. Entre las claves válidas se incluyen:
topk
: especifica el número total de registros escritos. El valor predeterminado es 100.prob
: especifica la probabilidad de que se incluya un registro individual (como decimal). El valor predeterminado es 1.
Por ejemplo, la siguiente llamada realizaría un muestreo del conjunto de datos seleccionando cada registro con una probabilidad del 20 % y se detendría después de que se hubieran escrito 200 registros.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
: las rutas que se incluirán en el primer objetoDynamicFrame
.
Devuelve una secuencia de dos objetos DynamicFrame
. El primer DynamicFrame
contiene las rutas especificadas y el segundo contiene las demás columnas.
Ejemplo
En este ejemplo, se toma un objeto DynamicFrame creado a partir de la tabla persons
en la base de datos legislators
de Data Catalog de AWS Glue y divide el DynamicFrame en dos, con los campos especificados incluidos en el primer DynamicFrame y los campos restantes incluidos en un segundo DynamicFrame. Luego, el ejemplo elige el primer DynamicFrame del resultado.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Divide las filas en función de predicados que comparan columnas con constantes.
paths
: columnas que se van a utilizar para la comparación.values
: valores constantes que se van a utilizar para la comparación.operators
: operadores que se van a utilizar para la comparación.
Devuelve una secuencia de dos objetos DynamicFrame
. El primero contiene filas en las que se aplica el predicado y el segundo contiene las que no se les aplica.
Los predicados se especifican usando tres secuencias: "paths
" contiene los nombres de columna (posiblemente anidadas), "values
" contiene los valores constantes con los que se realizará la comparación y "operators
" contiene los operadores que se usarán para la comparación. Las tres secuencias deben tener la misma longitud: el operador que ocupa la posición n
se usa para comparar la columna n
con el valor n
.
Cada operador debe ser uno de los siguientes: "!=
", "=
", "<=
", "<
", ">=
" o ">
".
A modo de ejemplo, la siguiente llamada dividiría un objeto DynamicFrame
de modo que el primer marco de salida contuviera los registros de personas de más de 65 años de Estados Unidos y el segundo contuviera todos los demás registros.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
Devuelve el número de registros de error creado al calcular este objeto DynamicFrame
. Se excluyen los errores de operaciones anteriores que se pasaron a este objeto DynamicFrame
como entrada.
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Convierte este DynamicFrame
en un DataFrame
Apache Spark SQL con el mismo esquema y registros.
nota
Dado que los objetos DataFrame
no admiten ChoiceType
, este método convierte automáticamente las columnas ChoiceType
en StructType
. Para obtener más información y opciones para la resolución de elección, consulte resolveChoice.
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
: la columna que se va a analizar. Debe tener formato de cadena o binario.format
: el formato que se utilizará para el análisis.optionString
: opciones que se pasarán al formato, como el separador CSV.
Analiza una cadena insertada o una columna binaria de acuerdo con el formato especificado. Las columnas analizadas se anidan en una estructura con el nombre de columna original.
Por ejemplo, suponga que tiene un archivo CSV con una columna JSON insertada.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Después de un análisis inicial, obtendrá un objeto DynamicFrame
con el siguiente esquema.
{{{ root |-- name: string |-- age: int |-- address: string }}}
Puede llamar a unbox
en la columna de dirección para analizar los componentes específicos.
{{{ df.unbox("address", "json") }}}
Esto nos proporciona un objeto DynamicFrame
con el siguiente esquema.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Devuelve un objeto DynamicFrame
nuevo con todas las estructuras anidadas aplanadas. Los nombres se crean con el carácter ".
" (punto).
Por ejemplo, suponga que tiene una instancia de DynamicFrame
con el siguiente esquema.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
La siguiente llamada elimina el anidamiento de la estructura de dirección:
{{{ df.unnest() }}}
El esquema resultante es el siguiente.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Este método también aplana las estructuras anidadas de dentro de las matrices. Sin embargo, por razones históricas, los nombres de dichos campos tienen anexado previamente el nombre de la matriz contenedora y ".val
".
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Desanida las columnas anidadas de un elemento DynamicFrame
que se encuentren específicamente en la estructura JSON de DynamoDB y devuelve un nuevo elemento DynamicFrame
no anidado. Las columnas que sean de una matriz de tipos de estructuras no se desanidarán. Tenga en cuenta que esta transformación de desanidamiento es un tipo específico que se comporta de modo diferente a la transformación unnest
normal y requiere que los datos ya estén en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.
Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La transformación unnestDDBJson()
convertiría esto en:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
En el siguiente ejemplo de código, se muestra cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento JSON de DynamoDB e imprimir el número de particiones:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
: función que devuelve el esquema que se va a utilizar. Se especifica como una función sin parámetros para diferir el cómputo potencialmente costoso.
Establece el esquema de este objeto DynamicFrame
en el valor especificado. Se utiliza principalmente de forma interna para evitar un nuevo y costoso cálculo de esquema. El esquema pasado debe contener todas las columnas presentes en los datos.
Def withName
def withName( name : String ) : DynamicFrame
name
: nuevo nombre que se usará.
Devuelve una copia de este objeto DynamicFrame
con un nuevo nombre.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Devuelve una copia de este objeto DynamicFrame
con el contexto de transformación especificado.