コード例: データの結合と関係付け - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

コード例: データの結合と関係付け

この例では、http://everypolitician.org/ から、Amazon Simple Storage Service (Amazon S3) の sample-dataset バケットにデータセットをダウンロードして使用します。s3://awsglue-datasets/examples/us-legislators/allこのデータセットには、米国議会議員や米国下院および上院議員の議席に関する JSON 形式のデータが含まれており、このチュートリアルの目的のため少し変更され、パブリック Amazon S3 バケットで利用可能になりました。

この例のソースコードは、GitHub ウェブサイトの join_and_relationalize.py Glue サンプルリポジトリの AWS Glue ファイルにあります。

このデータを使用して、このチュートリアルでは以下のことを実行する方法を示します。

  • AWS Glue クローラを使用して、パブリックな Amazon S3 バケットに保存されているオブジェクトを分類し、それらのスキーマを AWS Glue Data Catalog に保存します。

  • クロールの結果のテーブルのメタデータとスキーマを調べます。

  • Data Catalog のメタデータを使用して Python の抽出、転送、およびロード (ETL) スクリプトを記述し、以下の操作を行います。

    • 異なるソースファイル内のデータをまとめて単一のデータテーブルに結合します (つまり、データを非正規化します)。

    • 議員のタイプ別に、結合テーブルを別のテーブルにフィルタリングします。

    • 生成されたデータを後で分析するために Apache Parquet ファイルに分割して書き出します。

AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、AWS Glue Studio のノートブックを使用することです。

ステップ 1: Amazon S3 バケット内のデータをクロールする

  1. AWS Management Console にサインインし、AWS Glue コンソール (https://console.aws.amazon.com/glue/) を開きます。

  2. クローラーの設定 の手順に従って、s3://awsglue-datasets/examples/us-legislators/all データセットをクロールできる新しいクローラを、AWS Glue Data Catalog 内のデータベース legislators に作成します。サンプルデータは既に、このパブリックな Amazon S3 バケットに用意されています。

  3. 新しいクローラを実行し、legislators データベースを確認します。

    クローラは、次のメタデータテーブルを作成します。

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    これは、議員とその履歴を含むテーブルの半正規化されたテーブルの集合です。

ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の GlueContext を設定します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

ステップ 3: Data Catalog 内のデータでスキーマを確認する

次に、簡単な手順で AWS Glue Data Catalog から DynamicFrame を作成し、そのデータのスキーマを調べます。例えば、persons_json テーブルのスキーマを表示するには、ノートブックに以下を追加します。

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

プリントコールの出力を以下に示します。

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

テーブル内の各人は、米国議会のメンバーです。

memberships_json テーブルのスキーマを表示するには、次のように入力します。

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

出力は次のとおりです。

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations は政党および上院と下院の 2 つの議会です。organizations_json テーブルのスキーマを表示するには、次のように入力します。

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

出力は次のとおりです。

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

ステップ 4: データをフィルタリングする

次に、必要なフィールドのみを保持し、id の名前を org_id に変更します。データセットは、小さいため全体を表示することができます。

toDF()DynamicFrame を Apache Spark に変換するので、Apache Spark SQL に既に存在する DataFrame 変換を適用できます。

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

以下に出力を示します。

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

memberships に表示される organizations を表示するには、次のように入力します。

memberships.select_fields(['organization_id']).toDF().distinct().show()

以下に出力を示します。

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

ステップ 5: すべてをまとめる

ここで AWS Glue を使用して、これらのリレーショナルテーブルを結合し、議員の memberships とそれに対応する organizations の 1 つの完全な履歴テーブルを作成します。

  1. まず、persons および membershipsid および person_id と結合します。

  2. 次に、結果を orgsorg_id および organization_id と結合します。

  3. 次に、冗長なフィールド person_id および org_id を削除します。

これらの操作はすべて、1 行の (拡張された) コードで行うことができます。

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

出力は次のとおりです。

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

これで、分析に使用できる最終テーブルが作成されました。これは、分析のためのコンパクトで効率的な形式 (つまり Parquet) で記述することができ、AWS Glue、Amazon Athena、または Amazon Redshift Spectrum で SQL を実行できます。

次の呼び出しは、複数のファイルにわたってテーブルを書き込んで、後で解析するときに高速な並列読み込みをサポートします。

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

すべての履歴データを単一のファイルにまとめるには、データフレームに変換し、再パーティション化して書き出す必要があります。

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

または、上院と下院でそれを分けたい場合。

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

ステップ 6: リレーショナルデータベース向けにデータを変換する

AWS Glue では半構造化データでも Amazon Redshift のようなリレーショナルデータベースに簡単に書き込むことができるのです。これにより、フレーム内のオブジェクトがどれほど複雑であっても、DynamicFrames をフラット化する変換 relationalize が提供されます。

この例の l_history DynamicFrame を使用して、ルートテーブル (hist_root) の名前と一時的な作業パスを relationalize に渡します。これにより、DynamicFrameCollection が返されます。その後、そのコレクション内の DynamicFrames の名前を一覧表示できます。

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

keys 呼び出しの出力は次のとおりです。

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize は、履歴テーブルを 6 つの新しいテーブルに分割します。DynamicFrame の各オブジェクトのレコードを含むルートテーブル、および配列の補助テーブルです。リレーショナルデータベースでの配列の処理は、特に配列が大きくなる場合に、最適ではないことがあります。配列を別のテーブルに分けることで、クエリの実行速度が大幅に向上します。

次に、contact_details を調べて分離を確認します。

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

show 呼び出しの出力は次のとおりです。

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details フィールドは、元の DynamicFrame の構造体の配列です。これらの配列の各要素は、index によってインデックス化された、補助テーブルの個別の行です。ここで id は、contact_details キーを使用する hist_root テーブルの外部キーです。

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

出力を次に示します。

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

これらのコマンドでは、toDF() および where 式を使用して、表示する行をフィルタリングすることに注意してください。

したがって、hist_root テーブルを補助テーブルと結合すると、次のことが可能になります。

  • 配列をサポートせずにデータベースにデータをロードします。

  • SQL を使用して配列内の各項目にクエリを実行します。

AWS Glue 接続を使用して、Amazon Redshift の認証情報を安全に保存してアクセスします。独自の接続の作成方法については、「データへの接続」を参照してください。

DynamicFrames を 1 つずつ切り替えて、接続にデータを書き込みできるようになりました。

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

接続設定は、リレーショナルデータベースのタイプによって異なります。

結論

全体として、AWS Glue は非常に柔軟です。通常は書くのに数日かかるところを、数行のコードで達成できます。ソースからターゲットへの ETL スクリプトの全体は、GitHub の AWS Glue サンプル内の Python ファイル join_and_relationalize.py にあります。