コード例: データの結合と関係付け - AWS Glue

コード例: データの結合と関係付け

この例では、http://everypolitician.org/ から、Amazon Simple Storage Service (Amazon S3) の sample-dataset バケットにデータセットをダウンロードして使用します。s3://awsglue-datasets/examples/us-legislators/allこのデータセットには、米国議会議員や米国下院および上院議員の議席に関する JSON 形式のデータが含まれており、このチュートリアルの目的のため少し変更され、パブリック Amazon S3 バケットで利用可能になりました。

この例のソースコードは、GitHub ウェブサイトの join_and_relationalize.py Glue サンプルリポジトリの AWS Glue ファイルにあります。

このデータを使用して、このチュートリアルでは以下のことを実行する方法を示します。

  • AWS Glue クローラを使用して、パブリックな Amazon S3 バケットに保存されているオブジェクトを分類し、それらのスキーマを AWS Glue Data Catalog に保存します。

  • クロールの結果のテーブルのメタデータとスキーマを調べます。

  • Data Catalog のメタデータを使用して Python の抽出、転送、およびロード (ETL) スクリプトを記述し、以下の操作を行います。

    • 異なるソースファイル内のデータをまとめて単一のデータテーブルに結合します (つまり、データを非正規化します)。

    • 議員のタイプ別に、結合テーブルを別のテーブルにフィルタリングします。

    • 生成されたデータを後で分析するために Apache Parquet ファイルに分割して書き出します。

Python または PySpark スクリプトをデバッグする最も簡単な方法は、開発エンドポイントを作成してコードを実行することです。作業する開発エンドポイントを設定することから始めることをお勧めします。詳しくは、「開発エンドポイントのプロパティの表示 」を参照してください。

ステップ 1: Amazon S3 バケット内のデータをクロールする

  1. AWS Management Console にサインインし、AWS Glue コンソール (https://console.aws.amazon.com/glue/) を開きます。

  2. AWS Glue コンソールでのクローラーの使用 の手順に従って、s3://awsglue-datasets/examples/us-legislators/all データセットをクロールできる新しいクローラを、AWS Glue Data Catalog 内のデータベース legislators に作成します。サンプルデータは既に、このパブリックな Amazon S3 バケットに用意されています。

  3. 新しいクローラを実行し、legislators データベースを確認します。

    クローラは、次のメタデータテーブルを作成します。

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    これは、議員とその履歴を含むテーブルの半正規化されたテーブルの集合です。

ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の GlueContext を設定します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

ステップ 3: Data Catalog 内のデータでスキーマを確認する

次に、簡単な手順で AWS Glue Data Catalog から DynamicFrame を作成し、そのデータのスキーマを調べます。例えば、persons_json テーブルのスキーマを表示するには、ノートブックに以下を追加します。

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

プリントコールの出力を以下に示します。

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

テーブル内の各人は、米国議会のメンバーです。

memberships_json テーブルのスキーマを表示するには、次のように入力します。

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

出力は次のとおりです。

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations は政党および上院と下院の 2 つの議会です。organizations_json テーブルのスキーマを表示するには、次のように入力します。

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

出力は次のとおりです。

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

ステップ 4: データをフィルタリングする

次に、必要なフィールドのみを保持し、id の名前を org_id に変更します。データセットは、小さいため全体を表示することができます。

toDF()DynamicFrame を Apache Spark に変換するので、Apache Spark SQL に既に存在する DataFrame 変換を適用できます。

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

以下に出力を示します。

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

memberships に表示される organizations を表示するには、次のように入力します。

memberships.select_fields(['organization_id']).toDF().distinct().show()

以下に出力を示します。

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

ステップ 5: すべてをまとめる

ここで AWS Glue を使用して、これらのリレーショナルテーブルを結合し、議員の memberships とそれに対応する organizations の 1 つの完全な履歴テーブルを作成します。

  1. まず、persons および membershipsid および person_id と結合します。

  2. 次に、結果を orgsorg_id および organization_id と結合します。

  3. 次に、冗長なフィールド person_id および org_id を削除します。

これらの操作はすべて、1 行の (拡張された) コードで行うことができます。

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

出力は次のとおりです。

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

これで、分析に使用できる最終テーブルが作成されました。これは、分析のためのコンパクトで効率的な形式 (つまり Parquet) で記述することができ、AWS Glue、Amazon Athena、または Amazon Redshift Spectrum で SQL を実行できます。

次の呼び出しは、複数のファイルにわたってテーブルを書き込んで、後で解析するときに高速な並列読み込みをサポートします。

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

すべての履歴データを単一のファイルにまとめるには、データフレームに変換し、再パーティション化して書き出す必要があります。

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

または、上院と下院でそれを分けたい場合。

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

ステップ 6: リレーショナルデータベース向けにデータを変換する

AWS Glue では半構造化データでも Amazon Redshift のようなリレーショナルデータベースに簡単に書き込むことができるのです。これにより、フレーム内のオブジェクトがどれほど複雑であっても、DynamicFrames をフラット化する変換 relationalize が提供されます。

この例の l_history DynamicFrame を使用して、ルートテーブル (hist_root) の名前と一時的な作業パスを relationalize に渡します。これにより、DynamicFrameCollection が返されます。その後、そのコレクション内の DynamicFrames の名前を一覧表示できます。

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

keys 呼び出しの出力は次のとおりです。

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize は、履歴テーブルを 6 つの新しいテーブルに分割します。DynamicFrame の各オブジェクトのレコードを含むルートテーブル、および配列の補助テーブルです。リレーショナルデータベースでの配列の処理は、特に配列が大きくなる場合に、最適ではないことがあります。配列を別のテーブルに分けることで、クエリの実行速度が大幅に向上します。

次に、contact_details を調べて分離を確認します。

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

show 呼び出しの出力は次のとおりです。

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details フィールドは、元の DynamicFrame の構造体の配列です。これらの配列の各要素は、index によってインデックス化された、補助テーブルの個別の行です。ここで id は、contact_details キーを使用する hist_root テーブルの外部キーです。

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

出力を次に示します。

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

これらのコマンドでは、toDF() および where 式を使用して、表示する行をフィルタリングすることに注意してください。

したがって、hist_root テーブルを補助テーブルと結合すると、次のことが可能になります。

  • 配列をサポートせずにデータベースにデータをロードします。

  • SQL を使用して配列内の各項目にクエリを実行します。

AWS Glue 接続を使用して、Amazon Redshift の認証情報を安全に保存してアクセスします。独自の接続の作成方法については、「AWS Glue Data Catalog での接続の定義」を参照してください。

DynamicFrames を 1 つずつ切り替えて、接続にデータを書き込みできるようになりました。

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

接続設定は、リレーショナルデータベースのタイプによって異なります。

結論

全体として、AWS Glue は非常に柔軟です。通常は書くのに数日かかるところを、数行のコードで達成できます。ソースからターゲットへの ETL スクリプトの全体は、GitHub の AWS Glue サンプル内の Python ファイル join_and_relationalize.py にあります。