程式碼範例:加入和關聯化資料 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

程式碼範例:加入和關聯化資料

本範例使用從 http://everypolitician.org/ 下載至 Amazon Simple Storage Service (Amazon S3) 的 sample-dataset 儲存貯體的資料集:s3://awsglue-datasets/examples/us-legislators/all。資料集包含美國國會議員和他們在美國眾議院和參議院內座位的 JSON 格式的資料,已針對教學用途稍作修改,並透過公有 Amazon S3 儲存貯體提供。

您可以在網站範例儲存庫中的join_and_relationalize.py檔案中找到此範例的AWS Glue原始程式 GitHub 碼。

本指南將利用這項資料告訴您如何執行下列動作:

  • 使用AWS Glue爬蟲程式將存放在公用 Amazon S3 儲存貯體中的物件分類,並將其結構描述儲存到 AWS Glue 資料型錄中。

  • 檢查爬蟲程式所產生的資料表中繼資料和結構描述。

  • 編寫 Python 擷取、傳輸和載入 (ETL) 指令碼,使用 Data Catalog 中的中繼資料執行下列動作:

    • 將來自不同原始檔案的資料加入到單一資料表 (也就是將資料去正規化)。

    • 篩選加入的資料表,依國會議員類型放入不同的資料表。

    • 將產生的資料寫入到單獨的 Apache Parquet 檔案中,供以後分析之用。

在執行時偵錯 Python 或 PySpark 指令碼的建議方式 AWS 是在 AWS Glue Studio 上使用筆記型電腦

步驟 1:在 Amazon S3 儲存貯體中網路爬取資料

  1. 請登入 AWS Management Console,然後開啟AWS Glue主控台,網址為 https://console.aws.amazon.com/glue/

  2. 遵循中的步驟設定爬行者程式,建立新的爬行者程式,以將資料s3://awsglue-datasets/examples/us-legislators/all集編目到 AWS Glue 資料目錄legislators中名稱的資料庫。範例資料已放在這個公有 Amazon S3 儲存貯體中。

  3. 執行新的爬蟲程式,接著查看 legislators 資料庫。

    爬蟲程式建立下列中繼資料資料表:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    這是一個包含國會議員和其歷史的半標準化資料表集合。

步驟 2:新增樣板指令碼至開發端點筆記本

將以下樣板指令碼貼至開發端點以匯入您需要的 AWS Glue 程式庫,並且設定單一的 GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

步驟 3:從資料目錄中的資料檢查結構描述

接下來,您可以輕鬆地 DynamicFrame 從 AWS Glue 資料型錄建立檢查,並檢查資料的結構描述。例如,若要查看 persons_json 資料表的結構描述,請將下列內容新增到筆記本:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

以下為列印呼叫的輸出:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

資料表中的每個人都是美國國會的成員。

若要檢視 memberships_json 資料表的結構描述,請輸入如下命令:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

其輸出如下:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations 為政黨和參議院與眾議院這兩個議會殿堂。若要檢視 organizations_json 資料表的結構描述,請輸入如下命令:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

其輸出如下:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

步驟 4:篩選資料

接著,保留需要的欄位,將 id 重新命名為 org_id。資料集很小,可以從整體來檢視。

toDF() 會將 DynamicFrame 轉換為 Apache Spark DataFrame,因此您可套用 Apache Spark SQL 中現有的轉換:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

以下將顯示輸出:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

輸入以下以檢視出現在 membershipsorganizations

memberships.select_fields(['organization_id']).toDF().distinct().show()

以下將顯示輸出:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

步驟 5:全部整合為一

現在,使用 AWS Glue 加入這些關聯式表格,並建立一份關於國會議員 memberships 及其對應的 organizations 的完整歷史記錄資料表。

  1. 首先,加入 personsmembershipsidperson_id

  2. 接著,將結果加入到 orgsorg_idorganization_id

  3. 然後,捨棄冗餘欄位 person_idorg_id

您可以在同一 (延伸) 指令碼行執行所有這些操作:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

其輸出如下:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

您現在取得最終的資料表,可用於分析。您可以用精巧、有效率的格式編寫,以用於分析 (也就是 Parquet),在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 上執行 SQL。

以下呼叫將資料表編寫到多個檔案,在稍後執行分析時支援快速平行讀取:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

若要將所有歷史記錄資料合併成單一檔案,您必須將其轉換為資料框架、分割並編寫:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

或者,如果您希望將其分為參議院和眾議院:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

步驟 6:轉換關聯式資料庫的資料

AWS Glue 可讓您輕鬆地將資料寫入關聯式資料庫,例如 Amazon Redshift,即使是半結構化資料。其提供轉換 relationalize,可將 DynamicFrames 扁平化,無論框架中的物件多複雜。

使用本範例中的 l_history DynamicFrame,以根資料表 (hist_root) 的名稱和暫時任務路徑傳送至 relationalize。將傳回 DynamicFrameCollection。然後,您可以將 DynamicFrames 的名稱列在該集合中:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

以下為 keys 呼叫的輸出:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize 將歷史記錄資料表分成六個新資料表:根資料表包含在 DynamicFrame 中的各物件記錄,和陣列的輔助資料表。關聯式資料庫中的陣列處理通常為次最佳化,尤其在這些陣列變得龐大時。將陣列分成不同的資料表,可加快查詢速度。

接著,檢查 contact_details 以查看分隔:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

以下為 show 呼叫的輸出:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details 欄位為原始 DynamicFrame 中結構的陣列。這些陣列的每個元素都是輔助資料表中的單獨資料列,以 index 編製索引。此處的 idhist_root 資料表的外部金鑰,金鑰為 contact_details

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

以下為其輸出:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

請注意,這些命令將使用 toDF(),然後是 where 表達式,來篩選您想要查看的資料列。

因此,加入 hist_root 資料表與輔助資料表可執行下列動作:

  • 無需陣列支援便能將資料載入到資料庫。

  • 使用 SQL 查詢陣列中的每個個別項目。

使用 AWS Glue 連線以安全存放和存取您的 Amazon Redshift 憑證。有關如何建立自己的連線的詳細資訊,請參閱 連線至資料

您現已準備好透過一次循環一個 DynamicFrames 來將資料寫入連接器:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

您的連接器設定將因您的關聯式資料庫類型而異:

結論

整體而言,AWS Glue 極具彈性。它讓您用幾行程式碼便能完成通常要好幾天才能完成撰寫的任務。您可以在上 GitHub的AWS Glue範例中找到 Python 檔案join_and_relationalize.py中的整個 source-to-target ETL 指令碼。