Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping - AWS Glue

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping

Der in diesem Beispiel verwendete Datensatz besteht aus Zahlungsdaten von Gesundheitsdienstleistern, die von zwei Data.CMS.gov-Datensätzen heruntergeladen wurden: Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups – FY2011 und Inpatient Charge Data FY 2011. Nach dem Download änderten wir die Daten derart, dass wir einige fehlerhafte Datensätze am Ende der Datei hinzufügten. Diese geänderte Datei ist in einem öffentlichen Amazon S3 Bucket unter s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv gespeichert.

Den Quellcode für dieses Beispiel finden Sie in der data_cleaning_and_lambda.py Datei im AWS Glue GitHub Beispiel-Repository.

Die bevorzugte Methode zum Debuggen von Python oder PySpark Skripten während der Ausführung AWS ist die Verwendung von Notebooks auf AWS Glue Studio.

Schritt 1: Crawlen der Daten im Amazon S3 Bucket

  1. Melden Sie sich bei der an AWS Management Console und öffnen Sie die AWS Glue Konsole unter https://console.aws.amazon.com/glue/.

  2. Erstellen Sie nach dem unter beschriebenen Prozess einen neuen Crawler, der die s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv Datei crawlen und die resultierenden Metadaten in einer Datenbank platzieren kann, die payments im AWS Glue-Datenkatalog benannt ist. Konfiguration eines Crawlers

  3. Führen Sie den neuen Crawler aus und überprüfen Sie die payments-Datenbank. Sie sollten feststellen, dass der Crawler eine Metadatentabelle mit Namen medicare in der Datenbank angelegt hat, nachdem er den Anfang der Datei gelesen hat, um ihr Format und Trennzeichen zu bestimmen.

    Das Schema der neuen medicare-Tabelle sieht wie folgt aus:

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

Schritt 2: Hinzufügen des Boilerplate-Skripts zum Entwicklungsendpunkt-Notebook

Fügen Sie das folgende Boilerplate-Skript in das Entwicklungsendpunkt-Notebook ein, um die gewünschten AWS Glue-Bibliotheken zu importieren, und richten Sie einen einzelnen GlueContext ein:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Schritt 3: Vergleichen der verschiedene Schema Parsings

Als nächstes können Sie sehen, ob das Schema, das von einem Apache Spark DataFrame erkannt wurde, mit dem Schema übereinstimmt, das Ihr AWS Glue-Crawler aufgezeichnet hat. Führen Sie diesen Code aus:

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

Hier ist die Ausgabe des printSchema-Aufrufs:

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

Als nächstes schauen Sie sich das Schema an, das ein AWS Glue-DynamicFrame generiert:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

Die Ausgabe von printSchema sieht wie folgt aus:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Das DynamicFrame generiert ein Schema, in dem provider id entweder ein long oder ein string Typ sein kann. Das DataFrame-Schema listet Provider Id als string-Typ auf und der Data Catalog listet provider id als bigint-Typ auf.

Welches ist richtig? Es gibt zwei Datensätze am Ende der Datei (von 160.000 Datensätzen) mit string-Werten in dieser Spalte. Dies sind die fehlerhaften Datensätze, die eingefügt wurden, um ein Problem zu veranschaulichen.

Um diese Art von Problem zu lösen, führt AWS Glue-DynamicFrame das Konzept eines Auswahltyps ein. In diesem Fall zeigt das DynamicFrame, dass sowohl long als auch string Werte in dieser Spalte erscheinen können. Der AWS Glue-Crawler verpasste die string-Werte, da er nur die ersten 2 MB der Daten berücksichtigt. Der Apache Spark DataFrame berücksichtigt den gesamten Datensatz. Er wurde jedoch gezwungen, der Spalte den allgemeinsten Typ zuzuweisen (string). Tatsächlich greift Spark oft auf den allgemeinsten Fall zurück, wenn es komplexe Typen oder Variationen gibt, mit denen es nicht vertraut ist.

Um die provider id-Spalte abzufragen, lösen Sie zuerst den Auswahltyp auf. Sie können die resolveChoice-Transformationsmethode in Ihrem DynamicFrame verwenden, um diese string Werte in long Werte mit einer cast:long Option umzuwandeln:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

Die printSchema-Ausgabe ist jetzt:

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Wo der Wert ein string war, das nicht gecastet werden konnte, fügt AWS Glue ein null ein.

Eine weitere Möglichkeit besteht darin, den Auswahltyp in ein struct umzuwandeln, das die Werte beider Typen beibehält.

Als nächstes schauen Sie sich die Zeilen an, die nicht normal waren:

medicare_res.toDF().where("'provider id' is NULL").show()

Sie sehen folgendes:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Entfernen Sie nun die beiden fehlerhaften Datensätze wie folgt:

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

Schritt 4: Zuordnen der Daten und Nutzung der Apache Spark Lambda Funktionen

AWS Glue unterstützt direkt Lambda-Funktionen (benutzerdefinierte Funktionen) noch nicht. Aber Sie können jederzeit ein DynamicFrame zu und von Apache Spark DataFrame konvertieren, um die Vorteile der Spark-Funktionalität zusätzlich zu den speziellen Features von DynamicFrames zu nutzen.

Als nächstes wandeln Sie die Zahlungsinformationen in Zahlen um, sodass Analytik-Engines wie Amazon Redshift oder Amazon Athena ihre Zahlen schneller verarbeiten können:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

Die Ausgabe des show-Aufrufs sieht wie folgt aus:

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

Das sind alles Zeichenfolgen in den Daten. Wir können die leistungsstarke apply_mapping-Transformationsmethode verwenden, um die Daten zu löschen, umzubenennen, zu casten und zu verschachteln, sodass andere Datenprogrammiersprachen und -systeme leicht darauf zugreifen können:

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

Die printSchema-Ausgabe sieht wie folgt aus:

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

Wenn Sie die Daten wieder in ein Spark DataFrame verwandeln, können Sie sehen, wie sie jetzt aussehen:

medicare_nest_dyf.toDF().show()

Die Ausgabe sieht wie folgt aus:

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

Schritt 5: Schreiben der Daten in Apache Parquet

AWS Glue erleichtert das Schreiben der Daten in einem Format wie Apache Parquet, das relationale Datenbanken effektiv nutzen können:

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")