Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping

Dataset yang digunakan dalam contoh ini terdiri dari data pembayaran Penyedia Medicare yang diunduh dari dua kumpulan data data.cms.gov: “Ringkasan Penyedia Sistem Pembayaran Prospektif Rawat Inap untuk 100 Grup Terkait Diagnosis Teratas - FY2011" dan “Data Biaya Rawat Inap TA 2011". Setelah mengunduh data, kami memodifikasi kumpulan data untuk memperkenalkan beberapa catatan yang salah di akhir file. File yang dimodifikasi ini terletak di bucket Amazon S3 publik di s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

Anda dapat menemukan kode sumber untuk contoh ini dalam data_cleaning_and_lambda.py file di GitHub repositori AWS Gluecontoh.

Cara yang lebih disukai untuk men-debug Python PySpark atau skrip saat berjalan adalah dengan menggunakan Notebook AWS di Glue Studio. AWS

Langkah 1: Merayapi data di bucket Amazon S3

  1. Masuk ke AWS Management Console, lalu buka konsol AWS Glue di https://console.aws.amazon.com/glue/.

  2. Setelah proses yang dijelaskan dalam Bekerja dengan crawler di konsol AWS Glue, buat sebuah crawler baru yang dapat melakukan perayapan pada file s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv, dan dapat menempatkan metadata yang dihasilkan ke dalam basis data bernama payments di Katalog Data Glue AWS.

  3. Jalankan crawler baru tersebut, dan kemudian periksa basis data payments. Anda akan menemukan bahwa crawler tersebut telah membuat tabel metadata bernama medicare dalam basis data setelah pembacaan awal file untuk menentukan format dan pembatas.

    Skema baru tabel medicare adalah sebagai berikut:

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

Langkah 2: Tambahkan skrip boilerplate ke notebook endpoint pengembangan

Rekatkan skrip boilerplate berikut ke notebook endpoint pengembangan untuk mengimpor AWS Glue pustaka yang Anda butuhkan, dan siapkan satu: GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Langkah 3: Bandingkan parsing skema yang berbeda

Selanjutnya, Anda dapat melihat apakah skema yang dikenali oleh Apache Spark sama dengan skema DataFrame yang direkam oleh crawler AndaAWS Glue. Jalankan kode ini:

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

Berikut adalah output dari panggilan printSchema:

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

Selanjutnya, lihat skema yang AWS Glue DynamicFrame dihasilkan:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

Output dari printSchema adalah sebagai berikut:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

DynamicFrame menghasilkan skema di mana provider id bisa berupa jenis long atau string. Skema DataFrame mencantumkan skema Provider Id sebagai jenis string, dan Katalog Data mencantumkan provider id sebagai jenis bigint.

Yang mana yang benar? Ada dua catatan di akhir file (dari 160.000 catatan) dengan nilai string dalam kolom tersebut. Ini adalah catatan-catatan yang keliru yang diperkenalkan untuk menggambarkan masalah.

Untuk mengatasi masalah semacam ini, AWS Glue DynamicFrame memperkenalkan konsep jenis pilihan. Dalam kasus ini, DynamicFrame menunjukkan bahwa nilai long dan string dapat muncul di kolom tersebut. AWS GlueCrawler melewatkan string nilai karena hanya dianggap awalan 2 MB data. DataFrame Apache Spark dianggap sebagai seluruh set data, tetapi ia dipaksa untuk menetapkan jenis yang paling umum ke kolom, yaitu string. Sebenarnya, Spark sering menggunakan kasus yang paling umum bila ada jenis atau variasi kompleks yang tidak biasa.

Untuk meng-kueri kolom provider id, selesaikan jenis pilihan terlebih dahulu. Anda dapat menggunakan metode transformasi resolveChoice di DynamicFrame untuk mengkonversi nilai string ke nilai long dengan pilihan cast:long:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

Output printSchema sekarang:

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Dimana nilainya adalah a string yang tidak bisa dilemparkan, AWS Glue dimasukkan anull.

Pilihan lain adalah mengkonversi jenis pilihan ke struct, yang menyimpan nilai dari kedua jenis tersebut.

Selanjutnya, lihat baris yang mengalami anomali:

medicare_res.toDF().where("'provider id' is NULL").show()

Jika Anda melihat seperti berikut:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Sekarang hapus dua catatan yang cacat, sebagai berikut:

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

Langkah 4: Petakan data dan gunakan fungsi Apache Spark Lambda

AWS Gluebelum secara langsung mendukung fungsi Lambda, juga dikenal sebagai fungsi yang ditentukan pengguna. Tapi Anda selalu dapat mengkonversi DynamicFrame ke dan dari DataFrame Apache Spark untuk memanfaatkan dari fungsionalitas Spark selain fitur khusus DynamicFrames.

Selanjutnya, ubah informasi pembayaran menjadi angka, sehingga mesin analitik seperti Amazon Redshift atau Amazon Athena dapat melakukan penderakan angkanya lebih cepat:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

Output dari panggilan show adalah sebagai berikut:

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

Ini semua masih string dalam data. Kita bisa menggunakan metode transformasi apply_mapping yang kuat untuk membuang, mengubah nama, mengubah, dan meng-nest data sehingga bahasa pemrograman data dan sistem lain dapat dengan mudah mengaksesnya:

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

Output printSchema adalah sebagai berikut:

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

Mengubah data kembali menjadi DataFrame Spark, Anda dapat menunjukkan apa yang terlihat seperti sekarang:

medicare_nest_dyf.toDF().show()

Output adalah sebagai berikut:

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

Langkah 5: Tulis data ke Apache Parquet

AWS Gluemembuatnya mudah untuk menulis data dalam format seperti Apache Parquet yang database relasional dapat secara efektif mengkonsumsi:

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")