Ejemplo de código: Preparación de datos con ResolveChoice Lambda y ApplyMapping - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo de código: Preparación de datos con ResolveChoice Lambda y ApplyMapping

El conjunto de datos que se utiliza en este ejemplo está formado por datos de pago de Medicare Provider que se descargaron de dos sitios de Data.CMS.gov, conjuntos de datos: "Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" e "Inpatient Charge Data FY 2011". Después de descargar los datos, modificamos el conjunto de datos para presentar un par de registros erróneos al final del archivo. Este archivo modificado se encuentra en un bucket público de Amazon S3 en s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

Puedes encontrar el código fuente de este ejemplo en el data_cleaning_and_lambda.py archivo del GitHub repositorio de AWS Glueejemplos.

La forma preferida de depurar Python o PySpark scripts mientras AWS se ejecuta es usar Notebooks en AWS Glue Studio.

Paso 1: Rastrear los datos del bucket de Amazon S3

  1. Inicie sesión en la AWS Management Console y abra la consola de AWS Glue en https://console.aws.amazon.com/glue/.

  2. En función del proceso descrito en Trabajo con rastreadores en la consola de AWS Glue, cree un rastreador nuevo que pueda rastrear el archivo s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv y poner los metadatos resultantes en una base de datos denominada payments en AWS Glue Data Catalog.

  3. Ejecute el nuevo rastreador y, a continuación, compruebe la base de datos payments. Deberá ver que el rastreador ha creado en la base de datos una tabla de metadatos denominada medicare después de leer el comienzo del archivo para determinar su formato y delimitador.

    El esquema de la tabla medicare nueva es el siguiente:

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

Paso 2: Añadir un script reutilizable al cuaderno del punto de conexión de desarrollo

Pegue el script reutilizable siguiente en el cuaderno del punto de enlace de desarrollo para importar las bibliotecas de AWS Glue que necesite y configurar un único GlueContext:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Paso 3: Comparar diferentes análisis de esquemas

A continuación, puede ver si el esquema que un elemento DataFrame de Apache Spark reconoció es el mismo que su rastreador AWS Glue ha registrado. Ejecute el código siguiente:

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

Esta es la salida de la llamada printSchema:

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

A continuación, examine el esquema que un DynamicFrame de AWS Glue genera:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

La salida de printSchema es la siguiente:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

El elemento DynamicFrame genera un esquema donde provider id puede ser el tipo long o el tipo string. El esquema DataFrame indica que Provider Id es de tipo string, y el Data Catalog indica que provider id es de tipo bigint.

¿Cuál es el correcto? Hay dos registros al final del archivo (de 160 000 registros) con valores string en dicha columna. Estos son los registros erróneos que se introdujeron para ilustrar un problema.

Para abordar este tipo de problema, el DynamicFrame de AWS Glue introduce el concepto de tipo de choice (elección). En este caso, DynamicFrame muestra que los valores long y string pueden aparecer en dicha columna. El rastreador de AWS Glue no tuvo en cuenta los valores string, ya que consideró solo un prefijo de 2 MB de los datos. El elemento DataFrame de Apache Spark tuvo en cuenta el conjunto de datos en su totalidad, pero se vio obligado a asignar el tipo más general a la columna, en este caso string. De hecho, Spark a menudo recurre al caso más general cuando hay tipos complejos o variaciones con las que no está familiarizado.

Para consultar la columna provider id, primero debe resolver el tipo de elección. Puede utilizar el método de transformación resolveChoice de DynamicFrame para convertir estos valores string en valores long con una opción cast:long:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

Ahora la salida printSchema es:

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Cuando el valor era un elemento string que no se podía transformar, AWS Glue insertaba un valor null.

Otra opción consiste en convertir el tipo de elección en un elemento struct, que mantiene los valores de ambos tipos.

A continuación, examine las filas que eran anómalas:

medicare_res.toDF().where("'provider id' is NULL").show()

Verá lo siguiente:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Ahora elimine los dos registros mal formados, tal y como se indica a continuación:

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

Paso 4: Asignar los datos y utilizar las funciones Lambda de Apache Spark

AWS Glue todavía no es compatible directamente con funciones Lambda, también conocidas como funciones definidas por el usuario. Pero siempre puede convertir un elemento DynamicFrame en un elemento DataFrame de Apache Spark y viceversa, para aprovechar la funcionalidad de Spark, además de las características especiales de DynamicFrames.

A continuación, convierta la información de pago en números, para que los motores de análisis como Amazon Redshift o Amazon Athena puedan controlar más deprisa sus números:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

La salida de la llamada show es la siguiente:

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

Sigue habiendo cadenas en los datos. Podemos utilizar el potente método de transformación apply_mapping para rechazar, renombrar, difundir y anidar los datos para que otros lenguajes de programación y sistemas de datos puedan obtener fácilmente acceso a ellos:

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

La salida de printSchema es la siguiente:

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

Si vuelve a convertir los datos en un elemento DataFrame de Spark, puede mostrar cómo son ahora:

medicare_nest_dyf.toDF().show()

La salida es la siguiente:

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

Paso 5: Escribir los datos en Apache Parquet

AWS Glue le facilita la tarea de escribir los datos en un formato como Apache Parquet que las bases de datos relacionales pueden utilizar de manera eficaz:

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")