Contoh kode: Bergabung dan menghubungkan data - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh kode: Bergabung dan menghubungkan data

Contoh ini menggunakan set data yang diunduh dari http://everypolitician.org/ ke bucket sample-dataset di Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. Set data tersebut berisi data dalam format JSON tentang legislator Amerika Serikat dan jabatan yang mereka telah jabat di DPR AS dan Senat, dan telah dimodifikasi sedikit dan dibuat tersedia dalam sebuah bucket Amazon S3 publik untuk tujuan tutorial ini.

Anda dapat menemukan kode sumber untuk contoh ini dalam join_and_relationalize.py file di repositori AWS Glue sampel di situs web. GitHub

Dengan menggunakan data ini, tutorial ini menunjukkan cara untuk melakukan hal berikut:

  • Gunakan AWS Glue crawler untuk mengklasifikasikan objek yang disimpan di bucket Amazon S3 publik dan simpan skema mereka ke dalam AWS Katalog Data Glue.

  • Memeriksa metadata tabel dan skema yang dihasilkan dari perayapan.

  • Menulis sebuah skrip extract, transform, and load (ETL) Python yang menggunakan metadata tersebut dalam Katalog Data untuk melakukan hal berikut:

    • Menggabungkan data dalam file sumber yang berbeda bersama-sama ke dalam sebuah tabel data tunggal (yakni, denormalisasi data).

    • Mem-filter tabel yang digabungkan ke dalam tabel terpisah berdasarkan jenis legislator.

    • Menuliskan data yang dihasilkan untuk memisahkan file Apache Parquet untuk analisis nanti.

Cara yang lebih disukai untuk men-debug Python PySpark atau skrip saat berjalan adalah dengan menggunakan Notebook AWS di Glue Studio. AWS

Langkah 1: Merayapi data di bucket Amazon S3

  1. Masuk ke AWS Management Console, dan buka AWS Glue konsol di https://console.aws.amazon.com/glue/.

  2. Mengikuti langkah-langkahnyaMengkonfigurasi crawler, buat crawler baru yang dapat merayapi s3://awsglue-datasets/examples/us-legislators/all kumpulan data ke dalam database bernama legislators dalam Katalog Data AWS Glue. Contoh data sudah ada di bucket Amazon S3 ini.

  3. Jalankan crawler baru tersebut, dan kemudian periksa basis data legislators.

    Crawler tersebut membuat tabel metadata berikut:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    Ini adalah koleksi tabel semi-dinormalisasi yang berisi legislator dan riwayat mereka.

Langkah 2: Tambahkan skrip boilerplate ke notebook endpoint pengembangan

Rekatkan skrip boilerplate berikut ke notebook endpoint pengembangan untuk mengimpor AWS Glue pustaka yang Anda butuhkan, dan siapkan satu: GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Langkah 3: Periksa skema dari data dalam Katalog Data

Selanjutnya, Anda dapat dengan mudah membuat check a DynamicFrame dari AWS Glue Data Catalog, dan memeriksa skema data. Misalnya, untuk melihat skema tabel persons_json, tambahkan hal berikut di notebook Anda:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

Berikut adalah output dari panggilan cetak:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Setiap orang dalam tabel adalah anggota dari beberapa badan kongres AS.

Untuk melihat skema tabel memberships_json, ketik berikut ini:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

Output adalah sebagai berikut:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations adalah partai dan dua majelis Kongres, Senat dan Dewan Perwakilan Rakyat. Untuk melihat skema tabel organizations_json, ketik berikut ini:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

Output adalah sebagai berikut:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

Langkah 4: Filter data

Berikutnya, simpan hanya bidang yang Anda inginkan, dan ubah nama id menjadi org_id. Set data cukup kecil sehingga Anda dapat melihat semuanya.

toDF() mengkonversi DynamicFrame menjadi DataFrame Apache Spark, sehingga Anda dapat menerapkan transformasi yang sudah ada di Apache Spark SQL:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

Berikut ini menunjukkan outputnya:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

Ketik berikut ini untuk melihat organizations yang muncul di memberships:

memberships.select_fields(['organization_id']).toDF().distinct().show()

Berikut ini menunjukkan outputnya:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

Langkah 5: Satukan semuanya

Sekarang, gunakan AWS Glue untuk bergabung dengan tabel relasional ini dan buat satu tabel sejarah lengkap legislator memberships dan yang sesuai. organizations

  1. Pertama, gabungkan persons dan memberships pada id dan person_id.

  2. Selanjutnya, gabungkan hasilnya dengan orgs pada org_id dan organization_id.

  3. Kemudian, buang bidang-bidang redundan-nya, person_id dan org_id.

Anda dapat melakukan semua operasi ini dalam satu baris kode (diperpanjang):

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

Output adalah sebagai berikut:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

Anda sekarang memiliki tabel akhir yang dapat Anda gunakan untuk analisis. Anda dapat menuliskannya dalam format yang ringkas dan efisien untuk analitik — yaitu Parket — yang dapat Anda jalankan SQL, AWS Glue Amazon Athena, atau Amazon Redshift Spectrum.

Panggilan berikut menulis tabel di beberapa file untuk mendukung pembacaan paralel cepat ketika melakukan analisis kemudian:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

Untuk menempatkan semua data riwayat ke dalam satu file, Anda harus mengubahnya menjadi sebuah bingkai data, melakukan pemartisian ulang, dan menuliskannya:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

Atau, jika Anda ingin memisahkannya berdasarkan Senat dan DPR:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

Langkah 6: Mengubah data untuk database relasional

AWS Gluemembuatnya mudah untuk menulis data ke database relasional seperti Amazon Redshift, bahkan dengan data semi-terstruktur. Ia menawarkan transformasi relationalize, yang meratakan DynamicFrames tidak peduli seberapa kompleks objek dalam bingkai.

Dengan menggunakan l_history DynamicFrame dalam contoh ini, masukkan nama dari tabel akar (hist_root) dan path kerja sementara untuk relationalize. Ini akan mengembalikan DynamicFrameCollection. Anda kemudian dapat mencantumkan nama-nama DynamicFrames dalam koleksi itu:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

Berikut ini adalah hasil dari panggilan keys:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize memecahkan tabel riwayat ke dalam enam tabel baru: tabel akar yang berisi catatan untuk setiap objek dalam DynamicFrame, dan tabel tambahan untuk array. Array penanganan dalam basis data relasional sering bersifat suboptimal, terutama karena array tersebut menjadi besar. Memisahkan array ke dalam tabel yang berbeda membuat kueri lebih cepat.

Selanjutnya, lihat pemisahan dengan memeriksa contact_details:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

Berikut ini adalah hasil dari panggilan show:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

Bidang contact_details adalah sebuah array dari struct di DynamicFrame asli. Setiap elemen dari array tersebut adalah baris terpisah dalam tabel tambahan, yang diindeks berdasarkan index. id di sini adalah sebuah kunci asing dalam tabel hist_root dengan kunci contact_details:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

Berikut hasilnya:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

Perhatikan dalam perintah ini bahwa toDF() dan kemudian ekspresi where digunakan untuk mem-filter baris yang ingin Anda lihat.

Jadi, dengan menggabungkan tabel hist_root dengan tabel tambahan akan memungkinkan Anda melakukan hal berikut:

  • Memuat data ke dalam basis data tanpa dukungan array.

  • Meng-kueri setiap item individu dalam array menggunakan SQL.

Simpan dan akses kredenal Amazon Redshift Anda dengan aman dengan koneksi. AWS Glue Untuk informasi tentang cara membuat koneksi Anda sendiri, lihat Menghubungkan ke data.

Anda sekarang siap untuk menulis data Anda ke koneksi dengan bersepeda melalui DynamicFrames satu per satu:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

Pengaturan koneksi Anda akan berbeda berdasarkan jenis database relasional Anda:

Kesimpulan

Secara keseluruhan, AWS Glue sangat fleksibel. Ia memungkinkan Anda mencapai, dalam beberapa baris kode, apa yang biasanya akan memerlukan waktu berhari-hari untuk ditulis. Anda dapat menemukan seluruh skrip source-to-target ETL dalam file Python dalam sampel join_and_relationalize.py pada. AWS Glue GitHub