Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Il set di dati utilizzato in questo esempio è costituito dai dati di pagamento di Medicare Provider scaricati da due set di dati Data.cms.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
.
È possibile trovare data_cleaning_and_lambda.py
il codice sorgente di questo esempio nel file in AWS Glue
Il modo preferito per eseguire il debug di Python PySpark o degli script durante l'esecuzione consiste nell'utilizzare AWS Notebooks su Glue Studio. AWS
Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3
Accedi a e apri il AWS Management Console AWS Glue console all'indirizzo https://console.aws.amazon.com/glue/
. -
Seguendo il processo descritto inConfigurazione di un crawler, create un nuovo crawler in grado di eseguire la scansione del
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
file e di inserire i metadati risultanti in un database denominato nel AWS Glue Datapayments
Catalog. -
Esegui il nuovo crawler e controlla il database
payments
. Il crawler dovrebbe aver creato una tabella di metadati denominatamedicare
nel database dopo aver letto l'inizio del file per determinarne il formato e il delimitatore.Lo schema della nuova tabella
medicare
è il seguente:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Fase 2: aggiunta dello script boilerplate al notebook degli endpoint di sviluppo
Incolla il seguente script boilerplate nel notebook dell'endpoint di sviluppo per importare il AWS Glue le librerie di cui hai bisogno e configurane una singola: GlueContext
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
Fase 3: confronta differenti analisi di schema
Successivamente, puoi vedere se lo schema riconosciuto da Apache Spark DataFrame
è lo stesso del tuo AWS Glue crawler registrato. Esegui questo codice:
medicare = spark.read.format(
"com.databricks.spark.csv").option(
"header", "true").option(
"inferSchema", "true").load(
's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
Ecco l'output dalla chiamata printSchema
:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Quindi, guarda lo schema che un AWS Glue DynamicFrame
genera:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
database = "payments",
table_name = "medicare")
medicare_dynamicframe.printSchema()
L'output printSchema
è il seguente:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Il DynamicFrame
genera uno schema in cui provider id
potrebbe essere un tipo long
o string
. Lo schema DataFrame
elenca Provider Id
come tipo string
e il catalogo dati elenca provider id
come tipo bigint
.
Qual è corretto? Sono disponibili due record alla fine del file (su 160.000 record) con i valori string
nella colonna. Questi sono i record errati che sono stati introdotti per illustrare un problema.
Per risolvere questo tipo di problema, il AWS Glue DynamicFrame
introduce il concetto di tipo di scelta. In questo caso, DynamicFrame
mostra che entrambi i valori long
e string
possono essere visualizzati nella colonna. Il AWS Glue il crawler non ha inserito i string
valori perché considerava solo un prefisso di 2 MB dei dati. L'Apache Spark DataFrame
ha considerato l'intero set di dati, ma è stato costretto ad assegnare il tipo più generale alla colonna, ossia string
. Infatti, Spark spesso ricorre al caso più generale quando non ci sono tipi complessi o variazioni con cui non è familiare.
Per eseguire una query sulla colonna provider id
, risolvi prima il tipo di scelta. Puoi utilizzare il metodo di trasformazione resolveChoice
in DynamicFrame
per convertire quei valori string
in valori long
con un'opzione cast:long
:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
L'output printSchema
è ora:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Se il valore era un string
che non poteva essere espresso, AWS Glue ha inserito unnull
.
Un'altra opzione consiste nel convertire il tipo di scelta in struct
, che mantiene i valori di entrambi i tipi.
Quindi, esaminare le righe anomale:
medicare_res.toDF().where("'provider id' is NULL").show()
Verrà visualizzato quanto segue:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Ora rimuovi i due record difettosi, come segue:
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Fase 4: mappatura dei dati e utilizzo di funzioni Lambda Apache Spark
AWS Glue non supporta ancora direttamente le funzioni Lambda, note anche come funzioni definite dall'utente. Tuttavia, puoi sempre convertire un DynamicFrame
in e da un DataFrame
Apache Spark per trarre vantaggio dalle funzionalità Spark, oltre alle funzionalità speciali di DynamicFrames
.
Trasforma quindi i dati di pagamento in numeri, in modo che i motori di analisi come Amazon Redshift o Amazon Athena possano eseguire i calcoli più rapidamente:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
"ACC", chop_f(
medicare_dataframe["average covered charges"])).withColumn(
"ATP", chop_f(
medicare_dataframe["average total payments"])).withColumn(
"AMP", chop_f(
medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
L'output dalla chiamata show
è:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Questi sono ancora tutte stringhe nei dati. Puoi utilizzare il potente metodo di trasformazione apply_mapping
per eliminare, rinominare, trasmettere e nidificare i dati in modo che i dati di altri linguaggi di programmazione e sistemi possano accedere facilmente:
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
('provider id', 'long', 'provider.id', 'long'),
('provider name', 'string', 'provider.name', 'string'),
('provider city', 'string', 'provider.city', 'string'),
('provider state', 'string', 'provider.state', 'string'),
('provider zip code', 'long', 'provider.zip', 'long'),
('hospital referral region description', 'string','rr', 'string'),
('ACC', 'string', 'charges.covered', 'double'),
('ATP', 'string', 'charges.total_pay', 'double'),
('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
L'output printSchema
è il seguente:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Trasformando i dati in unDataFrame
Spark, puoi visualizzare quello che appare ora:
medicare_nest_dyf.toDF().show()
L'output è il seguente:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Fase 5: scrittura dei dati in Apache Parquet
AWS Glue semplifica la scrittura dei dati in un formato come Apache Parquet che i database relazionali possono utilizzare efficacemente:
glueContext.write_dynamic_frame.from_options(
frame = medicare_nest_dyf,
connection_type = "s3",
connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
format = "parquet")