Exemple de code : Données de jonction et de mise en relation - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple de code : Données de jonction et de mise en relation

Cet exemple utilise un ensemble de données qui a été téléchargé sur http://everypolitician.org/ dans le compartiment sample-dataset d'Amazon Simple Storage Service (Amazon S3) : s3://awsglue-datasets/examples/us-legislators/all. Cet ensemble de données contient des données au format JSON concernant les législateurs américains et les fonctions qu'ils ont occupées à la Chambre des représentants et au Sénat. Il a été légèrement modifié et il a été mis à la disposition des utilisateurs dans un compartiment Amazon S3 public aux fins de ce didacticiel.

Vous pouvez trouver le code source de cet exemple dans le fichier join_and_relationalize.py dans le référentiel d'exemples AWS Glue sur le site web de GitHub.

Grâce à ces données, ce didacticiel vous montre comment effectuer les opérations suivantes :

  • Utilisez un crawler AWS Glue pour classer les objets qui sont stockés dans un compartiment Amazon S3 public et enregistrer leurs schémas dans AWS Glue Data Catalog.

  • Examiner les métadonnées et les schémas de la table résultant de l'analyse ;

  • Écrire un script Extract-transform-load (ETL) en Python qui utilise les métadonnées dans Data Catalog pour effectuer les actions suivantes :

    • Joindre les données dans différents fichiers sources en une seule table de données (c'est-à-dire, dénormaliser les données).

    • Filtrer la table jointe en tables distinctes par type de législateur.

    • Écrire les données résultantes en vue de séparer les fichiers Apache Parquet à des fins d'analyse ultérieure.

La meilleure façon de déboguer des scripts Python ou PySpark pendant leur exécution sur AWS est d'utiliser Notebooks on AWS Glue Studio.

Étape 1 : analyser les données dans le compartiment Amazon S3

  1. Connectez-vous à la AWS Management Console et ouvrez la console AWS Glue à l'adresse https://console.aws.amazon.com/glue/.

  2. En procédant comme indiqué dans Utilisation des crawlers sur la console AWS Glue, créez un crawler qui peut analyser l'ensemble de données s3://awsglue-datasets/examples/us-legislators/all dans une base de données nommée legislators dans AWS Glue Data Catalog. Les exemples de données sont déjà dans ce compartiment Amazon S3 public.

  3. Exécutez le nouvel crawler, puis activez la base de données legislators.

    L'crawler crée les tables de métadonnées suivantes :

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    Il s'agit d'un ensemble de tables semi-normalisé contenant les législateurs et leurs carrières.

Étape 2 : Ajouter le script Boilerplate au bloc-notes de point de terminaison de développement

Collez le script Boilerplate suivant dans le bloc-notes du point de terminaison de développement pour importer les bibliothèques AWS Glue dont vous avez besoin, et configurez un GlueContext :

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Étape 3 : examiner les schémas à partir des données de Data Catalog

Ensuite, vous pouvez facilement créer, examiner un DynamicFrame à partir d'AWS Glue Data Catalog et examiner les schémas des données. Par exemple, pour afficher le schéma de la table persons_json, ajoutez les éléments suivants à votre bloc-notes :

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

Voici le résultat des appels d'impression :

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Chaque personne contenue dans la table est un membre de certains des organismes du Congrès des Etats-Unis.

Pour afficher le schéma de la table memberships_json, tapez ce qui suit :

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

La sortie est la suivante :

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

Les organizations sont des partis et les deux chambres du Congrès, le Sénat et la Chambre des représentants. Pour afficher le schéma de la table organizations_json, tapez ce qui suit :

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

La sortie est la suivante :

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

Étape 4 : Filtrer les données

Ensuite, conservez uniquement les champs de votre choix, et renommez id en org_id. L'ensemble de données est suffisamment petit pour que vous puissiez l'afficher entièrement.

toDF() convertit DynamicFrame en DataFrame Apache Spark, afin que vous puissiez appliquer les transformations qui existent déjà dans Apache Spark SQL :

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

Le résultat est présenté ci-dessous :

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

Tapez la commande suivante pour afficher les organizations qui apparaissent dans memberships :

memberships.select_fields(['organization_id']).toDF().distinct().show()

Le résultat est présenté ci-dessous :

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

Étape 5 : Synthèse

À présent, utilisez AWS Glue pour joindre ces tables relationnelles et créer une table complète des carrières des memberships des législateurs et de leurs organizations correspondantes.

  1. Commencez par joindre persons et memberships à id et person_id.

  2. Ensuite, joignez le résultat avec orgs à org_id et organization_id.

  3. Ensuite, déplacez les champs redondants, person_id et org_id.

Vous pouvez effectuer toutes ces opérations en une seule ligne de code (étendu) :

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

La sortie est la suivante :

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

Vous disposez à présent de la table finale que vous pouvez utiliser pour l'analyse. Vous pouvez l'écrire dans un format compact et efficace pour les analyses, notamment Parquet, sur lesquelles vous pouvez exécuter SQL dans AWS Glue, Amazon Athena ou Amazon Redshift Spectrum.

L'appel suivant enregistre la table dans plusieurs fichiers afin de prendre en charge des lectures parallèles rapides lors de l'exécution d'une analyse ultérieure :

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

Pour réunir toutes les données d'historique en un seul fichier, vous devez les convertir dans un cadre de données, les repartitionner et les écrire :

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

Sinon, si vous souhaitez les distinguer en fonction du Sénat et de la Chambre des représentants :

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

Étape 6 : Transformer les données pour les bases de données relationnelles

AWS Glue permet d'écrire les données dans des bases de données relationnelles comme Amazon Redshift, même avec des données semi-structurées. Ainsi, vous obtenez une relationalize de transformation, qui aplatit DynamicFrames, quelle que soit la complexité des objets du cadre.

Dans cet exemple, à l'aide de l_history DynamicFrame, transmettez le nom d'une table racine (hist_root) et un chemin de travail temporaire relationalize. Cela renvoie une DynamicFrameCollection. Vous pouvez ensuite répertorier les noms des DynamicFrames de cet ensemble :

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

Voici la sortie de l'appel keys :

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize a décomposé la table d'historique en six nouvelles tables : une table racine qui contient un enregistrement pour chaque objet dans le DynamicFrame et des tables auxiliaires pour les tableaux. La gestion de tableaux dans les bases de données relationnelles est souvent sous-optimale, en particulier lorsque ces tableaux deviennent volumineux. Décomposer les tableaux en différentes tables permet d'accélérer considérablement les requêtes.

Ensuite, consultez la décomposition en examinant contact_details :

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

Voici la sortie de l'appel show :

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

Le champ contact_details était un tableau de structures dans le DynamicFrame d'origine. Chaque élément de ces tableaux est une ligne distincte dans la table auxiliaire, indexée par index. L'id ici est une clé étrangère dans la table hist_root avec la clé contact_details :

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

En voici la sortie :

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

Notez dans ces commandes que toDF(), puis une expression where sont utilisés pour filtrer les lignes à afficher.

Ainsi, la jonction de la table hist_root avec les tables auxiliaire vous permet d'effectuer les actions suivantes :

  • Charger des données dans les bases de données sans prise en charge par un tableau ;

  • Interroger chaque élément individuel dans un tableau à l'aide de SQL.

Stockez et accédez en toute sécurité à vos informations d'identification Amazon Redshift avec une connexion AWS Glue. Pour plus d'informations sur la création de votre propre connexion, consultez Connexion aux données.

Vous êtes maintenant prêt à écrire vos données dans une connexion en parcourant le DynamicFrames un par un :

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

Les paramètres de connexion varient en fonction de votre type de base de données relationnelle :

Conclusion

En général, AWS Glue est très flexible. Il vous permet de réaliser, en quelques lignes de code, ce qui prendrait normalement plusieurs jours pour écrire. Pour connaître tous les scripts ETL source vers cible, consultez le fichier Python join_and_relationalize.py à la page Exemples AWS Glue de GitHub.