Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
DynamicFrame classe
Una delle principali astrazioni di Apache Spark è Spark SQLDataFrame
, che è simile al DataFrame
costrutto trovato in R e Pandas. A DataFrame
è simile a una tabella e supporta operazioni e operazioni in stile funzionale (map/reduce/filter/etc.) e operazioni (selezione, progetto, aggregazione). SQL
DataFrames
sono potenti e ampiamente utilizzate, ma presentano limitazioni rispetto alle operazioni di estrazione, trasformazione e caricamento (). ETL Principalmente, richiedono che venga specificato uno schema prima di caricare qualsiasi dato. Spark SQL risolve questo problema effettuando due passaggi sui dati: il primo per dedurre lo schema e il secondo per caricare i dati. Tuttavia, l'inferenza è limitata e non gestisce i casi di dati non organizzati. Lo stesso campo, ad esempio, potrebbe essere di tipo diverso in record diversi. Apache Spark spesso si ferma e definisce il tipo come string
utilizzando il testo del campo originale. Questo potrebbe non essere corretto e potrebbe essere richiesto un controllo più preciso sulle modalità di risoluzione delle discrepanze dello schema. Inoltre, per i set di dati di grandi dimensioni, un ulteriore passaggio sui dati di origine potrebbe essere proibitivo in termini di costi.
Per ovviare a queste limitazioni, AWS Glue introduce ilDynamicFrame
. Un DynamicFrame
è simile a un DataFrame
, con la differenza che ogni record è autodescrittivo, quindi inizialmente non è richiesto alcuno schema. Anziché, AWS Glue calcola uno schema on-the-fly quando richiesto e codifica esplicitamente le incongruenze dello schema utilizzando un tipo di scelta (o unione). Puoi risolvere queste incongruenze per rendere i set di dati compatibili con i datastore che richiedono uno schema fisso.
Analogamente, un DynamicRecord
rappresenta un record logico all'interno di un DynamicFrame
. È come una riga in un DataFrame
Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso. Quando si utilizza AWS Glue with PySpark, in genere non si manipola in modo indipendenteDynamicRecords
. Viceversa, di solito si trasforma il set di dati nel suo complesso attraverso il rispettivo DynamicFrame
.
Dopo aver risolto eventuali incongruenze dello schema, puoi convertire DynamicFrames
in e da DataFrames
.
— construction —
__init__
__init__(jdf, glue_ctx, name)
-
jdf
— Un riferimento al frame di dati nella Java Virtual Machine (JVM). -
glue_ctx
: un oggetto GlueContext classe. -
name
: una stringa nome opzionale, vuota per impostazione predefinita.
fromDF
fromDF(dataframe, glue_ctx, name)
Converte un DataFrame
in un DynamicFrame
convertendo campi del DataFrame
in campi del DynamicRecord
. Restituisce il nuovo DynamicFrame
.
Un DynamicRecord
rappresenta un record logico all'interno di un DynamicFrame
. È simile a una riga in un DataFrame
Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.
Questa funzione prevede che le colonne con nomi duplicati nel DataFrame
siano già state risolte.
-
dataframe
— Apache Spark SQLDataFrame
da convertire (richiesto). -
glue_ctx
: l'oggetto GlueContext classe che specifica il contesto di questa trasformazione (richiesto). -
name
— Il nome del risultatoDynamicFrame
(opzionale a partire da AWS Glue 3.0).
toDF
toDF(options)
Converte un DynamicFrame
in un DataFrame
Apache Spark, convertendo DynamicRecords
in campi di DataFrame
. Restituisce il nuovo DataFrame
.
Un DynamicRecord
rappresenta un record logico all'interno di un DynamicFrame
. È simile a una riga in un DataFrame
Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.
-
options
: un elenco di opzioni. Consente di specificare opzioni aggiuntive per il processo di conversione. Alcune opzioni valide che puoi usare con il parametro `options`:-
format
— specifica il formato dei dati, ad esempio json, csv, parquet). -
separater or sep
— per CSV i file, specifica il delimitatore. -
header
— per CSV i file, indica se la prima riga è un'intestazione (vero/falso). -
inferSchema
— indica a Spark di inferire automaticamente lo schema (vero/falso).
Ecco un esempio di utilizzo del parametro `options` con il metodo `toDF`:
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Se scegli il tipo di operazione
Project
eCast
, devi specificare il tipo di destinazione. Gli esempi includono quanto segue.>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
— information —
count
count( )
: restituisce il numero di righe nell'oggetto sottostante DataFrame
.
schema
schema( )
: restituisce lo schema di questo DynamicFrame
oppure, se non è disponibile, lo schema del DataFrame
sottostante.
Per ulteriori informazioni sui tipi di DynamicFrame
che compongono questo schema, consulta la pagina Tipi di estensione PySpark.
printSchema
printSchema( )
: stampa lo schema dell'oggetto sottostante DataFrame
.
show
show(num_rows)
: stampa un numero di righe specificato dall'oggetto sottostante DataFrame
.
repartition
repartition(numPartitions)
: restituisce un nuovo oggetto DynamicFrame
con partizioni numPartitions
.
coalesce
coalesce(numPartitions)
– Restituisce un nuovo oggetto DynamicFrame
con partizioni numPartitions
.
— transforms —
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Applica una mappatura dichiarativa a DynamicFrame
e restituisce un nuovo DynamicFrame
con tali mappature applicate ai campi specificati. I campi non specificati vengono omessi dal nuovo DynamicFrame
.
-
mappings
: un elenco di tuple di mappatura (obbligatorio). Ognuna è costituito da: colonna di origine, tipo di origine, colonna di destinazione, tipo di destinazione.Se il nome della colonna di origine include un punto (
.
), esso deve essere racchiuso tra apici inversi (``
). Ad esempio, per mapparethis.old.name
(stringa) athisNewName
, devi utilizzare la tupla seguente:("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa apply_mapping per rinominare campi e modificare tipi di campo
L'esempio di codice seguente mostra come utilizzare il metodo apply_mapping
per rinominare i campi selezionati e modificare tipi di campo.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Richiama la trasformazione Classe FlatMap per rimuovere campi da un DynamicFrame
. Restituisce un nuovo DynamicFrame
con i campi specificati rimossi.
-
paths
: un elenco di stringhe. Ognuna contiene il percorso completo di un nodo del campo da rimuovere. Puoi utilizzare la notazione a punti per specificare campi nidificati. Ad esempio, se il campofirst
è figlio del camponame
nell'albero, specifica"name.first"
per il percorso.Se il nome di un nodo di campo contiene un punto (
.
) letterale, è necessario racchiudere il nome tra apici inversi (`
). -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: utilizza drop_fields per rimuovere campi da un DynamicFrame
Questo esempio di codice utilizza il metodo drop_fields
per rimuovere i campi di primo livello e i campi nidificati selezionati da un DynamicFrame
.
Set di dati di esempio
L'esempio utilizza il set di dati seguente rappresentato dalla tabella EXAMPLE-FRIENDS-DATA
nel codice:
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
Esempio di codice
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
filter
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Restituisce un nuovo DynamicFrame
contenente tutti i DynamicRecords
nel DynamicFrame
di input che soddisfano la funzione predicato specificata f
.
-
f
: funzione predicato da applicare all'oggettoDynamicFrame
. La funzione deve richiedere unDynamicRecord
come argomento e restituire True se ilDynamicRecord
soddisfa i requisiti del filtro o False in caso contrario (obbligatorio).Un
DynamicRecord
rappresenta un record logico all'interno di unDynamicFrame
. È simile a una riga in unDataFrame
Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa il filtro per ottenere una selezione filtrata di campi
In questo esempio viene utilizzato il metodo filter
per creare un nuovo DynamicFrame
che include una selezione filtrata di campi di un altro DynamicFrame
.
Come il metodo map
, filter
assume una funzione come argomento che viene applicato a ogni record nel DynamicFrame
originario. La funzione accetta un record come input e restituisce un valore booleano. Se il valore restituito è vero, il record viene incluso nel DynamicFrame
risultante. Se è falso, il record viene escluso.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Esegue un equi join con un altro DynamicFrame
e restituisce il DynamicFrame
risultante.
-
paths1
: un elenco delle chiavi di questo frame da unire. -
paths2
: un elenco delle chiavi dell'altro frame da unire. -
frame2
: l'altroDynamicFrame
da unire. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa join per combinare DynamicFrames
Questo esempio utilizza il join
metodo per eseguire un'unione su tre. DynamicFrames
AWS Glue esegue l'unione in base alle chiavi di campo fornite. Il DynamicFrame
risultante contiene le righe dei due frame originali in cui le chiavi specificate corrispondono.
Tieni presente che la trasformazione join
mantiene intatti tutti i campi. Ciò significa che i campi specificati per la corrispondenza vengono visualizzati nel risultato DynamicFrame, anche se sono ridondanti e contengono le stesse chiavi. In questo esempio viene utilizzato drop_fields
per rimuovere tali chiavi ridondanti dopo l'unione.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
map
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Restituisce un nuovo DynamicFrame
ottenuto applicando la funzione di mappatura specificata a tutti i record nel DynamicFrame
originale.
-
f
: funzione di mappatura da applicare a tutti i record nell'oggettoDynamicFrame
. La funzione deve richiedere unDynamicRecord
come argomento e restituire un nuovoDynamicRecord
(obbligatorio).Un
DynamicRecord
rappresenta un record logico all'interno di unDynamicFrame
. È simile a una riga in unDataFrame
Apache Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso. transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).info
: una stringa associata a errori nella trasformazione (facoltativo).stageThreshold
: il numero massimo di errori che si possono verificare nella trasformazione prima che venga arrestata (facoltativo). Il valore di default è zero.totalThreshold
: il numero massimo di errori che si possono verificare in totale prima che l'elaborazione venga arrestata (facoltativo). Il valore di default è zero.
Esempio: utilizza la mappa per applicare una funzione a ogni record in un DynamicFrame
In questo esempio viene utilizzato il metodo map
per applicare una funzione a ogni record di un DynamicFrame
. Nello specifico, questo esempio applica una funzione denominata MergeAddress
a ogni record per unire diversi campi indirizzo in un singolo tipo struct
.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
Unisce questo DynamicFrame
con un DynamicFrame
temporaneo basato sulle chiavi primarie specificate per identificare i record. I record duplicati (record con le stesse chiavi primarie) non vengono deduplicati. Se non è presente alcun record corrispondente nel frame temporaneo, tutti i record (inclusi i duplicati) vengono mantenuti dall'origine. Se lo staging frame contiene record corrispondenti, i record dello staging frame sovrascrivono i record nell'origine in AWS Glue.
-
stage_dynamic_frame
: ilDynamicFrame
di gestione temporanea da unire. -
primary_keys
: l'elenco dei campi chiave primaria per abbinare i record dall'origine e dai frame dinamici di gestione temporanei. -
transformation_ctx
: una stringa univoca utilizzata per recuperare i metadati relativi alla trasformazione corrente (opzionale). -
options
— Una stringa di coppie JSON nome-valore che forniscono informazioni aggiuntive per questa trasformazione. Questo argomento non è attualmente utilizzato. -
info
: unString
. Qualsiasi stringa da associare agli errori in questa trasformazione. -
stageThreshold
: unLong
. Il numero di errori nella trasformazione specificata per cui l'elaborazione deve restituire un errore. -
totalThreshold
: unLong
. Il numero totale di errori fino a questa trasformazione inclusa per i quali l'elaborazione deve restituire un errore.
Questo metodo restituisce un nuovo DynamicFrame
ottenuto unendo questo DynamicFrame
con il DynamicFrame
temporaneo.
Il DynamicFrame
restituito contiene il record A in questi casi:
-
Se
A
esiste sia nel frame di origine che nel frame temporaneo, viene restituitoA
nel frame temporaneo. -
Se
A
si trova nella tabella di origine eA.primaryKeys
non si trova nelstagingDynamicFrame
,A
non viene aggiornato nella tabella temporanea.
Il frame di origine e il frame temporaneo non devono avere lo stesso schema.
Esempio: mergeDynamicFrame da utilizzare per unire due in DynamicFrames
base a una chiave primaria
Il seguente esempio di codice mostra come utilizzare il metodo mergeDynamicFrame
per unire un DynamicFrame
con un DynamicFrame
di staging, in base alla chiave primaria id
.
Set di dati di esempio
L'esempio utilizza due DynamicFrames
da una DynamicFrameCollection
chiamata split_rows_collection
. Di seguito è riportato un elenco di chiavi in split_rows_collection
.
dict_keys(['high', 'low'])
Esempio di codice
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Converte un DynamicFrame
in un modulo che si inserisce in un database relazionale. La relazionalizzazione di a DynamicFrame
è particolarmente utile quando si desidera spostare dati da un SQL ambiente No come DynamoDB a un database relazionale come My. SQL
La trasformazione genera un elenco di frame rimuovendo le colonne annidate e ruotando le colonne dell'array. La colonna matrice trasformata mediante pivot può essere unita alla tabella root utilizzando la join-key generata durante la fase di annullamento dell'annidamento.
root_table_name
: il nome della tabella root.staging_path
— Il percorso in cui il metodo può memorizzare le partizioni delle tabelle pivotate in formato (opzionale). CSV Le tabelle trasformate mediante pivot vengono rilette da questo percorso.options
: un dizionario dei parametri opzionali.-
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa la relazionalizzazione per livellare uno schema annidato in un DynamicFrame
Questo esempio di codice utilizza il metodo relationalize
per livellare uno schema annidato in una forma adatta a un database relazionale.
Set di dati di esempio
L'esempio utilizza un DynamicFrame
chiamato legislators_combined
con lo schema seguente. legislators_combined
ha più campi annidati come links
, images
, econtact_details
, che verranno livellati dalla trasformazione relationalize
.
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
Esempio di codice
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
L'output seguente consente di confrontare lo schema del campo annidato chiamato contact_details
con la tabella creata dalla trasformazione relationalize
. Si noti che i record della tabella rimandano alla tabella principale utilizzando una chiave esterna chiamata id
e una colonna index
che rappresenta le posizioni dell'array.
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Rinomina un campo in questo DynamicFrame
e restituisce un nuovo DynamicFrame
con il campo rinominato.
-
oldName
: il percorso completo al nodo che desideri rinominare.Se il vecchio nome contiene dei punti,
RenameField
non funziona a meno che non venga racchiuso tra virgolette (`
). Ad esempio, per sostituirethis.old.name
conthisNewName
, chiamare rename_field come segue:newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
: il nuovo nome, come un percorso completo. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa rename_field per rinominare i campi in un DynamicFrame
Questo esempio di codice utilizza il metodo rename_field
per rinominare i campi in un DynamicFrame
. Si noti che l'esempio utilizza il concatenamento di metodi per rinominare più campi contemporaneamente.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
Esempio di codice
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
Risolve un tipo di scelta all'interno di questo DynamicFrame
e restituisce il nuovo DynamicFrame
.
-
specs
: elenco di ambiguità specifiche da risolvere, ognuna sotto forma di tupla:(field_path, action)
.Ci sono due modi per utilizzare
resolveChoice
. Il primo consiste nell'indicare un argomentospecs
per specificare una sequenza di campi specifici e come risolverli. L'altra modalità perresolveChoice
è usare un singolo argomentochoice
per specificare una singola risoluzione per tutti iChoiceTypes
.I valori per
specs
vengono specificati come tuple costituiti da coppie(field_path, action)
. Il valorefield_path
identifica un elemento ambiguo specifico e il valoreaction
identifica la soluzione corrispondente. Sono disponibili le operazioni seguenti:-
cast:
: tenta di trasmettere tutti i valori al tipo specificato. Ad esempio:type
cast:int
. -
make_cols
: converte ogni tipo distinto in colonna con il nome
. Risolve una potenziale ambiguità livellando i dati. Ad esempio, secolumnName
_type
columnA
fosse unint
o unastring
, la soluzione consisterebbe nel produrre due colonne denominatecolumnA_int
ecolumnA_string
nelDynamicFrame
risultante. -
make_struct
: risolve una potenziale ambiguità utilizzando unastruct
per rappresentare i dati. Ad esempio, se i dati in una colonna sono unint
o unastring
, con l'utilizzo dell'operazionemake_struct
viene prodotta una colonna di strutture nel risultanteDynamicFrame
. Ogni struttura contiene sia unint
che unstring
. -
project:
: risolve una potenziale ambiguità proiettando tutti i dati su uno dei tipi di dati possibili. Ad esempio, se i dati in una colonna sono untype
int
o unastring
, utilizzando un'operazioneproject:string
viene prodotta una colonna nelDynamicFrame
risultante, dove tutti i valoriint
sono stati convertiti in stringhe.
Se il
field_path
identifica un array, inserisci parentesi quadre vuote dopo il nome dell'array per evitare ambiguità. Ad esempio, supponiamo che tu stia lavorando con dati strutturati nel seguente modo:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
Puoi selezionare la versione numerica invece di quella di stringa del prezzo impostando il
field_path
su"myList[].price"
e laaction
su"cast:double"
.Nota
Può essere utilizzato solo uno dei parametri
specs
echoice
. Se il parametrospecs
non èNone
, allora il parametrochoice
deve essere una stringa vuota. Viceversa, sechoice
non è una stringa vuota, allora il parametrospecs
deve essereNone
. -
choice
: specifica una singola risoluzione per tutti iChoiceTypes
. Puoi usare questa modalità nei casi in cui l'elenco completo diChoiceTypes
non è noto prima del runtime. Oltre alle operazioni elencate in precedenza perspecs
, questa modalità supporta anche l'operazione seguente:-
match_catalog
: tenta di trasmettere ogniChoiceType
al tipo corrispondente nella tabella del catalogo specificata.
-
-
database
: il database del catalogo dati da usare con l'operazionematch_catalog
. -
table_name
: la tabella del catalogo dati da usare con l'operazionematch_catalog
. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero di errori riscontrati fino a questa trasformazione compresa, raggiunto il quale il processo dovrebbe interrompersi (opzionale: impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi). -
catalog_id
: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Se impostato suNone
(valore predefinito), utilizza l'ID catalogo dell'account chiamante.
Esempio: resolveChoice da utilizzare per gestire una colonna che contiene più tipi
Questo esempio di codice utilizza il metodo resolveChoice
per specificare come gestire una colonna DynamicFrame
che contiene valori di più tipi. L'esempio mostra due modi comuni per gestire una colonna con tipi diversi:
Trasforma la colonna in un singolo tipo di dati.
Conserva tutti i tipi in colonne separate.
Set di dati di esempio
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
L'esempio utilizza un DynamicFrame
chiamato medicare
con il seguente schema:
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
Esempio di codice
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Restituisce un nuovo DynamicFrame
contenente i campi selezionati.
-
paths
: un elenco di stringhe. Ogni stringa è un percorso completo di un nodo di livello superiore da selezionare. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usa select_fields per creare un nuovo DynamicFrame
con i campi scelti
L'esempio di codice seguente mostra come utilizzare il metodo select_fields
per creare un nuovo DynamicFrame
con un elenco di campi scelto da un DynamicFrame
esistente.
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
simply_ddb_json
simplify_ddb_json(): DynamicFrame
Semplifica le colonne annidate in una struttura DynamicFrame
che si trova specificamente nella JSON struttura DynamoDB e ne restituisce una nuova semplificata. DynamicFrame
Se ci sono più tipi o tipi di mappa in un tipo di elenco, gli elementi nell'elenco non verranno semplificati. Tieni presente che si tratta di un tipo specifico di trasformazione che si comporta in modo diverso dalla unnest
trasformazione normale e richiede che i dati siano già presenti nella struttura DynamoDBJSON. Per ulteriori informazioni, consulta DynamoDB. JSON
Ad esempio, lo schema di lettura di un'esportazione con la struttura JSON DynamoDB potrebbe essere simile al seguente:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
La trasformazione di simplify_ddb_json()
lo convertirebbe in:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Esempio: utilizza simply_ddb_json per richiamare un DynamoDB simple JSON
Questo esempio di codice utilizza il simplify_ddb_json
metodo per utilizzare il connettore di esportazione AWS Glue DynamoDB, richiamare un DynamoDB simple e stampare il numero di JSON partizioni.
Esempio di codice
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
Scrive record di esempio in una destinazione specificata per aiutarti a verificare le trasformazioni eseguite dal tuo lavoro.
-
path
: il percorso della destinazione in cui scrivere (obbligatorio). -
options
: coppie chiave-valore che specificano opzioni (opzionale). L'opzione"topk"
specifica che devono essere scritti i primi recordk
. L'opzione"prob"
specifica la probabilità (sotto forma di valore decimale) di scelta di un dato record. Puoi usarlo per selezionare i record da scrivere. transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).
Esempio: usa lo spigot per scrivere campi di esempio da DynamicFrame
ad Amazon S3
Questo esempio di codice utilizza il metodo spigot
per scrivere record di esempio in un bucket Amazon S3 dopo aver applicato la trasformazione select_fields
.
Set di dati di esempio
Nota
Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.
L'esempio utilizza un DynamicFrame
chiamato persons
con il seguente schema:
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
Esempio di codice
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
Di seguito è riportato un esempio di dati che spigot
scrive su Amazon S3. Poiché è stato specificato il codice di esempio options={"topk": 10}
, i dati di esempio contengono i primi 10 record.
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Restituisce un nuovo DynamicFrameCollection
che ne contiene due DynamicFrames
. Il primo DynamicFrame
contiene tutti i nodi che sono stati separati e il secondo contiene i nodi rimanenti.
-
paths
: elenco di stringhe, ciascuna delle quali è un percorso completo di un nodo da separare in un nuovo oggettoDynamicFrame
. -
name1
: una stringa nome per ilDynamicFrame
separato. -
name2
: una stringa nome per ilDynamicFrame
che rimane dopo aver separato i nodi specificati. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usare split_fields per dividere i campi selezionati in un campo separato DynamicFrame
Questo esempio di codice utilizza il metodo split_fields
per dividere un elenco di campi specificati in un campo separato DynamicFrame
.
Set di dati di esempio
L'esempio utilizza un DynamicFrame
chiamato l_root_contact_details
che proviene da una raccolta denominata legislators_relationalized
.
l_root_contact_details
ha il seguente schema e le seguenti voci.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
Esempio di codice
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
Suddivide una o più righe di un DynamicFrame
in un nuovo DynamicFrame
.
Il metodo restituisce un nuovo DynamicFrameCollection
che contiene due DynamicFrames
. Il primo DynamicFrame
contiene tutti i nodi che sono stati separati e il secondo contiene i nodi rimanenti.
-
comparison_dict
: un dizionario in cui la chiave è un percorso verso una colonna e il valore è un altro dizionario per la mappatura di comparatori rispetto a valori con i quali vengono confrontati i valori di colonna. Ad esempio,{"age": {">": 10, "<": 20}}
divide tutte le righe il cui valore nella colonna età è superiore a 10 e inferiore a 20. -
name1
: una stringa nome per ilDynamicFrame
separato. -
name2
: una stringa nome per ilDynamicFrame
che rimane dopo aver separato i nodi specificati. -
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usare split_rows per dividere le righe in un DynamicFrame
Questo esempio di codice utilizza il metodo split_rows
per dividere le righe in un DynamicFrame
in base al valore del campo id
.
Set di dati di esempio
L'esempio utilizza un DynamicFrame
chiamato l_root_contact_details
che proviene da una raccolta denominata legislators_relationalized
.
l_root_contact_details
ha il seguente schema e le seguenti voci.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
Esempio di codice
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
Esegue la conversione unboxing di un campo stringa in un oggetto DynamicFrame
e restituisce un nuovo oggetto DynamicFrame
che contiene gli oggetti DynamicRecords
sottoposti a conversione unboxing.
Un DynamicRecord
rappresenta un record logico all'interno di un DynamicFrame
. È simile a una riga in un DataFrame
Apache Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.
-
path
: un percorso completo al nodo stringa che desideri cancellare. format
: una specifica del formato (facoltativa). Lo usi per un Amazon S3 o AWS Glue connessione che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.-
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
options
: una o più delle seguenti:separator
: una stringa che contiene il carattere separatore.escaper
: una stringa che contiene il carattere escape.skipFirst
: un valore Boolean che indica se saltare la prima istanza.-
withSchema
— Una stringa contenente una JSON rappresentazione dello schema del nodo. Il formato della JSON rappresentazione di uno schema è definito dall'output diStructType.json()
. withHeader
: un valore Boolean che indica se è inclusa un'intestazione.
Esempio: usare unbox per decomprimere un campo di stringa in un campo struct
Questo esempio di codice utilizza il metodo unbox
per decomprimere o riformattare un campo di tipo stringa DynamicFrame
in un campo di tipo struct.
Set di dati di esempio
L'esempio utilizza un DynamicFrame
chiamato mapped_with_string
con i seguenti schema e voci:
Nota il campo denominato AddressString
. Questo è il campo che di cui l'esempio esegue l'unboxing in un campo struct.
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
Esempio di codice
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
unione
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
Unione due DynamicFrames. Restituzioni DynamicFrame contenenti tutti i record di entrambi gli input DynamicFrames. Questa trasformazione può restituire risultati diversi dall'unione di due DataFrames con dati equivalenti. Se hai bisogno del comportamento dell' DataFrame unione Spark, prendi in considerazione l'utilizzo toDF
di.
-
frame1
— I primi DynamicFrame a unirsi. -
frame2
— Seconda dopo DynamicFrame l'unione. -
transformation_ctx
: (facoltativo) una stringa univoca utilizzata per identificare informazioni su statistiche/stato -
info
: (facoltativo) qualsiasi stringa da associare agli errori nella trasformazione -
stageThreshold
: (facoltativo) numero massimo di errori nella trasformazione fino a che l'elaborazione si interrompe a causa di un errore -
totalThreshold
: (facoltativo) numero massimo di errori totali fino a che l'elaborazione si interrompe a causa di un errore.
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Annulla l'annidamento di oggetti nidificati in un DynamicFrame
rendendoli oggetti di primo livello e restituendo un nuovo DynamicFrame
non nidificato.
-
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare. -
totalThreshold
: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.
Esempio: usare unnest per trasformare i campi annidati in campi di primo livello
Questo esempio di codice utilizza il metodo unnest
per raggruppare tutti i campi annidati di aDynamicFrame
in campi di primo livello.
Set di dati di esempio
L'esempio utilizza un DynamicFrame
chiamato mapped_medicare
con il seguente schema. Nota che il campo Address
è l'unico campo che contiene dati annidati.
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
Esempio di codice
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
Snidifica le colonne nidificate in un DynamicFrame
elemento specifico della JSON struttura DynamoDB e ne restituisce una nuova non annidata. DynamicFrame
Le colonne che sono di un array di struct non verranno annidate. Tieni presente che si tratta di un tipo specifico di trasformazione di unnesting che si comporta in modo diverso dalla unnest
trasformazione normale e richiede che i dati siano già presenti nella struttura DynamoDB. JSON Per ulteriori informazioni, consulta DynamoDB. JSON
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale). -
info
: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale). -
stageThreshold
: il numero di errori riscontrati durante questa trasformazione, raggiunto il quale il processo dovrebbe interrompersi (opzionale impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi). -
totalThreshold
: il numero di errori riscontrati fino a questa trasformazione compresa, raggiunto il quale il processo dovrebbe interrompersi (opzionale: impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi).
Ad esempio, lo schema di lettura di un'esportazione con la struttura JSON DynamoDB potrebbe essere simile al seguente:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La trasformazione di unnest_ddb_json()
lo convertirebbe in:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
L'esempio di codice seguente mostra come utilizzare il connettore di esportazione AWS Glue DynamoDB, richiamare un unnest DynamoDB e stampare il numero di JSON partizioni:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
write
write(connection_type, connection_options, format, format_options, accumulator_size)
Ottiene un DataSink(object) del tipo di connessione specificata da GlueContext classe di questo DynamicFrame
e lo utilizza per formattare e scrivere i contenuti di questo DynamicFrame
. Restituisce il nuovo DynamicFrame
formattato e scritto come specificato.
-
connection_type
: il tipo di connessione da utilizzare. I valori validi sonos3
,mysql
,postgresql
,redshift
,sqlserver
eoracle
. -
connection_options
: l'opzione di connessione da utilizzare (opzionale). Per unconnection_type
dis3
è definito un percorso Amazon S3.connection_options = {"path": "
s3://aws-glue-target/temp
"}Per le JDBC connessioni, è necessario definire diverse proprietà. Si noti che il nome del database deve far parte diURL. Puoi opzionalmente essere incluso nelle opzioni di connessione.
avvertimento
Si consiglia di non archiviare le password nello script. Valuta la possibilità
boto3
di utilizzarli per recuperarli da AWS Secrets Manager o dal AWS Glue Data Catalog.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
: una specifica del formato (facoltativa). Viene utilizzato per Amazon Simple Storage Service (Amazon S3) o un AWS Glue connessione che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.format_options
: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.accumulator_size
: la dimensione accumulabile da utilizzare, in byte (facoltativa).
— errori —
assertErrorThreshold
assertErrorThreshold( )
: asserzione per gli errori nelle trasformazioni che hanno creato questo oggetto DynamicFrame
. Restituisce una Exception
dal DataFrame
sottostante.
errorsAsDynamicCornice
errorsAsDynamicFrame( )
: restituisce un DynamicFrame
che ha record di errore nidificati al suo interno.
Esempio: utilizzare errorsAsDynamic Frame per visualizzare i record di errori
L'esempio di codice seguente mostra come utilizzare il metodo errorsAsDynamicFrame
per visualizzare un record degli errori per un DynamicFrame
.
Set di dati di esempio
L'esempio utilizza il seguente set di dati che puoi caricare su Amazon JSON S3 come. Tieni presente che il formato del secondo record non è corretto. I dati non validi in genere interrompono l'analisi dei file quando usi Spark. SQL DynamicFrame
, tuttavia, riconosce i problemi di formato non corretto e trasforma le righe con formato non corretto in record degli errori che puoi gestire singolarmente.
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
Esempio di codice
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
: restituisce il numero totale di errori in un oggetto DynamicFrame
.
stageErrorsCount
stageErrorsCount
: restituisce il numero di errori che si sono verificati nel processo di generazione di questo oggetto DynamicFrame
.