翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備
この例で使用されているデータセットは、2 つの Data.CMS.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
) 内に置かれています。
この例のソースコードは、data_cleaning_and_lambda.py
例AWS Glue GitHub リポジトリの
AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、AWS Glue Studio のノートブックを使用することです。
ステップ 1: Amazon S3 バケット内のデータをクロールする
AWS Management Console にサインインし、AWS Glue コンソール (https://console.aws.amazon.com/glue/
) を開きます。 -
クローラーの設定 で説明されているプロセスに従って
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
ファイルをクロールできる新しいクローラを作成し、得られた結果のメタデータを AWS Glue Data Catalog のpayments
という名前のデータベースに配置します。 -
新しいクローラを実行し、
payments
データベースを確認します。クローラは、最初のファイルを読み込んでファイルの形式と区切り記号を判断してから、データベースにmedicare
という名前のメタデータテーブルを作成したことが分かります。新しい
medicare
テーブルのスキーマは次のようになります。Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の GlueContext
を設定します。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
ステップ 3: 異なるスキーマ解析を比較する
次に、Apache Spark DataFrame
によって認識されたスキーマが、AWS Glue クローラによって記録されたスキーマと同じかどうかを確認できます。以下のコードを実行します。
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
printSchema
呼び出しの出力は次のとおりです。
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
次に、AWS Glue DynamicFrame
によって生成されるスキーマを確認します。
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
printSchema
の出力は次のとおりです。
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
DynamicFrame
は、provider id
が long
型または string
型のいずれかであるスキーマを生成します。DataFrame
スキーマは Provider Id
を string
型としてリストし、Data Catalog は provider id
を bigint
型としてリストします。
正しいものはどちらでしょうか。ファイルの末尾には、その列に string
値を持つ 2 つのレコード (160,000 レコードのうち) があります。これらは、問題を説明するために導入されたエラーのあるレコードです。
このような問題に対処するために、AWS Glue DynamicFrame
では Choice 型の概念を導入しています。この場合、DynamicFrame
は、その列に long
値と string
値の両方が存在することを示しています。AWS Glue クローラはデータの 2 MB のプレフィックスのみを考慮しているため、string
値を見落としました。Apache Spark DataFrame
はデータセット全体を考慮しましたが、最も一般的な型、つまり string
型を強制的に列に割り当てました。実際、慣れていない複雑な型やバリエーションがある場合にも、Spark は最も一般的なケースを使用することがあります。
provider id
列のクエリを実行するには、Choice 型をまず解決する必要があります。DynamicFrame
で、cast:long
オプションを指定して resolveChoice
変換メソッドを使用すると、これらの string
値を long
値に変換できます。
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
この場合、printSchema
の出力は次のようになります。
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
値がキャストできない string
だった場合に、AWS Glue は null
を挿入しました。
もう 1 つのオプションは、両方のタイプの値を保持する struct
に Choice 型を変換することです。
次に、異常だった行を確認してみましょう。
medicare_res.toDF().where("'provider id' is NULL").show()
次のように表示されています。
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
次のように、2 つの不正な形式のレコードを削除します。
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
ステップ 4: データのマッピングと Apache Spark Lambda 関数の使用
AWS Glue では、まだユーザー定義関数とも呼ばれる Lambda 関数が直接サポートされていません。しかし、いつでも DynamicFrame
を Apache Spark DataFrame
との間で変換して、DynamicFrames
の特殊な機能に加えて Spark の機能を利用できます。
次に、支払い情報を数字に変換すると、Amazon Redshift や Amazon Athena のような分析エンジンが、より迅速に数値処理を実行できるようになります。
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
show
呼び出しの出力は次のとおりです。
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
これらは、すべてデータ内ではまだ文字列です。強力な apply_mapping
変換メソッドを使用して、データをドロップ、名前変更、キャスト、およびネストし、他のデータプログラミング言語やシステムで容易にアクセスできるようにします。
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
printSchema
の出力は次のとおりです。
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
データを Spark DataFrame
に戻すと、現在どのような状態かが分かります。
medicare_nest_dyf.toDF().show()
出力は次のとおりです。
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
ステップ 5: Apache Parquet にデータを書き込む
AWS Glue は、リレーショナルデータベースが効果的に消費できる Apache Parquet のような形式でデータを書き込むことを容易にします。
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")