Qualità dei dati per lavori ETL nei notebook AWS Glue Studio - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Qualità dei dati per lavori ETL nei notebook AWS Glue Studio

In questo tutorial, imparerai a utilizzare Qualità dei dati di AWS Glue per i processi di estrazione, trasformazione e caricamento (ETL) nei notebook AWS Glue Studio.

È possibile utilizzare i notebook AWS Glue Studio per modificare gli script di processo e visualizzare l'output senza dover eseguire un processo completo. È inoltre possibile aggiungere markdown e salvare i notebook come file .ipynb e script di processo. Nota che è possibile avviare un notebook senza installare software localmente o gestire server. Quando hai completato il lavoro sul codice, potrai utilizzare AWS Glue Studio per convertire facilmente il tuo notebook in un processo AWS Glue.

Il set di dati utilizzato in questo esempio è costituito da dati di pagamento di Medicare Provider scaricati da due set di dati Data.cms.gov: «Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnostis-Related Groups - 011" e «Inpatient Charge Data FY 2011". FY2

Dopo aver scaricato i dati, abbiamo apportato delle modifiche al set di dati al fine di introdurre alcuni record errati nella parte finale del file. Questo file modificato si trova in un bucket pubblico Amazon S3 in s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

Prerequisiti

Creazione di un processo ETL in AWS Glue Studio

Creazione di un processo ETL
  1. Modifica la versione della sessione in AWS Glue 3.0.

    Per fare ciò, rimuovi tutte le celle di codice boilerplate con il seguente magic ed esegui la cella. Nota che questo codice boilerplate viene fornito automaticamente nella prima cella quando viene creato un nuovo notebook.

    %glue_version 3.0
  2. Copia il codice seguente e incollalo nella cella.

    import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext)
  3. Nella cella successiva, importa la classe EvaluateDataQuality che valuta Qualità dei dati di AWS Glue.

    from awsgluedq.transforms import EvaluateDataQuality
  4. Nella cella successiva, leggi i dati di origine utilizzando il file .csv archiviato nel bucket pubblico Amazon S3.

    medicare = spark.read.format( "csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
  5. Converti i dati AWS Glue DynamicFrame in un.

    from awsglue.dynamicframe import DynamicFrame medicare_dyf = DynamicFrame.fromDF(medicare,glueContext,"medicare_dyf")
  6. Crea il set di regole utilizzando Data Quality Definition Language (DQDL).

    EvaluateDataQuality_ruleset = """ Rules = [ ColumnExists "Provider Id", IsComplete "Provider Id", ColumnValues " Total Discharges " > 15 ] ] """
  7. Convalida il set di dati rispetto al set di regole.

    EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows( frame=medicare_dyf, ruleset=EvaluateDataQuality_ruleset, publishing_options={ "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe", "enableDataQualityCloudWatchMetrics": False, "enableDataQualityResultsPublishing": False, }, additional_options={"performanceTuning.caching": "CACHE_NOTHING"}, )
  8. Rivedi i risultati.

    ruleOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="ruleOutcomes", transformation_ctx="ruleOutcomes", ) ruleOutcomes.toDF().show(truncate=False)

    Output:

    --------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |Rule |Outcome|FailureReason |EvaluatedMetrics | +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |ColumnExists "Provider Id" |Passed |null |{} | |IsComplete "Provider Id" |Passed |null |{Column.Provider Id.Completeness -> 1.0} | |ColumnValues " Total Discharges " > 15|Failed |Value: 11.0 does not meet the constraint requirement!|{Column. Total Discharges .Minimum -> 11.0}| +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
  9. Filtra le righe trasmesse e rivedi le righe con errori tra i risultati a livello di riga di qualità dei dati.

    owLevelOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="rowLevelOutcomes", transformation_ctx="rowLevelOutcomes", ) rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records. rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records

    Output:

    +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |DRG Definition |Provider Id|Provider Name |Provider Street Address |Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|DataQualityRulesPass |DataQualityRulesFail |DataQualityRulesSkip |DataQualityEvaluationResult| +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10005 |MARSHALL MEDICAL CENTER SOUTH |2505 U S HIGHWAY 431 NORTH|BOAZ |AL |35957 |AL - Birmingham |14 |$15131.85 |$5787.57 |$4976.71 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10046 |RIVERVIEW REGIONAL MEDICAL CENTER |600 SOUTH THIRD STREET |GADSDEN |AL |35901 |AL - Birmingham |14 |$67327.92 |$5461.57 |$4493.57 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10083 |SOUTH BALDWIN REGIONAL MEDICAL CENTER|1613 NORTH MCKENZIE STREET|FOLEY |AL |36535 |AL - Mobile |15 |$25411.33 |$5282.93 |$4383.73 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30002 |BANNER GOOD SAMARITAN MEDICAL CENTER |1111 EAST MCDOWELL ROAD |PHOENIX |AZ |85006 |AZ - Phoenix |11 |$34803.81 |$7768.90 |$6951.45 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30010 |CARONDELET ST MARYS HOSPITAL |1601 WEST ST MARY'S ROAD |TUCSON |AZ |85745 |AZ - Tucson |12 |$35968.50 |$6506.50 |$5379.83 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ only showing top 5 rows

    Nota che AWS Glue Data Quality ha aggiunto quattro nuove colonne (DataQualityRulesPass DataQualityRulesFail DataQualityRulesSkip,, e DataQualityEvaluationResult). Indica i record con esito positivo e quelli con esito negativo, le regole ignorate per la valutazione a livello di riga e i risultati complessivi a livello di riga.

  10. Scrivi l'output in un bucket Amazon S3 per analizzare i dati e visualizzare i risultati.

    #Write the Passed records to the destination. glueContext.write_dynamic_frame.from_options( frame = rowLevelOutcomes_df_passed, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")