代码示例:对数据进行联接和关系化 - AWS 连接词



本示例使用从 http://everypolitician.org/ 下载到 Amazon Simple Storage Service(Amazon S3)中的 sample-dataset 存储桶的数据集:s3://awsglue-datasets/examples/us-legislators/all。该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据(JSON 格式),并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

您可以在 GitHub 网站上 AWS Glue 示例存储库join_and_relationalize.py 文件中找到本示例的源代码。


  • 使用 AWS Glue 爬网程序对存储在公有 Amazon S3 存储桶中的对象进行分类并将其架构保存到 AWS Glue Data Catalog。

  • 检查生成自爬网的表元数据和架构。

  • 编写 Python 提取、转移和加载(ETL)脚本,该脚本使用数据目录中的元数据执行以下操作:

    • 将不同源文件中的数据加入到单个数据表中 (即,使数据非规范化)。

    • 按议员类型筛选已加入到单独的表中的表。

    • 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

在 AWS 上运行时,调试 Python 或 PySpark 脚本的首选方法是在 AWS Glue Studio 上使用笔记本

步骤 1:爬取 Amazon S3 存储桶中的数据

  1. 登录 AWS Management Console 并打开位于 https://console.aws.amazon.com/glue/ 的 AWS Glue 控制台。

  2. 按照 配置爬网程序 中的步骤操作,创建可将 s3://awsglue-datasets/examples/us-legislators/all 数据集网络爬取到 AWS Glue Data Catalog 中名为 legislators 的数据库的新爬网程序。示例数据已位于此公共 Amazon S3 存储桶中。

  3. 运行新爬网程序,然后检查 legislators 数据库。


    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json


步骤 2:向开发终端节点笔记本中添加样板文件脚本

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库,然后设置单个 GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

步骤 3:检查数据目录中数据的架构

接下来,您可以轻松地从 AWS Glue Data Catalog 检查 DynamicFrame,并检查数据的架构。例如,要查看 persons_json 表的架构,请在您的笔记本中添加以下内容:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()


Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string


要查看 memberships_json 表的架构,请键入以下内容:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations 是美国国会的党派和两大议院,即参议院和众议院。要查看 organizations_json 表的架构,请键入以下内容:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

步骤 4:筛选数据

接下来,仅保留您需要的字段,然后将 id 重命名为 org_id。该数据集足够小,方便您完整查看。

toDF()DynamicFrame 转换为 Apache Spark DataFrame,因此您可以应用已存在于 Apache Spark SQL 中的转换:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()


+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

键入以下内容以查看显示在 memberships 中的 organizations



+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

步骤 5:整合内容

现在,使用 AWS Glue 联接这些关系表并创建一个包含议员 memberships 及其对应的 organizations 的完整历史记录表。

  1. 首先,联接 idperson_id 上的 personsmemberships

  2. 接下来,将结果与 org_idorganization_id 上的 orgs 联接。

  3. 然后,删除多余的字段 person_idorg_id

您可以在一个 (扩展) 代码行中执行所有这些操作:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

您可以在一个 (扩展) 代码行中执行所有这些操作:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析(即 Parquet),该格式可让您在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 中运行 SQL。


glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")


s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')


l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

步骤 6:转换关系数据库的数据

利用 AWS Glue,您可以轻松将数据写入到关系数据库(如 Amazon Redshift),甚至对半结构化数据也是如此。它提供了一种转换 relationalize,这将展平 DynamicFrames,无论帧中的对象的复杂度可能如何。

使用本示例中的 l_history DynamicFrame,传入根表的名称 (hist_root) 和 relationalize 的临时工作路径。这将返回 DynamicFrameCollection。您随后可以列出该集合中 DynamicFrames 的名称:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

以下是 keys 调用的输出:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize 将该历史记录表拆分为 6 个新表:1 个包含 DynamicFrame 中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想,尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来,通过检查 contact_details 来查看分隔:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

以下是 show 调用的输出:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details 字段是原始 DynamicFrame 中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行,通过 index 编制索引。此处的 id 是具有键 contact_detailshist_root 表的外键:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()


+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

请注意,这些命令中先后使用了 toDF()where 表示式来筛选您要查看的行。

因此,将 hist_root 表与辅助表联接可让您执行以下操作:

  • 将数据加载到不支持数组的数据库中。

  • 使用 SQL 查询数组中的每个单独的项目。

使用 AWS Glue 连接安全地存储和访问您的 Amazon Redshift 凭证。有关如何创建您自己的连接的信息,请参阅 连接到数据

现在,您已准备就绪,可以通过一次遍历一个 DynamicFrames 来将数据写入到连接:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)



总的来说,AWS Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 GitHub 上 join_and_relationalize.py 示例的 Python 文件 AWS Glue 中找到完整的源到目标 ETL 脚本。