AWS Glue Studio 笔记本中 ETL 作业的数据质量
在本教程中,您将学习如何使用 AWS Glue Data Quality 来处理 AWS Glue Studio 笔记本中的提取、转换、加载(ETL)作业。
您可以在 AWS Glue Studio 中使用笔记本编辑作业脚本并查看输出,而不必运行完整作业。您还可以添加 Markdown 并将笔记本另存为 .ipynb
文件和作业脚本。请注意:您可以在无需本地安装软件或管理服务器的情况下开启笔记本。当您对自己的代码感到满意时,可以使用 AWS Glue Studio 轻松地将笔记本转换为 AWS Glue 作业。
此示例使用的数据集包括从两个 Data.CMS.gov 数据集下载的医疗保健提供商付款数据:"Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" 和 "Inpatient Charge Data FY 2011"。
下载该数据后,我们修改了数据集,以在文件末尾引入了几个错误的记录。这个修改过的文件位于 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
上的公有 Amazon S3 存储桶。
先决条件
-
具有 Amazon S3 权限的 AWS Glue 角色可以写入您的目标 Amazon S3 存储桶
-
一个新的笔记本(请参阅 Getting started with notebooks in AWS Glue Studio)
在 AWS Glue Studio 中创建 ETL 作业
创建 ETL 作业
-
将会话版本更改为 AWS Glue 3.0。
为此,请使用以下魔术命令删除所有样板代码单元格,然后运行该单元格。请注意,创建新笔记本时,此样板代码会自动在第一个单元格中提供。
%glue_version 3.0
-
将以下代码复制并粘贴到单元格中。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext)
-
在下一个单元格中,导入评估 AWS Glue Data Quality 的
EvaluateDataQuality
类。from awsgluedq.transforms import EvaluateDataQuality
-
在下一个单元格中,使用存储在公共 Amazon S3 存储桶中的 .csv 文件读入源数据。
medicare = spark.read.format( "csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
-
将数据转换为 AWS Glue DynamicFrame。
from awsglue.dynamicframe import DynamicFrame medicare_dyf = DynamicFrame.fromDF(medicare,glueContext,"medicare_dyf")
-
使用数据质量定义语言(DQDL)创建规则集。
EvaluateDataQuality_ruleset = """ Rules = [ ColumnExists "Provider Id", IsComplete "Provider Id", ColumnValues " Total Discharges " > 15 ] ] """
-
根据规则集验证数据集。
EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows( frame=medicare_dyf, ruleset=EvaluateDataQuality_ruleset, publishing_options={ "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe", "enableDataQualityCloudWatchMetrics": False, "enableDataQualityResultsPublishing": False, }, additional_options={"performanceTuning.caching": "CACHE_NOTHING"}, )
-
查看结果。
ruleOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="ruleOutcomes", transformation_ctx="ruleOutcomes", ) ruleOutcomes.toDF().show(truncate=False)
输出:
--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |Rule |Outcome|FailureReason |EvaluatedMetrics | +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |ColumnExists "Provider Id" |Passed |null |{} | |IsComplete "Provider Id" |Passed |null |{Column.Provider Id.Completeness -> 1.0} | |ColumnValues " Total Discharges " > 15|Failed |Value: 11.0 does not meet the constraint requirement!|{Column. Total Discharges .Minimum -> 11.0}| +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
-
筛选通过的行并查看数据质量行级结果中的失败行。
owLevelOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="rowLevelOutcomes", transformation_ctx="rowLevelOutcomes", ) rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records. rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records
输出:
+----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |DRG Definition |Provider Id|Provider Name |Provider Street Address |Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|DataQualityRulesPass |DataQualityRulesFail |DataQualityRulesSkip |DataQualityEvaluationResult| +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10005 |MARSHALL MEDICAL CENTER SOUTH |2505 U S HIGHWAY 431 NORTH|BOAZ |AL |35957 |AL - Birmingham |14 |$15131.85 |$5787.57 |$4976.71 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10046 |RIVERVIEW REGIONAL MEDICAL CENTER |600 SOUTH THIRD STREET |GADSDEN |AL |35901 |AL - Birmingham |14 |$67327.92 |$5461.57 |$4493.57 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10083 |SOUTH BALDWIN REGIONAL MEDICAL CENTER|1613 NORTH MCKENZIE STREET|FOLEY |AL |36535 |AL - Mobile |15 |$25411.33 |$5282.93 |$4383.73 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30002 |BANNER GOOD SAMARITAN MEDICAL CENTER |1111 EAST MCDOWELL ROAD |PHOENIX |AZ |85006 |AZ - Phoenix |11 |$34803.81 |$7768.90 |$6951.45 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30010 |CARONDELET ST MARYS HOSPITAL |1601 WEST ST MARY'S ROAD |TUCSON |AZ |85745 |AZ - Tucson |12 |$35968.50 |$6506.50 |$5379.83 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ only showing top 5 rows
请注意,AWS Glue Data Quality 增加了四个新列(DataQualityRulesPass、DataQualityRulesFail、DataQualityRulesSkip 和 DataQualityEvaluationResult)。这表示已通过的记录、失败的记录、跳过的行级评估规则以及总体行级结果。
-
将输出写入 Amazon S3 存储桶,以分析数据并可视化结果。
#Write the Passed records to the destination. glueContext.write_dynamic_frame.from_options( frame = rowLevelOutcomes_df_passed, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")