AWS Glue Studio 노트북에서 ETL 작업에 대한 데이터 품질
이 자습서에서는 AWS Glue Studio 노트북에서 추출, 전환, 적재(ETL) 작업에 대해 AWS Glue Data Quality를 사용하는 방법을 알아봅니다.
AWS Glue Studio에서 노트북을 사용하여 전체 작업을 실행하지 않고도 작업 스크립트를 편집하고 출력을 볼 수 있습니다. 마크다운을 추가하고 노트북을 .ipynb
파일 및 작업 스크립트로 저장할 수도 있습니다. 소프트웨어를 로컬로 설치하거나 서버를 관리하지 않고도 노트북을 시작할 수 있습니다. 코드가 만족스러우면 AWS Glue Studio를 사용하여 노트북을 AWS Glue 작업으로 쉽게 전환할 수 있습니다.
이 예제에서 사용된 데이터 세트는 두 Data.CMS.gov 데이터 세트('Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011' 및 'Inpatient Charge Data FY 2011')에서 다운로드한 Medicare Provider 지불 데이터로 구성됩니다.
데이터 다운로드 후 데이터 집합을 수정하여 파일 끝에 몇 가지 잘못된 기록을 소개합니다. 이 수정된 파일은 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
의 퍼블릭 Amazon S3 버킷에 있습니다.
사전 조건
-
대상 Amazon S3 버킷에 대한 Amazon S3 쓰기 권한이 있는 AWS Glue 역할
-
새 노트북(AWS Glue Studio에서 노트북 시작하기 참조)
AWS Glue Studio에서 ETL 작업 생성
ETL 작업을 생성하려면
-
세션 버전을 AWS Glue 3.0으로 변경합니다.
이렇게 하려면 다음과 같은 매직으로 모든 표준 문안 코드 셀을 제거하고 셀을 실행합니다. 이 표준 문안 코드는 새 노트북이 생성되면 첫 번째 셀에 자동으로 제공됩니다.
%glue_version 3.0
-
다음 코드를 복사하여 셀에서 실행합니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext)
-
다음 셀에서는 AWS Glue Data Quality를 평가하는
EvaluateDataQuality
클래스를 가져옵니다.from awsgluedq.transforms import EvaluateDataQuality
-
다음 셀에서는 퍼블릭 Amazon S3 버킷에 저장된.csv 파일을 사용하여 소스 데이터를 읽습니다.
medicare = spark.read.format( "csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
-
데이터를 AWS Glue DynamicFrame으로 변환합니다.
from awsglue.dynamicframe import DynamicFrame medicare_dyf = DynamicFrame.fromDF(medicare,glueContext,"medicare_dyf")
-
데이터 품질 정의 언어(DQDL)를 사용하여 규칙 세트를 생성합니다.
EvaluateDataQuality_ruleset = """ Rules = [ ColumnExists "Provider Id", IsComplete "Provider Id", ColumnValues " Total Discharges " > 15 ] ] """
-
규칙 세트를 기준으로 데이터 세트를 검증합니다.
EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows( frame=medicare_dyf, ruleset=EvaluateDataQuality_ruleset, publishing_options={ "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe", "enableDataQualityCloudWatchMetrics": False, "enableDataQualityResultsPublishing": False, }, additional_options={"performanceTuning.caching": "CACHE_NOTHING"}, )
-
결과를 검토합니다.
ruleOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="ruleOutcomes", transformation_ctx="ruleOutcomes", ) ruleOutcomes.toDF().show(truncate=False)
출력:
--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |Rule |Outcome|FailureReason |EvaluatedMetrics | +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |ColumnExists "Provider Id" |Passed |null |{} | |IsComplete "Provider Id" |Passed |null |{Column.Provider Id.Completeness -> 1.0} | |ColumnValues " Total Discharges " > 15|Failed |Value: 11.0 does not meet the constraint requirement!|{Column. Total Discharges .Minimum -> 11.0}| +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
-
Data Quality 행 수준 결과에서 통과된 행을 필터링하고 실패한 행을 검토합니다.
owLevelOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="rowLevelOutcomes", transformation_ctx="rowLevelOutcomes", ) rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records. rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records
출력:
+----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |DRG Definition |Provider Id|Provider Name |Provider Street Address |Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|DataQualityRulesPass |DataQualityRulesFail |DataQualityRulesSkip |DataQualityEvaluationResult| +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10005 |MARSHALL MEDICAL CENTER SOUTH |2505 U S HIGHWAY 431 NORTH|BOAZ |AL |35957 |AL - Birmingham |14 |$15131.85 |$5787.57 |$4976.71 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10046 |RIVERVIEW REGIONAL MEDICAL CENTER |600 SOUTH THIRD STREET |GADSDEN |AL |35901 |AL - Birmingham |14 |$67327.92 |$5461.57 |$4493.57 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10083 |SOUTH BALDWIN REGIONAL MEDICAL CENTER|1613 NORTH MCKENZIE STREET|FOLEY |AL |36535 |AL - Mobile |15 |$25411.33 |$5282.93 |$4383.73 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30002 |BANNER GOOD SAMARITAN MEDICAL CENTER |1111 EAST MCDOWELL ROAD |PHOENIX |AZ |85006 |AZ - Phoenix |11 |$34803.81 |$7768.90 |$6951.45 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30010 |CARONDELET ST MARYS HOSPITAL |1601 WEST ST MARY'S ROAD |TUCSON |AZ |85745 |AZ - Tucson |12 |$35968.50 |$6506.50 |$5379.83 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ only showing top 5 rows
AWS Glue Data Quality에서는 새 열 네 개(DataQualityRulesPass, DataQualityRulesFail, DataQualityRulesSkip, DataQualityEvaluationResult)를 추가했습니다. 이는 통과한 레코드, 실패한 레코드, 행 수준 평가에서 건너뛴 규칙, 전체 행 수준 결과를 나타냅니다.
-
출력을 Amazon S3 버킷에 기록하여 데이터를 분석하고 결과를 시각화합니다.
#Write the Passed records to the destination. glueContext.write_dynamic_frame.from_options( frame = rowLevelOutcomes_df_passed, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")