Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Der Datensatz, der in diesem Beispiel verwendet wird, besteht aus Zahlungsdaten des Medicare-Anbieters, die aus zwei Data.cms.gov-Datensätzens3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
gespeichert.
Den Quellcode für dieses Beispiel finden Sie in der Datei im data_cleaning_and_lambda.py
AWS Glue
Die bevorzugte Methode zum Debuggen von Python oder PySpark Skripten während der Ausführung AWS ist die Verwendung von Notebooks auf AWS Glue Studio.
Schritt 1: Crawlen der Daten im Amazon S3 Bucket
Melden Sie sich bei der an AWS Management Console und öffnen Sie die AWS Glue Konsole bei https://console.aws.amazon.com/glue/
. -
Erstellen Sie nach dem unter beschriebenen Prozess einen neuen Crawler, der die
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
Datei crawlen und die resultierenden Metadaten in einer Datenbank platzieren kann, diepayments
im AWS Glue-Datenkatalog benannt ist. Konfiguration eines Crawlers -
Führen Sie den neuen Crawler aus und überprüfen Sie die
payments
-Datenbank. Sie sollten feststellen, dass der Crawler eine Metadatentabelle mit Namenmedicare
in der Datenbank angelegt hat, nachdem er den Anfang der Datei gelesen hat, um ihr Format und Trennzeichen zu bestimmen.Das Schema der neuen
medicare
-Tabelle sieht wie folgt aus:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Schritt 2: Hinzufügen des Boilerplate-Skripts zum Entwicklungsendpunkt-Notebook
Fügen Sie das folgende Standardskript in das Entwicklungsendpunkt-Notizbuch ein, um Folgendes zu importieren AWS Glue Bibliotheken, die Sie benötigen, und richten Sie eine einzelne ein: GlueContext
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
Schritt 3: Vergleichen der verschiedene Schema Parsings
Als Nächstes können Sie sehen, ob das Schema, das von einem Apache Spark erkannt wurde, mit dem Schema identisch DataFrame
ist, das Sie AWS Glue Crawler aufgezeichnet. Führen Sie diesen Code aus:
medicare = spark.read.format(
"com.databricks.spark.csv").option(
"header", "true").option(
"inferSchema", "true").load(
's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
Hier ist die Ausgabe des printSchema
-Aufrufs:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Schauen Sie sich als Nächstes das Schema an, das ein AWS Glue DynamicFrame
generiert:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
database = "payments",
table_name = "medicare")
medicare_dynamicframe.printSchema()
Die Ausgabe von printSchema
sieht wie folgt aus:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Das DynamicFrame
generiert ein Schema, in dem provider id
entweder ein long
oder ein string
Typ sein kann. Das DataFrame
-Schema listet Provider Id
als string
-Typ auf und der Data Catalog listet provider id
als bigint
-Typ auf.
Welches ist richtig? Es gibt zwei Datensätze am Ende der Datei (von 160.000 Datensätzen) mit string
-Werten in dieser Spalte. Dies sind die fehlerhaften Datensätze, die eingefügt wurden, um ein Problem zu veranschaulichen.
Um diese Art von Problem zu lösen, AWS Glue DynamicFrame
führt das Konzept eines Auswahltyps ein. In diesem Fall zeigt das DynamicFrame
, dass sowohl long
als auch string
Werte in dieser Spalte erscheinen können. Das Tool AWS Glue Der Crawler hat die string
Werte übersehen, weil er nur ein 2-MB-Präfix der Daten berücksichtigt hat. Der Apache Spark DataFrame
berücksichtigt den gesamten Datensatz. Er wurde jedoch gezwungen, der Spalte den allgemeinsten Typ zuzuweisen (string
). Tatsächlich greift Spark oft auf den allgemeinsten Fall zurück, wenn es komplexe Typen oder Variationen gibt, mit denen es nicht vertraut ist.
Um die provider id
-Spalte abzufragen, lösen Sie zuerst den Auswahltyp auf. Sie können die resolveChoice
-Transformationsmethode in Ihrem DynamicFrame
verwenden, um diese string
Werte in long
Werte mit einer cast:long
Option umzuwandeln:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
Die printSchema
-Ausgabe ist jetzt:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Wo der Wert a warstring
, der nicht umgewandelt werden konnte, AWS Glue hat ein eingefügtnull
.
Eine weitere Möglichkeit besteht darin, den Auswahltyp in ein struct
umzuwandeln, das die Werte beider Typen beibehält.
Als nächstes schauen Sie sich die Zeilen an, die nicht normal waren:
medicare_res.toDF().where("'provider id' is NULL").show()
Sie sehen folgendes:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Entfernen Sie nun die beiden fehlerhaften Datensätze wie folgt:
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Schritt 4: Zuordnen der Daten und Nutzung der Apache Spark Lambda Funktionen
AWS Glue unterstützt Lambda-Funktionen, auch bekannt als benutzerdefinierte Funktionen, noch nicht direkt. Aber Sie können jederzeit ein DynamicFrame
zu und von Apache Spark DataFrame
konvertieren, um die Vorteile der Spark-Funktionalität zusätzlich zu den speziellen Features von DynamicFrames
zu nutzen.
Als nächstes wandeln Sie die Zahlungsinformationen in Zahlen um, sodass Analytik-Engines wie Amazon Redshift oder Amazon Athena ihre Zahlen schneller verarbeiten können:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
"ACC", chop_f(
medicare_dataframe["average covered charges"])).withColumn(
"ATP", chop_f(
medicare_dataframe["average total payments"])).withColumn(
"AMP", chop_f(
medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
Die Ausgabe des show
-Aufrufs sieht wie folgt aus:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Das sind alles Zeichenfolgen in den Daten. Wir können die leistungsstarke apply_mapping
-Transformationsmethode verwenden, um die Daten zu löschen, umzubenennen, zu casten und zu verschachteln, sodass andere Datenprogrammiersprachen und -systeme leicht darauf zugreifen können:
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
('provider id', 'long', 'provider.id', 'long'),
('provider name', 'string', 'provider.name', 'string'),
('provider city', 'string', 'provider.city', 'string'),
('provider state', 'string', 'provider.state', 'string'),
('provider zip code', 'long', 'provider.zip', 'long'),
('hospital referral region description', 'string','rr', 'string'),
('ACC', 'string', 'charges.covered', 'double'),
('ATP', 'string', 'charges.total_pay', 'double'),
('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
Die printSchema
-Ausgabe sieht wie folgt aus:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Wenn Sie die Daten wieder in ein Spark DataFrame
verwandeln, können Sie sehen, wie sie jetzt aussehen:
medicare_nest_dyf.toDF().show()
Die Ausgabe sieht wie folgt aus:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Schritt 5: Schreiben der Daten in Apache Parquet
AWS Glue macht es einfach, die Daten in einem Format wie Apache Parquet zu schreiben, das relationale Datenbanken effektiv nutzen können:
glueContext.write_dynamic_frame.from_options(
frame = medicare_nest_dyf,
connection_type = "s3",
connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
format = "parquet")