AWS Glue DynamicFrameClasse Scala - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

AWS Glue DynamicFrameClasse Scala

Pacchetto: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

Un DynamicFrame è una raccolta distribuita di oggetti DynamicRecord autodescrittivi.

DynamicFrame sono stati progettati per fornire un modello di dati flessibile per le operazioni ETL (estrazione, trasformazione e caricamento). Questi oggetti non richiedono la creazione di uno schema e possono essere usati per leggere e trasformare i dati che contengono valori e tipi non organizzati e non coerenti. Un schema può essere calcolato on demand per le operazioni che ne richiedono uno.

DynamicFrame offrono un'ampia gamma di trasformazioni per la pulizia dei dati e operazioni ETL. Supportano anche la conversione da e verso SparkSQL DataFrames per l'integrazione con il codice esistente e le numerose operazioni di analisi che DataFrames forniscono.

I parametri seguenti vengono condivisi tra molte trasformazioni AWS Glue che costituiscono oggetti DynamicFrame:

  • transformationContext — Identificatore per questo DynamicFrame. Il transformationContext viene usato come chiave per lo stato dei segnalibro di processo che viene mantenuto tra esecuzioni.

  • callSite — Fornisce informazioni sul contesto per la segnalazione degli errori. Questi valori vengono impostati automaticamente durante la chiamata da Python.

  • stageThreshold — Numero massimo di record di errore consentiti nel calcolo di questo DynamicFrame prima di generare un'eccezione, esclusi i record presenti nell'oggetto DynamicFrame precedente.

  • totalThreshold — numero massimo di record di errore totali prima di generare un'eccezione, inclusi quelli dei frame precedenti.

Val errorsCount

val errorsCount

Numero di record di errore in questo oggetto DynamicFrame. Include gli errori restituiti dalle operazioni precedenti.

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings — Sequenza di mappature per creare un nuovo oggetto DynamicFrame.

  • caseSensitive — Specifica se considerare o meno le colonne di origine come colonne che fanno distinzione tra maiuscole e minuscole. L'impostazione di questo parametro su false può essere utile per l'integrazione con archivi che non fanno distinzione tra maiuscole e minuscole, come il catalogo dati AWS Glue.

Seleziona, proietta e trasmette le colonne in base a una sequenza di mappature.

Ogni mappatura è costituita da una colonna e un tipo di origine e da una colonna e un tipo target. Le mappature possono essere specificate come un 4-tuple (source_path, source_type, target_path, target_type) o un oggetto MappingSpec contenente le stesse informazioni.

Oltre che per semplici proiezioni e trasferimenti, le mappature possono essere usate per annidare campi o annullarne l'annidamento separando i componenti del percorso con "'." (punto).

Ad esempio, supponiamo di avere un DynamicFrame con lo schema seguente.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

Puoi effettuare la chiamata seguente per annullare l'annidamento dei campi state e zip.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

Lo schema risultante è il seguente.

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

Puoi anche usare applyMapping per riannidare le colonne. Ad esempio, il codice seguente inverte la trasformazione precedente e crea una struttura denominata address nel target.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

I nomi dei campi che contengono i caratteri "." (periodo) possono essere quotati utilizzando le virgolette (``).

Nota

Al momento, non puoi utilizzare il metodo applyMapping per mappare colonne annidate all'interno di matrici.

Def assertErrorThreshold

def assertErrorThreshold : Unit

Operazione che forza il calcolo e verifica che il numero di record di errore sia inferiore a stageThreshold e totalThreshold. Genera un'eccezione se una delle due condizioni non è vera.

Def conteggio

lazy def count

Restituisce il numero di elementi inclusi in questo oggetto DynamicFrame.

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un nuovo oggetto DynamicFrame con la colonna specificata rimossa.

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un nuovo oggetto DynamicFrame con le colonne specificate rimosse.

Questo metodo può essere usato per eliminare colonne annidate, incluse quelle all'interno di matrici, ma non per eliminare elementi di matrice specifici.

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

Restituisce un nuovo oggetto DynamicFrame con tutte le colonne null rimosse.

Nota

Rimuove solo le colonne di tipo NullType. I singoli valori null in altre colonne non vengono rimossi o modificati.

Cornice Def errorsAsDynamic

def errorsAsDynamicFrame

Restituisce un nuovo oggetto DynamicFrame contenente i record degli errori da questo DynamicFrame.

Def filtro

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Crea un nuovo oggetto DynamicFrame contenente solo i record per i quali la funzione "f" restituisce true. La funzione di filtro "f" non dovrebbe modificare il record di input.

Def getName

def getName : String

Restituisce il nome di questo oggetto DynamicFrame.

Def getNumPartitions

def getNumPartitions

Restituisce il numero di partizioni incluse in questo oggetto DynamicFrame.

Def calcolato getSchemaIf

def getSchemaIfComputed : Option[Schema]

Restituisce lo schema se è già stato calcolato. Non analizza i dati se lo schema non è stato ancora calcolato.

Def isSchemaComputed

def isSchemaComputed : Boolean

Restituisce true se lo schema è stato calcolato per questo oggetto DynamicFrame oppure restituisce false in caso contrario. Se questo metodo restituisce false, la chiamata del metodo schema richiede un altro passaggio sui record in questo oggetto DynamicFrame.

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 — Le colonne in questo DynamicFrame da utilizzare per l'unione.

  • keys2 — Le colonne in frame2 da utilizzare per l'unione. Deve avere la stessa lunghezza di keys1.

  • frame2 — DynamicFrame da unire.

Restituisce il risultato dell'esecuzione di una query equijoin con frame2 usando le chiavi specificate.

Def mappa

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un nuovo oggetto DynamicFrame creato applicando la funzione "f" specificata a ogni record in questo oggetto DynamicFrame.

Questo metodo copia ogni record prima di applicare la funzione specificata e di conseguenza è sicuro per la modifica dei record. Se la funzione di mappatura genera un'eccezione per un determinato record, il record verrà contrassegnato come errore e l'analisi dello stack verrà salvata come colonna nel record di errore.

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame — Il DynamicFrame di gestione temporanea da unire.

  • primaryKeys — L'elenco dei campi chiave primaria per abbinare i record dall'origine e dal DynamicFrame di gestione temporanea.

  • transformationContext — Una stringa univoca utilizzata per recuperare i metadati relativi alla trasformazione corrente (opzionale).

  • options: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per questa trasformazione.

  • callSite — Usato per fornire informazioni sul contesto per la segnalazione degli errori.

  • stageThreshold — Un Long. Il numero di errori nella trasformazione specificata per cui l'elaborazione deve restituire un errore.

  • totalThreshold — Un Long. Il numero totale di errori fino a questa trasformazione inclusa per i quali l'elaborazione deve restituire un errore.

Unisce questo DynamicFrame con un DynamicFrame temporaneo basato sulle chiavi primarie specificate per identificare i record. I registri duplicati (registri con le stesse chiavi primarie) non vengono deduplicati. Se non è presente alcun record corrispondente nel frame temporaneo, tutti i record (inclusi i duplicati) vengono mantenuti dall'origine. Se il frame di staging dispone di record corrispondenti, i suoi registri sovrascrivono i registri nell'origine in AWS Glue.

Il DynamicFrame restituito contiene il record A in questi casi:

  1. Se A esiste sia nel frame di origine che nel frame temporaneo, viene restituito A nel frame temporaneo.

  2. Se A si trova nella tabella di origine e A.primaryKeys non si trova nel stagingDynamicFrame (ciò significa che A non viene aggiornato nella tabella temporanea).

Il frame di origine e il frame temporaneo non devono avere lo stesso schema.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

Stampa lo schema di questo oggetto DynamicFrame in stdout in un formato leggibile.

Def recomputeSchema

def recomputeSchema : Schema

Forza un nuovo calcolo dello schema. Questa operazione richiede una scansione dei dati, ma potrebbe "limitare" lo schema se lo schema corrente include dati che nono sono presenti nei dati.

Restituisce lo schema ricalcolato.

Def relazionalizzazione

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName: il nome da usare per l'oggetto DynamicFrame di base nell'output. Gli oggetti DynamicFrame creati tramite il pivoting di matrici usano questo nome come prefisso.

  • stagingPath — il percorso Amazon Simple Storage Service (Amazon S3) per la scrittura di dati intermedi.

  • options — opzioni e configurazione per l'applicazione di relazioni. Attualmente inutilizzato.

Appiattisce tutte le strutture annidate e trasforma tramite pivoting le matrici in tabelle separate.

Questa operazione può essere usata per preparare dati annidati a più livelli per l'inserimento in un database relazionale. Le strutture annidate vengono appiattite allo stesso modo della trasformazione Unnest. Inoltre, le matrici vengono trasformate tramite pivoting in tabelle separate attraverso un'operazione in cui ogni elemento di matrice diventa una riga. Ad esempio, supponiamo di avere un DynamicFrame con i dati seguenti.

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Eseguire il seguente codice.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

Il codice produce due tabelle. La prima tabella è denominata "persone" e contiene quanto segue.

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

Qui le matrici friends sono state sostituite con una chiave di join generata automaticamente. Viene creata una tabella separata denominata people.friends con il contenuto seguente.

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

In questa tabella "id" è una chiave di join che identifica il record da cui proviene l'elemento della matrice, "index" fa riferimento alla posizione nella matrice originale e "val" è l'effettiva voce della matrice.

Il metodo relationalize restituisce la sequenza di oggetti DynamicFrame creati applicando questo processo in modo ricorsivo a tutte le matrici.

Nota

La libreria AWS Glue genera automaticamente le chiavi di join per le nuove tabelle. Per garantire che le chiavi di join siano univoche tra esecuzioni di processi, devono essere abilitati i segnalibro di processo.

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName: nome originale della colonna.

  • newName: nuovo nome della colonna.

Restituisce un nuovo DynamicFrame con il campo specificato rinominato.

Questo metodo può essere usato per rinominare campi annidati. Ad esempio, il codice seguente rinominerebbe state in state_code all'interno della struttura address.

{{{ df.renameField("address.state", "address.state_code") }}}

Def ripartizione

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un nuovo oggetto DynamicFrame con partizioni numPartitions.

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption — Un'operazione da applicare a tutte le colonne ChoiceType non elencate nella sequenza delle specifiche.

  • database — Il database del catalogo dati da usare con l'operazione match_catalog.

  • tableName — La tabella del catalogo dati da usare con l'operazione match_catalog.

Restituisce un nuovo oggetto DynamicFrame sostituendo uno o più oggetti ChoiceType con un tipo più specifico.

Ci sono due modi per utilizzare resolveChoice. Il primo consiste nell'indicare una sequenza di colonne specifiche e come risolverle. Queste vengono specificate come tuple costituite da coppie (colonna, operazione).

Sono disponibili le operazioni seguenti:

  • cast:type: tenta di trasmettere tutti i valori al tipo specificato.

  • make_cols: converte ogni tipo distinto in colonna con il nome columnName_type.

  • make_struct: converte una colonna in struttura con chiavi per ogni tipo distinto.

  • project:type — mantiene solo i valori del tipo specificato.

L'altra modalità per resolveChoice è specificare una singola risoluzione per tutti gli oggetti ChoiceType. Puoi usare questa modalità nei casi in cui l'elenco completo di oggetti ChoiceType non è noto prima dell'esecuzione. Oltre alle operazioni elencate in precedenza, questa modalità supporta anche l'operazione seguente:

  • match_catalogChoiceType: tenta di trasmettere ogni oggetto al tipo corrispondente nella tabella del catalogo specificata.

Esempi:

Risoluzione della colonna user.id mediante casting a un tipo int, facendo in modo che il campo address mantenga solo le strutture:

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Risoluzione di tutti gli oggetti ChoiceType mediante conversione di ogni scelta in una colonna separata:

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Risoluzione di tutte gli oggetti ChoiceType mediante casting ai tipi nella tabella del catalogo specificata:

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

Restituisce lo schema di questo oggetto DynamicFrame.

Lo schema restituito è garantito per contenere ogni campo presente in un record in questo DynamicFrame. Tuttavia, in un esiguo numero di casi, può anche contenere campi aggiuntivi. Puoi utilizzare il metodo Unnest per "ridurre" lo schema in base ai record in questo DynamicFrame.

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un singolo campo come DynamicFrame.

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths — La sequenza dei nomi delle colonne da selezionare.

Restituisce un nuovo oggetto DynamicFrame contenente le colonne specificate.

Nota

Il metodo selectFields può essere usato solo per selezionare colonne di primo livello. Puoi utilizzare il metodo applyMapping per selezionare colonne annidate.

Def mostra

def show( numRows : Int = 20 ) : Unit
  • numRows — Numero di righe da stampare.

Stampa le righe di questo oggetto DynamicFrame in formato JSON.

Def semplifica DDBJSON

Le esportazioni DynamoDB con il connettore di esportazione AWS Glue DynamoDB producono file JSON di strutture annidate specifiche. Per ulteriori informazioni, consulta Data objects. simplifyDDBJson Semplifica le colonne annidate in un tipo DynamicFrame di dati di questo tipo e ne restituisce uno nuovo semplificato. DynamicFrame Se ci sono più tipi o un tipo di mappa contenuto in un tipo di elenco, gli elementi nell'elenco non verranno semplificati. Questo metodo supporta solo i dati nel formato JSON di esportazione DynamoDB. Prendi in considerazione unnest la possibilità di apportare modifiche simili su altri tipi di dati.

def simplifyDDBJson() : DynamicFrame

Questo metodo non accetta alcun parametro.

Input di esempio

Prendi in considerazione lo schema seguente generato da un'esportazione DynamoDB:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

Esempio di codice

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

La trasformazione simplifyDDBJson semplificherà questo processo in:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Trasformazione passthrough che restituisce gli stessi record, ma scrive un sottoinsieme di record come effetto secondario.

  • path — Il percorso in Amazon S3 in cui scrivere l'output, nel formato s3://bucket//path.

  • options  — Una mappa JsonOptions opzionale che descrive il comportamento di campionamento.

Restituisce un oggetto DynamicFrame contenente gli stessi record di questo.

Per impostazione predefinita, scrive 100 record arbitrari nel percorso specificato da path. Questo comportamento può essere personalizzato usando la mappa options. Le chiavi valide includono le seguenti:

  • topk — Specifica il numero totale di record scritti. Il valore di default è 100.

  • prob: specifica la probabilità (sotto forma di valore decimale) di inclusione di un singolo record. Il valore predefinito è 1.

Ad esempio, la chiamata seguente esegue il campionamento del set di dati selezionando ogni record con una probabilità del 20% e arrestandosi dopo la scrittura di 200 record.

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — Percorsi da includere nel primo DynamicFrame.

Restituisce una sequenza di due oggetti DynamicFrame. Il primo oggetto DynamicFrame contiene i percorsi specificati, mentre il secondo contiene tutte le altre colonne.

Esempio

Questo esempio prende una tabella DynamicFrame creata dalla persons tabella nel legislators database del AWS Glue Data Catalog e la DynamicFrame divide in due, con i campi specificati che entrano nel primo DynamicFrame e i campi rimanenti in un secondo DynamicFrame. L'esempio sceglie quindi il primo DynamicFrame dal risultato.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Suddivide le righe in base a predicati che confrontano colonne e costanti.

  • paths — Colonne da usare per il confronto.

  • values — I valori di costante da usare per il confronto.

  • operators — Gli operatori da usare per il confronto.

Restituisce una sequenza di due oggetti DynamicFrame. Il primo contiene le righe per cui il predicato è true, il secondo contiene quelle per cui è false.

I predicati vengono specificati usando tre sequenze: "paths" contiene i nomi delle colonne (possibilmente annidate), "values" contiene i valori di costante rispetto ai quali eseguire il confronto e "operators" contiene gli operatori da usare per il confronto. Le tre sequenze devono essere tutte della stessa lunghezza: l'nesimo operatore verrà usato per confrontare la nesima colonna con l'nesimo valore.

Gli operatori consentiti sono: "!=", "=", "<=", "<", ">=" o ">".

Ad esempio, la chiamata seguente divide un oggetto DynamicFrame in modo che il primo frame di output contenga i record di persone degli Stati Uniti di età maggiore di 65 anni e che il secondo contenga tutti gli altri record.

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

Restituisce il numero di record di errore creati durante il calcolo di questo oggetto DynamicFrame. Sono esclusi gli errori restituiti dalla operazioni precedenti passate a questo oggetto DynamicFrame come input.

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Converte questo DynamicFrame in un Apache Spark SQL DataFrame con lo stesso schema e gli stessi record.

Nota

Poiché gli oggetti DataFrame non supportano oggetti ChoiceType, questo metodo converte automaticamente le colonne ChoiceType in oggetti StructType. Per ulteriori informazioni sulle opzioni per le scelte di risoluzione, consulta resolveChoice.

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path — La colonna da analizzare. Deve essere di tipo String o Binary.

  • format: formato da usare per l'analisi.

  • optionString: opzioni da passare al formato, ad esempio il separatore per file CSV.

Analizza una stringa incorporata o una colonna binaria in base al formato specificato. Le colonne analizzate vengono annidate all'interno di una struttura con il nome di colonna originale.

Supponi, ad esempio, di avere un file CSV con una colonna JSON incorporata.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

Dopo un'analisi iniziale, avresti un oggetto DynamicFrame con lo schema seguente.

{{{ root |-- name: string |-- age: int |-- address: string }}}

Puoi chiamare unbox nella colonna address per analizzare i componenti specifici.

{{{ df.unbox("address", "json") }}}

Otterrai un oggetto DynamicFrame con lo schema seguente.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Restituisce un nuovo oggetto DynamicFrame con tutte le strutture annidate appiattite. I nomi vengono creati usando il carattere "." (punto).

Ad esempio, supponiamo di avere un DynamicFrame con lo schema seguente.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

La chiamata seguente annulla l'annidamento della struttura address.

{{{ df.unnest() }}}

Lo schema risultante è il seguente.

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

Questo metodo, inoltre, annulla l'annidamento delle strutture all'interno di array. Tuttavia, per motivi storici, ai nomi di tali campi vengono anteposti il nome della matrice di chiusura e ".val".

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

Snidifica le colonne nidificate in un DynamicFrame che si trovano specificamente nella struttura JSON di DynamoDB e restituisce un nuovo DynamicFrame non annidato. Le colonne che sono di un array di struct non verranno annidate. Si noti che si tratta di un tipo specifico di trasformazione di snidamento che si comporta in modo diverso dalla normale trasformazione di unnest e richiede che i dati siano già nella struttura JSON di DynamoDB. Per ulteriori informazioni, consulta DynamoDB JSON.

Ad esempio, lo schema di lettura di un'esportazione con la struttura JSON DynamoDB potrebbe apparire come segue:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La trasformazione di unnestDDBJson() lo convertirebbe in:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

L'esempio di codice seguente mostra come utilizzare il connettore di esportazione DynamoDB AWS Glue, richiamare un JSON di DynamoDB unnest e stampare il numero di partizioni:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema — Una funzione che restituisce lo schema da usare. È specificata come funzione a zero parametri per posticipare i calcoli potenzialmente onerosi.

Imposta lo schema di questo oggetto DynamicFrame sul valore specificato. Viene usato per lo più internamente per evitare ricalcoli dello schema onerosi. Lo schema passato deve contenere tutte le colonne presenti nei dati.

Def withName

def withName( name : String ) : DynamicFrame
  • name — Il nuovo nome da usare.

Restituisce una copia di questo oggetto DynamicFrame con un nuovo nome.

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Restituisce una copia di questo oggetto DynamicFrame con il contesto di trasformazione specificato.