翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
コード例: データの結合と関係付け
この例では、http://everypolitician.org/sample-dataset
バケットにデータセットをダウンロードして使用します。s3://awsglue-datasets/examples/us-legislators/all
このデータセットには、米国議会議員や米国下院および上院議員の議席に関する JSON 形式のデータが含まれており、このチュートリアルの目的のため少し変更され、パブリック Amazon S3 バケットで利用可能になりました。
この例のソースコードは、GitHub ウェブサイトの join_and_relationalize.py
Glue サンプルリポジトリ
このデータを使用して、このチュートリアルでは以下のことを実行する方法を示します。
AWS Glue クローラを使用して、パブリックな Amazon S3 バケットに保存されているオブジェクトを分類し、それらのスキーマを AWS Glue Data Catalog に保存します。
クロールの結果のテーブルのメタデータとスキーマを調べます。
-
Data Catalog のメタデータを使用して Python の抽出、転送、およびロード (ETL) スクリプトを記述し、以下の操作を行います。
異なるソースファイル内のデータをまとめて単一のデータテーブルに結合します (つまり、データを非正規化します)。
議員のタイプ別に、結合テーブルを別のテーブルにフィルタリングします。
生成されたデータを後で分析するために Apache Parquet ファイルに分割して書き出します。
AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、AWS Glue Studio のノートブックを使用することです。
ステップ 1: Amazon S3 バケット内のデータをクロールする
-
AWS Management Console にサインインし、AWS Glue コンソール (https://console.aws.amazon.com/glue/
) を開きます。 -
クローラーの設定 の手順に従って、
s3://awsglue-datasets/examples/us-legislators/all
データセットをクロールできる新しいクローラを、AWS Glue Data Catalog 内のデータベースlegislators
に作成します。サンプルデータは既に、このパブリックな Amazon S3 バケットに用意されています。 -
新しいクローラを実行し、
legislators
データベースを確認します。クローラは、次のメタデータテーブルを作成します。
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
これは、議員とその履歴を含むテーブルの半正規化されたテーブルの集合です。
-
ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の GlueContext
を設定します。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
ステップ 3: Data Catalog 内のデータでスキーマを確認する
次に、簡単な手順で AWS Glue Data Catalog から DynamicFrame を作成し、そのデータのスキーマを調べます。例えば、persons_json
テーブルのスキーマを表示するには、ノートブックに以下を追加します。
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
プリントコールの出力を以下に示します。
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
テーブル内の各人は、米国議会のメンバーです。
memberships_json
テーブルのスキーマを表示するには、次のように入力します。
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
出力は次のとおりです。
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
organizations
は政党および上院と下院の 2 つの議会です。organizations_json
テーブルのスキーマを表示するには、次のように入力します。
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
出力は次のとおりです。
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
ステップ 4: データをフィルタリングする
次に、必要なフィールドのみを保持し、id
の名前を org_id
に変更します。データセットは、小さいため全体を表示することができます。
toDF()
は DynamicFrame
を Apache Spark に変換するので、Apache Spark SQL に既に存在する DataFrame
変換を適用できます。
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
以下に出力を示します。
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
memberships
に表示される organizations
を表示するには、次のように入力します。
memberships.select_fields(['organization_id']).toDF().distinct().show()
以下に出力を示します。
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
ステップ 5: すべてをまとめる
ここで AWS Glue を使用して、これらのリレーショナルテーブルを結合し、議員の memberships
とそれに対応する organizations
の 1 つの完全な履歴テーブルを作成します。
-
まず、
persons
およびmemberships
をid
およびperson_id
と結合します。 -
次に、結果を
orgs
とorg_id
およびorganization_id
と結合します。 -
次に、冗長なフィールド
person_id
およびorg_id
を削除します。
これらの操作はすべて、1 行の (拡張された) コードで行うことができます。
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
出力は次のとおりです。
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
これで、分析に使用できる最終テーブルが作成されました。これは、分析のためのコンパクトで効率的な形式 (つまり Parquet) で記述することができ、AWS Glue、Amazon Athena、または Amazon Redshift Spectrum で SQL を実行できます。
次の呼び出しは、複数のファイルにわたってテーブルを書き込んで、後で解析するときに高速な並列読み込みをサポートします。
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
すべての履歴データを単一のファイルにまとめるには、データフレームに変換し、再パーティション化して書き出す必要があります。
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
または、上院と下院でそれを分けたい場合。
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
ステップ 6: リレーショナルデータベース向けにデータを変換する
AWS Glue では半構造化データでも Amazon Redshift のようなリレーショナルデータベースに簡単に書き込むことができるのです。これにより、フレーム内のオブジェクトがどれほど複雑であっても、DynamicFrames
をフラット化する変換 relationalize
が提供されます。
この例の l_history
DynamicFrame
を使用して、ルートテーブル (hist_root
) の名前と一時的な作業パスを relationalize
に渡します。これにより、DynamicFrameCollection
が返されます。その後、そのコレクション内の DynamicFrames
の名前を一覧表示できます。
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
keys
呼び出しの出力は次のとおりです。
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
は、履歴テーブルを 6 つの新しいテーブルに分割します。DynamicFrame
の各オブジェクトのレコードを含むルートテーブル、および配列の補助テーブルです。リレーショナルデータベースでの配列の処理は、特に配列が大きくなる場合に、最適ではないことがあります。配列を別のテーブルに分けることで、クエリの実行速度が大幅に向上します。
次に、contact_details
を調べて分離を確認します。
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
show
呼び出しの出力は次のとおりです。
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
contact_details
フィールドは、元の DynamicFrame
の構造体の配列です。これらの配列の各要素は、index
によってインデックス化された、補助テーブルの個別の行です。ここで id
は、contact_details
キーを使用する hist_root
テーブルの外部キーです。
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
出力を次に示します。
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
これらのコマンドでは、toDF()
および where
式を使用して、表示する行をフィルタリングすることに注意してください。
したがって、hist_root
テーブルを補助テーブルと結合すると、次のことが可能になります。
配列をサポートせずにデータベースにデータをロードします。
SQL を使用して配列内の各項目にクエリを実行します。
AWS Glue 接続を使用して、Amazon Redshift の認証情報を安全に保存してアクセスします。独自の接続の作成方法については、「データへの接続」を参照してください。
DynamicFrames
を 1 つずつ切り替えて、接続にデータを書き込みできるようになりました。
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
接続設定は、リレーショナルデータベースのタイプによって異なります。
-
Amazon Redshift への書き込み手順については、「Redshift 接続」を参照してください。
-
その他のデータベースについては、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
結論
全体として、AWS Glue は非常に柔軟です。通常は書くのに数日かかるところを、数行のコードで達成できます。ソースからターゲットへの ETL スクリプトの全体は、GitHub の AWS Glue サンプルjoin_and_relationalize.py
にあります。