코드 예: 데이터 조인 및 관계화 - AWS Glue

코드 예: 데이터 조인 및 관계화

이 예제에서는 http://everypolitician.org/에서 Amazon Simple Storage Service(Amazon S3)(s3://awsglue-datasets/examples/us-legislators/all)의 sample-dataset 버킷으로 다운로드한 데이터 집합을 사용합니다. 데이터 세트에는 미국 입법 기관과 미 상원 및 하원에서 확보한 의석에 대한 JSON 형식의 데이터가 포함되어 있으며, 이 자습서의 용도에 맞게 퍼블릭 Amazon S3 버킷에서 사용할 수 있도록 데이터 세트가 약간 수정되었습니다.

이 예제의 소스 코드는 GitHub 웹 사이트의 AWS Glue 샘플 리포지토리에 있는 join_and_relationalize.py 파일에서 찾을 수 있습니다.

이 데이터를 사용하여 이 자습서에서는 다음을 수행하는 방법을 보여줍니다.

  • AWS Glue 크롤러를 사용하여 퍼블릭 Amazon S3 버킷에 저장된 객체를 분류하고 AWS Glue Data Catalog에 객체의 스키마를 저장합니다.

  • 크롤의 결과인 테이블 메타데이터 및 스키마를 검토합니다.

  • Data Catalog의 메타데이터를 사용하여 Python 추출, 변환, 로드(ETL) 스크립트를 작성하고 다음을 실행합니다.

    • 하나의 데이터 테이블로 다른 원본 파일의 데이터를 모읍니다 (즉, 데이터를 비정규화합니다).

    • 제정자 유형에 따라 모아진 테이블을 개별 테이블로 필터링합니다.

    • 다음 분석을 위해 결과 데이터를 Apache Parquet 파일을 작성합니다.

Python 혹은 PySpark 스크립트를 디버깅하는 가장 쉬운 방법은 개발 엔드포인트를 생성하고 코드를 여기에서 실행합니다. 작업할 개발 엔드포인트를 설정하여 시작하기를 권장합니다. 자세한 정보는 개발 엔드포인트 속성 보기을 참조하십시오.

1단계: Amazon S3 버킷에서 데이터 크롤

  1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/glue/에서 AWS Glue 콘솔을 엽니다.

  2. AWS Glue 콘솔에서 크롤러 작업의 단계에 따라 AWS Glue Data Catalog에서 legislators라는 데이터베이스로 s3://awsglue-datasets/examples/us-legislators/all 데이터 집합을 크롤링할 수 있는 새 크롤러를 만듭니다. 이 예제 데이터는 이 퍼블릭 Amazon S3 버킷에 이미 존재합니다.

  3. 새로운 크롤러를 실행한 다음 legislators 데이터베이스를 확인합니다.

    크롤러는 다음 메타데이터 테이블을 생성합니다.

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    제정자와 제정자 기록을 포함한 테이블 반정규화된 컬렉션입니다.

2단계: 개발 엔드포인트 노트북에 표준 문안 스크립트 추가

다음 표준 문안 스크립트를 개발 엔드포인트 노트북에 복사하고 필요한 AWS Glue 라이브러리로 들여와 단일 GlueContext를 설정합니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

3단계: Data Catalog의 데이터에서 스키마 검토

다음으로 AWS Glue Data Catalog에서 검사 DynamicFrame을 쉽게 생성하고 데이터의 스키마를 검사할 수 있습니다. 예를 들어, persons_json 테이블의 스키마를 보고 노트북에 다음을 추가합니다.

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

프린트 호출에 따른 출력값입니다.

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

테이블에 있는 각자는 미국 의회 기관의 멤버입니다.

memberships_json 테이블의 스키마를 보고 다음을 입력합니다.

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

출력값은 다음과 같습니다.

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

organizations는 정당이고 미국의 상원과 하원인 의회 양원입니다. organizations_json 테이블의 스키마를 보고 다음을 입력합니다.

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

출력값은 다음과 같습니다.

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

4단계: 데이터 필터링

원하는 필드만 남기고 idorg_id로 이름을 바꿉니다. 데이터셋은 전체를 볼 수 있을만큼 작습니다.

toDF()DynamicFrame를 Apache Spark DataFrame로 변환하기 때문에 Apache Spark SQL에 이미 존재한 변환을 적용할 수 있습니다.

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

다음은 출력값을 보여줍니다.

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

memberships에 나타나는 organizations를 보려면 다음을 입력합니다.

memberships.select_fields(['organization_id']).toDF().distinct().show()

다음은 출력값을 보여줍니다.

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

5단계: 데이터 조인

이제 AWS Glue를 사용하여 이런 관계형 테이블을 조인하고 제정자 memberships의 전체 기록 테이블과 제정자에 대응하는 organizations을 생성합니다.

  1. 우선, personsmembershipsidperson_id로 조인합니다.

  2. 다음으로 org_idorganization_id에서 orgs와 결과를 조인합니다.

  3. 그 후, 중복 필드, person_idorg_id을 드롭합니다.

이 모든 작업을 하나 줄의 (확장된) 코드로 실행할 수 있습니다.

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

출력값은 다음과 같습니다.

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

이제, 분석을 위한 최종 테이블이 있습니다. AWS Glue, Amazon Athena 또는 Amazon Redshift Spectrum에서 SQL을 실행할 수 있는 압축되고 효율적인 분석 형식(즉, Parquet)으로 작성할 수 있습니다.

다음 호출은 여러 파일의 테이블을 작성하여 나중에 분석을 할 때 빠른 병렬 판독을 지원합니다.

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

하나의 파일에 기록 데이터 모두를 넣으면 데이터 프레임으로 변환하고 다시 분할하며 작성할 수 있습니다.

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

상원과 하원으로 나누고자 한다면

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

6단계: 관계형 데이터베이스를 위한 데이터 변환

AWS Glue를 사용하면 반정형 데이터가 포함된, Amazon Redshift와 같은 관계형 데이터베이스에도 데이터를 쉽게 쓸 수 있습니다. 프레임의 객체가 아무리 복잡하더라도 DynamicFrames를 평면화하는 변환 relationalize를 제공합니다.

이 예제에 l_history DynamicFrame를 사용하여 루트 테이블의 이름(hist_root)과 임시 작업 경로를 relationalize로 넘깁니다. 이렇게 하면 DynamicFrameCollection가 반환됩니다. 컬렉션의 DynamicFrames의 이름을 열거할 수 있습니다.

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

다음은 이 keys 호출의 출력입니다.

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

RelationalizeDynamicFrame의 각 객체의 기록을 포함한 루트 테이블과 배열의 보조 테이블을 포함한 6 개의 새로운 테이블로 기록 테이블을 나눕니다. 관계형 데이터베이스에서 처리한 배열은 보통 차선책일 수 있습니다. 특히 배열이 커지면 그렇습니다. 배열을 다른 테이블로 나누면 쿼리 과정이 빨라집니다.

이제, contact_details를 보고 나누는 과정에 대해 알아봅니다.

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

다음은 이 show 호출의 출력입니다.

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

contact_details 필드는 기존 DynamicFrame의 구조 배열이었습니다. 이 배열에 있는 각 요소가 index에 따라 보조 테이블의 별도 행입니다. 여기서 idcontact_details 키가 있는 hist_root 테이블의 외래 키입니다.

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

출력값은 다음과 같습니다.

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

이 명령어에는 toDF()where 순으로 선택하여 보고자 하는 열을 필터링합니다.

따라서 보조 테이블과 함께 hist_root을 조인하여 다음과 같은 작업을 수행할 수 있습니다.

  • 배열 지원 없이 데이터베이스로 데이터를 로드합니다.

  • SQL을 사용하여 배열에서 개별 항목을 쿼리합니다.

AWS Glue 연결을 사용하여 Amazon Redshift 자격 증명을 안전하게 저장하고 액세스합니다. 자체 연결을 만드는 방법에 대한 자세한 내용은 AWS Glue Data Catalog에서 연결 정의을 참조하십시오.

이제 DynamicFrames를 차례로 순환하여 데이터를 연결에 쓸 준비가 되었습니다.

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

연결 설정은 관계형 데이터베이스 유형에 따라 달라집니다.

결론

전체적으로 AWS Glue는 유연성이 뛰어납니다. 보통 며칠을 걸려 작성해야 가능한 이 작업을 단 몇 줄의 코드로 얻을 수 있습니다. GitHub의 AWS Glue 샘플에 있는 Python 파일 join_and_relationalize.py에서 전체 소스-대상 ETL 스크립트를 찾을 수 있습니다.