AWS Glue Scala DynamicFrame-Klasse - AWS Glue

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

AWS Glue Scala DynamicFrame-Klasse

Paket: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

Ein DynamicFrame ist eine verteilte Sammlung von selbstbeschreibenden DynamicRecord Objekten.

DynamicFrames wurden entwickelt, um ein flexibles Datenmodell für ETL-Operationen (Extrahieren, Transformieren und Laden) bereitzustellen. Sie benötigen kein Schema zum Erstellen, und Sie können damit Daten lesen und transformieren, die unstrukturierte oder inkonsistente Werte und Typen enthalten. Ein Schema kann bei Bedarf für solche Operationen berechnet werden, die eines benötigen.

DynamicFrames bieten eine Reihe von Transformationen für die Datenreinigung und ETL. Sie unterstützen auch die Konvertierung zu und von SparkSQL DataFrames , um sie in vorhandenen Code und die vielen Analysevorgänge zu integrieren, die DataFrames bieten.

Die folgenden Parameter werden über viele AWS Glue-Transformationen hinweg geteilt, die DynamicFrames erstellen:

  • transformationContext – Der Bezeichner für diesen DynamicFrame. Der transformationContext wird als Schlüssel für den Auftrags-Lesezeichenstatus verwendet, der während der Ausführungen persistent ist.

  • callSite – Liefert Kontextinformationen für die Fehlerberichterstattung. Diese Werte werden automatisch beim Aufruf von Python festgelegt.

  • stageThreshold – Die maximale Anzahl der Fehlerdatensätze, die aufgrund der Berechnung dieses DynamicFrame zulässig sind, ehe eine Ausnahme ausgelöst wird. Ausgenommen sind Datensätze aus dem vorherigen DynamicFrame.

  • totalThreshold – Die maximale Anzahl der Gesamtfehlersätze, bevor eine Ausnahme ausgelöst wird, einschließlich derjenigen aus früheren Frames.

Val errorsCount

val errorsCount

Die Anzahl der Fehlerdatensätze in diesem DynamicFrame. Dazu zählen Fehler aus früheren Operationen.

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings – Eine Folge von Zuweisungen für die Erstellung eines neuen DynamicFrame.

  • caseSensitive – Legt fest, ob bei Quellspalten die Groß-/Kleinschreibung beachtet wird. Die Festlegung auf „false“ kann bei der Integration in Datenspeicher wie dem AWS Glue Data Catalog helfen, bei dem die Groß-/Kleinschreibung nicht berücksichtigt wird.

Selektiert, projiziert und wandelt Spalten basierend auf Mappingreihenfolgen um.

Jede Zuweisung besteht aus einer Quellspalte und einem Typ und einer Zielspalte und einem Typ. Zuweisungen können entweder als „vierstelliger“ Tupel (source_path, source_type, target_path, target_type) oder als MappingSpec-Objekt, das dieselben Informationen enthält, angegeben werden.

Neben der Verwendung von Zuweisungen für einfache Projektionen und Umwandlungen können Sie diese auch zum Verschachteln oder Aufheben der Verschachtelung von Feldern verwenden, indem Sie Komponenten des Pfades mit „.“ (Punkt) trennen.

Angenommen, Sie haben einen DynamicFrame mit folgendem Schema:

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

Mit dem folgenden Aufruf können Sie die Verschachtelung der Felder state und zip aufheben:

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

Das resultierende Schema lautet wie folgt:

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

Sie können auch applyMapping verwenden, um Spalten neu zu verschachteln. Durch folgende Operation wird beispielsweise die vorherige Transformation umgekehrt und eine neue Struktur namens address am Ziel erstellt:

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

Feldnamen, die „.“ (Punkt-)Zeichen enthalten, können mit Hilfe von Backticks (``) zitiert werden.

Anmerkung

Zurzeit können Sie die applyMapping-Methode nicht für die Zuweisung von Spalten verwenden, die unter Arrays verschachtelt sind.

Def assertErrorThreshold

def assertErrorThreshold : Unit

Eine Aktion, die eine Berechnung erzwingt und sicherstellt, dass die Anzahl der Fehlerdatensätze stageThreshold und totalThreshold nicht überschreitet. Löst eine Ausnahme aus, wenn eine der Bedingungen fehlschlägt.

Def count

lazy def count

Gibt die Anzahl der Elemente in diesem DynamicFrame zurück.

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt einen neuen DynamicFrame zurück, bei dem die angegebene Spalte entfernt wurde.

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt einen neuen DynamicFrame zurück, bei dem die angegebenen Spalten entfernt wurden.

Sie können diese Methode verwenden, um verschachtelte Spalten zu löschen, einschließlich derjenigen in Arrays. Sie kann aber nicht eingesetzt werden, um bestimmte Array-Elemente zu verwerfen.

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

Gibt einen neuen DynamicFrame zurück, bei dem alle Nullspalten entfernt sind.

Anmerkung

Dies entfernt nur Spalten des Typs NullType. Einzelne Nullwerte in anderen Spalten werden weder entfernt noch geändert.

Def- errorsAsDynamicFrame

def errorsAsDynamicFrame

Gibt einen neuen DynamicFrame mit den Fehlersätzen aus diesem DynamicFrame zurück.

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Erstellt einen neuen DynamicFrame, der nur die Datensätze enthält, für die die Funktion „ftrue zurückgibt. Die Filterfunktion „f“ sollte den Eingabe-Datensatz nicht verändern.

Def getName

def getName : String

Gibt den Namen dieses DynamicFrame zurück.

Def getNumPartitions

def getNumPartitions

Gibt die Anzahl der Partitionen in diesem DynamicFrame zurück.

Def getSchemaIfberechnet

def getSchemaIfComputed : Option[Schema]

Gibt das Schema zurück, wenn es bereits berechnet wurde. Scannt die Daten nicht, wenn das Schema noch nicht berechnet wurde.

Def isSchemaComputed

def isSchemaComputed : Boolean

Gibt "true" zurück, wenn das Schema für diesen DynamicFrame bereits berechnet wurde, andernfalls "false". Wenn diese Methode "false" zurückgibt, erfordert der Aufruf der schema-Methode eine erneute Übergabe dieser Datensätze im DynamicFrame.

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 – Die Spalten in diesem DynamicFrame, die für den Join verwendet werden sollen.

  • keys2 – Die Spalten in frame2, die für den Join verwendet werden sollen. Muss die gleiche Länge haben wie keys1.

  • frame2 – Der andere DynamicFrame für einen Join.

Gibt das Ergebnis der Durchführung eines equijoin mit frame2 über die angegebenen Schlüssel zurück.

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt einen neuen DynamicFrame zurück, der erstellt wird, indem die angegebene Funktion „f“ auf jeden Datensatz in diesem DynamicFrame angewendet wird.

Diese Methode kopiert jeden Datensatz, ehe die angegebene Funktion angewendet wird, sodass das Verändern der Datensätze sicher ist. Wenn die Zuweisungsfunktion eine Ausnahme für einen bestimmten Datensatz auslöst, wird der Datensatz als fehlerhaft gekennzeichnet und der Stack-Trace als Spalte im Fehlerdatensatz gespeichert.

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame – Der Staging-DynamicFrame, der zusammengeführt werden soll.

  • primaryKeys – Die Liste der Primärschlüsselfelder, mit denen Datensätze aus der Quelle und Staging-DynamicFrames übereinstimmen.

  • transformationContext – Eine eindeutige Zeichenfolge, die zum Abrufen von Metadaten über die aktuelle Transformation verwendet wird (optional).

  • options – Eine Zeichenfolge von JSON-Name-Wert-Paaren, die zusätzliche Informationen für diese Transformation bereitstellen.

  • callSite – Wird verwendet, um Kontextinformationen für Fehlerberichte bereitzustellen.

  • stageThreshold – Ein Long. Die Anzahl der Fehler in der angegebenen Transformation, für die die Verarbeitung fehlerhaft sein muss.

  • totalThreshold – Ein Long. Die Gesamtzahl der Fehler bis einschließlich dieser Transformation, bei denen die Verarbeitung fehlerhaft sein muss.

Führt dieses DynamicFrame mit einem Staging-DynamicFrame basierend auf den angegebenen Primärschlüsseln zusammen, um Datensätze zu identifizieren. Doppelte Datensätze (Datensätze mit denselben Primärschlüsseln) werden nicht dedupliziert. Wenn kein übereinstimmender Datensatz im Staging-Frame vorhanden ist, werden alle Datensätze (einschließlich Duplikate) von der Quelle beibehalten. Wenn der Staging-Frame übereinstimmende Datensätze enthält, überschreiben die Datensätze aus dem Staging-Frame die Datensätze in der Quelle in AWS Glue.

Der zurückgegebene DynamicFrame enthält Datensatz A in folgenden Fällen:

  1. Wenn sowohl im Quell- als auch im Staging-Frame A vorhanden ist, wird A im Staging-Frame zurückgegeben.

  2. Wenn sich A in der Quelltabelle und A.primaryKeys nicht in stagingDynamicFrame befindet (d. h. A wird nicht in der Staging-Tabelle aktualisiert).

Der Quell- und der Staging-Frame müssen nicht dasselbe Schema haben.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

Druckt das Schema dieses DynamicFrame in einem lesbaren Format in stdout.

Def recomputeSchema

def recomputeSchema : Schema

Erzwingt eine Neuberechnung des Schemas. Dies erfordert einen Scan über die Daten, aber es kann das Schema „verschärfen“, wenn es einige Felder im aktuellen Schema gibt, die nicht in den Daten vorhanden sind.

Gibt das neu berechnete Schema zurück.

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName – Der Name für die Basis DynamicFrame in der Ausgabe. DynamicFrames, die durch das Zusammenfassen von Arrays erstellt werden, beginnen mit diesem als Präfix.

  • stagingPath – Der Amazon-S3-Pfad (Amazon Simple Storage Service) zum Schreiben von Zwischendaten.

  • options – Relationalisierung von Optionen und Konfiguration. Derzeit nicht verwendet.

Gleicht alle verschachtelten Strukturen an und pivotiert Arrays in separate Tabellen.

Mit dieser Operation können Sie tief verschachtelte Daten für die Aufnahme in eine relationale Datenbank vorbereiten. Verschachtelte Strukturen werden genauso wie die Unnest-Transformation auf eine Ebene gebracht. Außerdem werden Arrays in separate Tabellen pivotiert. Dabei wird jedes Array-Element zu einer Zeile. Angenommen, Sie haben einen DynamicFrame mit den folgenden Daten:

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Führen Sie folgenden Code aus.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

Es werden zwei Tabellen erstellt. Die erste Tabelle trägt den Namen „people“ und enthält Folgendes:

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

Das Freunde-Array wurde durch einen automatisch generierten Join-Schlüssel ersetzt. Eine separate Tabelle namens people.friends mit folgendem Inhalt wird erstellt:

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

In dieser Tabelle ist „id“ ein Join-Schlüssel, der identifiziert, aus welchem Datensatz das Array-Element stammt, „index“ bezieht sich auf die Position im ursprünglichen Array und „val“ steht für den tatsächlichen Array-Eintrag.

Die relationalize-Methode gibt die Sequenz von DynamicFrames zurück, die durch rekursives Anwenden dieses Prozesses auf alle Arrays erzeugt werden.

Anmerkung

Die AWS Glue-Bibliothek generiert automatisch Join-Schlüssel für neue Tabellen. Um sicherzustellen, dass Join-Schlüssel über alle Auftragsausführungen hinweg eindeutig sind, müssen Sie Auftrags-Lesezeichen aktivieren.

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName – Der ursprüngliche Name der Spalte.

  • newName – Der neue Name der Spalte.

Gibt einen neuen DynamicFrame zurück, wobei das angegebene Feld umbenannt ist.

Mit dieser Methode können Sie verschachtelte Felder umbenennen. Der folgende Code benennt beispielsweise innerhalb der Adressenstruktur state zu state_code um:

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt einen neuen DynamicFrame mit numPartitions Partitionen zurück.

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption – Eine Aktion, die auf alle ChoiceType-Spalten anzuwenden ist, die nicht in der Spezifikationsreihenfolge aufgeführt sind.

  • database – Die Data-Catalog-Datenbank zur Verwendung mit der match_catalog-Aktion.

  • tableName – Die Data-Catalog-Tabelle zur Verwendung mit der match_catalog-Aktion.

Gibt einen neuen DynamicFrame zurück, indem eine oder mehrere ChoiceTypes durch einen spezifischeren Typ ersetzt werden.

Es gibt zwei Möglichkeiten für die Verwendung von resolveChoice. Die erste besteht in der Angabe bestimmter Spalten und der Art, wie diese aufgelöst werden. Sie werden als Tupels, bestehend aus (Spalte, Aktion)-Paaren, angegeben.

Im Folgenden sind die möglichen Aktionen aufgeführt:

  • cast:type – Versucht, alle Werte in den angegebenen Typ umzuwandeln.

  • make_cols – Konvertiert die einzelnen verschiedenen Typen in eine Spalte namens columnName_type.

  • make_struct – Konvertiert eine Spalte in eine Struktur mit Schlüssel für die individuellen Typen.

  • project:type – Behält nur Wert des angegebenen Typs bei.

Der andere Modus für resolveChoice dient zum Angeben einer einzigen Auflösung für alle ChoiceTypes. Sie können diesen verwenden, wenn die vollständige Liste der ChoiceTypes vor der Ausführung unbekannt ist. Zusätzlich zu den soeben aufgeführten Aktionen unterstützt dieser Modus noch die folgende Aktion:

  • match_catalogChoiceType – Versucht jeden in einen entsprechenden Typ in der angegebenen Katalogtabelle umzuwandeln.

Beispiele:

Lösen Sie die user.id-Spalte auf, indem Sie eine Umwandlung in ein „int“ durchführen, und sorgen Sie dafür, dass das address-Feld nur Strukturen beibehält:

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Lösen Sie alle ChoiceTypes auf, indem Sie jede Auswahl in eine eigene Spalte umwandeln:

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Lösen Sie alle ChoiceTypes auf, indem Sie diese in die Typen in der angegebenen Katalogtabelle umwandeln.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

Gibt das Schema dieses DynamicFrame zurück.

Das zurückgegebene Schema enthält garantiert alle Felder in einem Datensatz in diesem DynamicFrame. In einigen wenigen Fällen kann es aber auch zusätzliche Felder enthalten. Die Unnest-Methode kann verwendet werden, um das Schema basierend auf den Datensätzen in diesem DynamicFramezu „straffen“.

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt ein einzelnes Feld als DynamicFrame zurück.

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths – Die Reihenfolge der zu wählenden Spaltennamen.

Gibt einen neuen DynamicFrame mit den angegebenen Spalten zurück.

Anmerkung

Sie können die selectFields-Methode nur verwenden, um Spalten der obersten Ebene auszuwählen. Sie können die applyMapping-Methode zum Auswählen verschachtelter Spalten einsetzen.

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows – Die Anzahl der zu druckenden Zeilen.

Druckt Zeilen aus diesem DynamicFrame im JSON-Format.

Def simplifyDDBJson

DynamoDB-Exporte mit dem AWS Glue DynamoDB-Export-Konnektor führen zu JSON-Dateien mit bestimmten verschachtelten Strukturen. Weitere Informationen finden Sie unter Datenobjekte. simplifyDDBJson Vereinfacht verschachtelte Spalten in einem DynamicFrame dieser Art von Daten und gibt einen neuen vereinfachten zurück DynamicFrame. Wenn mehrere Typen oder ein Zuordnungstyp in einem Listentyp enthalten sind, werden die Elemente in der Liste nicht vereinfacht. Diese Methode unterstützt nur Daten im DynamoDB-Export-JSON-Format. Erwägen Sieunnest, ähnliche Änderungen an anderen Arten von Daten vorzunehmen.

def simplifyDDBJson() : DynamicFrame

Diese Methode verwendet keine Parameter.

Beispieleingabe

Betrachten Sie das folgende Schema, das durch einen DynamoDB-Export generiert wurde:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

Beispiel-Code

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

Die Transformation simplifyDDBJson vereinfacht dies zu:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Pass-Through-Transformation, die die gleichen Datensätze zurückgibt, darüber hinaus aber noch ein Subset an Datensätzen schreibt.

  • path – Der Pfad in Amazon S3 zum Schreiben der Ausgabe in der Form s3://bucket//path.

  • options – Eine optionale JsonOptions-Zuweisung, die das Sampling-Verhalten beschreibt.

Gibt einen DynamicFrame zurück, der die gleichen Datensätze wie dieser enthält.

Standardmäßig werden 100 willkürliche Datensätze in den durch path angegebenen Speicherort geschrieben. Sie können dieses Verhalten anpassen, indem Sie die options-Zuweisung verwenden. Gültige Schlüssel enthalten Folgendes.

  • topk – Gibt die Gesamtzahl der geschriebenen Datensätze an. Der Standardwert ist 100.

  • prob – Gibt an, wie wahrscheinlich es ist (in Form einer Dezimalzahl), dass ein einzelner Datensatz enthalten ist. Standard = 1.

Beispielsweise würde der folgende Aufruf den Datensatz sampeln, indem er jeden Datensatz mit einer Wahrscheinlichkeit von 20 % auswählt und nach dem Schreiben von 200 Datensätzen stoppt:

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths – Die Pfade, die in den ersten DynamicFrame aufzunehmen sind.

Gibt eine Abfolge zweier DynamicFrames zurück. Die erste DynamicFrame enthält die angegebenen Pfade und die zweite alle anderen Spalten.

Beispiel

In diesem Beispiel wird ein aus der persons Tabelle in der legislators Datenbank im AWS Glue Data Catalog DynamicFrame erstellt und DynamicFrame in zwei aufgeteilt, wobei die angegebenen Felder in das erste DynamicFrame und die verbleibenden Felder in ein zweites gehen DynamicFrame. Das Beispiel wählt dann die erste DynamicFrame aus dem Ergebnis aus.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Teilt Zeilen basierend auf Prädikaten, die Konstanten mit Spalten vergleichen, auf.

  • paths – Die Spalten, die zum Vergleich verwendet werden sollen.

  • values – Die konstanten Werte, die zum Vergleich verwendet werden sollen.

  • operators – Die zum Vergleich zu verwendenden Operatoren.

Gibt eine Abfolge zweier DynamicFrames zurück. Die erste enthält Zeilen, für die das Prädikat "true" ist, und die zweite enthält solche, bei denen es "false" ist.

Prädikate werden über drei Sequenzen spezifiziert: „paths“ enthält die (evtl. geschachtelten) Spaltennamen, „values“ enthält die zu vergleichenden konstanten Werte und „operators“ enthält die Operatoren, die zum Vergleich verwendet werden sollen. Alle drei Sequenzen müssen gleich lang sein: Der n. Operator wird verwendet, um die n. Spalte mit dem n. Wert zu vergleichen.

Jeder Operator muss "!=", "=", "<=", "<", ">=" oder ">" sein.

Beispielsweise teilt der folgende Aufruf einen DynamicFrame so, dass der erste Ausgabe-Frame die Datensätze von Personen über 65 aus den USA enthält und der zweite alle anderen:

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

Gibt die Anzahl der Fehlerdatensätze zurück, die bei der Berechnung dieses DynamicFrame generiert wurden. Ausgenommen sind Fehler aus vorherigen Operationen, die diesem DynamicFrame als Eingabe übergeben wurden.

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Konvertiert DynamicFrame in einen Apache Spark SQL DataFrame mit demselben Schema und denselben Datensätzen.

Anmerkung

Da DataFrames ChoiceTypes nicht unterstützen, konvertiert diese Methode ChoiceType-Spalten automatisch in StructTypes. Weitere Informationen und Optionen zur Auflösung der Auswahl finden Sie unter resolveChoice.

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path – Die zu analysierende Spalte. Muss eine Zeichenfolge oder ein Binärwert sein.

  • format – Das für die Analyse zu verwendende Format.

  • optionString – An das Format zu übergebende Optionen, beispielsweise das CSV-Trennzeichen.

Analysiert eine eingebettete Zeichenfolge oder eine binäre Spalte entsprechend des angegebenen Formats. Analysierte Spalten werden unterhalb einer Struktur mit dem ursprünglichen Spaltennamen verschachtelt.

Angenommen, Sie haben eine CSV-Datei mit einer eingebetteten JSON-Spalte.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

Nach einer ersten Analyse erhalten Sie einen DynamicFrame mit folgendem Schema:

{{{ root |-- name: string |-- age: int |-- address: string }}}

Sie können unbox für die Adressspalte aufrufen, um die einzelnen Komponenten zu analysieren.

{{{ df.unbox("address", "json") }}}

Dadurch erhalten wir einen DynamicFrame mit folgendem Schema:

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Gibt einen neuen DynamicFrame zurück, bei dem allen verschachtelten Strukturen auf eine Ebene gebracht wurden. Namen werden mit Hilfe des „.“ (Punkt-)Zeichens erstellt.

Angenommen, Sie haben einen DynamicFrame mit folgendem Schema:

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Der folgende Aufruf hebt die Einbettung der Adressenstruktur auf:

{{{ df.unnest() }}}

Das resultierende Schema lautet wie folgt:

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

Diese Methode verhindert auch verschachtelte Strukturen innerhalb von Arrays. Aus historischen Gründen werden den Namen solcher Felder jedoch der Name des umschließenden Arrays und „.val“ vorangestellt.

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

Hebt die Verschachtelung der Spalten in einem DynamicFrame auf, die sich speziell in der DynamoDB-JSON-Struktur befinden, und gibt einen neuen, nicht verschachtelten DynamicFrame zurück. Bei Spalten, die aus einem Array von Strukturtypen bestehen, wird die Verschachtelung nicht aufgehoben. Dies ist ein spezieller Typ der Transformation zum Aufheben der Verschachtelung, der sich anders verhält als die reguläre unnest-Transformation und erfordert, dass sich die Daten bereits in der DynamoDB-JSON-Struktur befinden. Weitere Informationen finden Sie unter DYNAMODB JSON.

Das Schema eines Vorgangs zum Lesen eines Exports mit der DynamoDB-JSON-Struktur könnte beispielsweise wie folgt aussehen:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

Die unnestDDBJson()-Transformation würde dies folgendermaßen umwandeln:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

Das folgende Codebeispiel veranschaulicht, wie Sie den AWS-Glue-DynamoDB-Export-Konnektor verwenden, die Aufhebung einer DynamoDB-JSON-Verschachtelung aufrufen und die Anzahl der Partitionen ausdrucken:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema – Eine Funktion, die das zu verwendende Schema zurückgibt. Wird als Null-Parameter-Funktion angegeben, um eine möglicherweise kostenintensive Berechnung zu verhindern.

Legt das Schema dieses DynamicFrame auf den angegebenen Wert fest. Dies wird in erster Linie intern verwendet, um eine kostspielige Neuberechnung des Schemas zu vermeiden. Das übergebene Schema muss alle Spalten enthalten, die in den Daten vorhanden sind.

Def withName

def withName( name : String ) : DynamicFrame
  • name – Der zu verwendende neue Name.

Gibt eine Kopie dieser DynamicFrame mit einem neuen Namen zurück.

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Gibt eine Kopie dieser DynamicFrame mit dem angegebenen Transformationskontext zurück.