DynamicFrame 수업 - AWS Glue

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

DynamicFrame 수업

Apache Spark의 주요 추상화 중 하나는 SparkSQL DataFrame이며, R 및 Pandas에서 찾아볼 수 있는 DataFrame 구문과 유사합니다. DataFrame은 테이블과 비슷한 기능적 스타일(맵/줄임/필터 등) 작업과 SQL 작업(선택, 계획, 집계)을 지원합니다.

DataFrames 는 광범위하게 사용되는 유용한 것이지만 추출, 변환, 로드(ETL) 작업 시 제한이 있습니다. 무엇보다도 데이터가 로딩되기 전에 스키마를 명시해야 합니다. 첫 번째는 스키마를 추론하고 두 번째는 데이터를 로드하도록 두 개를 데이터에 통과시켜 SparkSQL이 이를 해결합니다. 하지만 이러한 추론은 제한적이며 복잡한 데이터의 현실을 해결해주지 않습니다. 예를 들어, 동일한 필드는 다른 기록에서 다른 유형을 가져야 합니다. 아파치 스파크는 간혹 실행을 포기하고 기존 필드 텍스트를 사용하여 string으로써 유형을 보고합니다. 이는 올바르지 않을 수 있고 스키마 차이를 해결할 수 있는 좀 더 확실한 관리법을 알고 싶을 겁니다. 라지 데이터세트의 경우, 추가로 소스 데이터를 통과하는 것이 매우 비쌀 수 있습니다.

이러한 한계를 해결하기 위해 AWS Glue는 다음을 DynamicFrame 도입했습니다. DynamicFrame는 애초에 스키마가 필요없이 각 기록이 자기 설명적인 것을 제외하고는 DataFrame와 비슷합니다. 대신 필요한 on-the-fly 경우 스키마를 AWS Glue 계산하고 선택 (또는 통합) 유형을 사용하여 스키마 불일치를 명시적으로 인코딩합니다. 이런 불일치를 해결하여 데이터세트와 고정 스키마가 필요한 데이터 스토어를 호환하게 만들 수 있습니다.

비슷하게, DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 스파크 DataFrame의 열과 같습니다. AWS Glue with PySpark 를 사용할 때는 일반적으로 독립적으로 DynamicRecords 조작하지 않습니다. 대신, 해당 DynamicFrame을 통해 데이터 세트를 함께 변환합니다.

모든 스키마 불일치를 해결한 후 DynamicFrames로 그리고 DataFrames에서 변환할 수 있습니다.

 - 생성 -

__init__

__init__(jdf, glue_ctx, name)
  • jdf - Java Virtual Machine(JVM)의 데이터 프레임 참조입니다.

  • glue_ctxGlueContext 클래스 객체입니다.

  • name - 기본값이 빈 선택적 이름 문자열입니다.

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame 필드를 DynamicRecord 필드로 변환하여 DataFrameDynamicFrame로 변환합니다. 새로운 DynamicFrame을 반환합니다.

DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 스파크 DataFrame의 열과 비슷합니다.

이 함수는 DataFrame에서 이름이 중복된 열이 이미 해결된 것으로 예상합니다.

  • dataframe - Apache Spark SQL DataFrame으로 변환합니다(필수).

  • glue_ctx - 이 변환의 맥락을 명시하는 GlueContext 클래스 객체입니다(필수).

  • name— 결과 이름 DynamicFrame ( AWS Glue 3.0부터 선택 사항)

toDF

toDF(options)

DynamicRecordsDataFrame 필드로 변환하여 DynamicFrame을 Apache Spark DataFrame으로 변환합니다. 새로운 DataFrame을 반환합니다.

DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 스파크 DataFrame의 열과 비슷합니다.

  • options - 옵션의 목록입니다. ProjectCast 작업 유형을 선택한 경우 대상 유형을 지정합니다. 예는 다음과 같습니다.

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 - 정보 -

count

count( ) - 기본 DataFrame의 행 수를 반환합니다.

schema

schema( ) – 이 DynamicFrame의 스키마를 반환하거나, 가능하지 않는 경우 기본 DataFrame의 스키마를 반환합니다.

이 스키마를 구성하는 DynamicFrame 유형에 대한 자세한 내용은 PySpark 확장 유형 섹션을 참조하세요.

printSchema

printSchema( ) - 기본 DataFrame의 스키마를 인쇄합니다.

show

show(num_rows) - 기본 DataFrame으로부터 지정된 수의 행을 인쇄합니다.

repartition

repartition(numPartitions)numPartitions 파티션이 있는 새 DynamicFrame을 반환합니다.

coalesce

coalesce(numPartitions)numPartitions 파티션이 있는 새 DynamicFrame을 반환합니다.

 - 변형 -

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame으로 매핑을 선언하고 지정한 필드에 해당 매핑이 적용된 새로운 DynamicFrame을 반환합니다. 지정되지 않은 필드는 새 DynamicFrame 필드에서 생략됩니다.

  • mappings - 매핑 튜플 목록(필수). 각각 (소스 열, 소스 유형, 대상 열, 대상 유형)으로 구성된 매핑 튜플 목록입니다.

    소스 열 이름에 점('.')이 있는 경우 백틱('``')으로 묶어야 합니다. 예를 들어 this.old.name(문자열)을 thisNewName에 매핑하려면 다음 튜플을 사용합니다.

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: apply_maping을 사용하여 필드 이름을 바꾸고 필드 유형을 변경합니다.

다음 코드 예제에서는 apply_mapping 메서드를 사용하여 선택한 필드의 이름을 바꾸고 필드 유형을 변경하는 방법을 보여 줍니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예: 데이터 조인 및 관계화 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

FlatMap 클래스 변환을 불러들여 DynamicFrame에서 필드를 제거합니다. 특정 필드가 드롭된 새로운 DynamicFrame을 반환합니다.

  • paths – 문자열 목록입니다. 각각에는 드롭할 필드 노드에 대한 전체 경로가 포함되어 있습니다. 점 표기를 사용하여 중첩된 필드를 지정할 수 있습니다. 예를 들어 first 필드가 트리에서 name 필드의 자식 필드인 경우 해당 경로에 "name.first"를 지정합니다.

    필드 노드 이름에 . 리터럴이 있는 경우 이름을 백틱(`)으로 묶어야 합니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: drop_fields를 사용하여 DynamicFrame에서 필드를 제거합니다.

이 코드 예제에서는 drop_fields 메서드를 사용하여 DynamicFrame에서 선택한 최상위 및 중첩 필드를 제거합니다.

예제 데이터 세트

이 예제에서는 코드의 EXAMPLE-FRIENDS-DATA 테이블로 표시되는 다음 데이터 세트를 사용합니다.

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

예제 코드

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

필터

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

지정된 술어 함수 f를 만족하는 DynamicFrame이 입력된 DynamicRecords에 모두 포함된 새로운 DynamicFrame을 반환합니다.

  • f - DynamicFrame에 적용하는 조건자 함수입니다. 함수는 DynamicRecord를 논리로 받아들이며, DynamicRecord가 필터 요구 사항과 맞으면 True을 반환하고 아니면 False를 반환합니다(필수).

    DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 스파크 DataFrame의 열과 비슷합니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: 필터를 사용하여 필터링된 필드 선택 가져오기

이 예에서는 filter 메서드를 사용하여 다른 사람의 DynamicFrame 필드에 대한 필터링된 선택을 포함하는 새 DynamicFrame 항목을 만듭니다.

map 메서드와 같이 filter는 함수를 인수로 취하여 원본 DynamicFrame의 각 레코드에 적용됩니다. 이 함수는 레코드를 입력으로 받아 부울 값을 반환합니다. 반환 값이 true이면 레코드가 결과 DynamicFrame에 포함됩니다. 거짓이면 레코드가 제외됩니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예제: ResolveChoice, Lambda 및 를 사용한 데이터 준비 ApplyMapping 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

다른 DynamicFrame로 균등 연결을 실행하고 결과인 DynamicFrame을 반환합니다.

  • paths1 - 조인할 이 프레임의 키 목록입니다.

  • paths2 - 조인할 다른 프레임의 키 목록입니다.

  • frame2 - 조인할 다른 DynamicFrame입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: 조인을 사용하여 DynamicFrames 결합

이 예제에서는 join 메서드를 사용하여 3개를 DynamicFrames 조인합니다. AWS Glue는 사용자가 제공한 필드 키를 기반으로 조인을 수행합니다. 결과 DynamicFrame에는 지정된 키가 일치하는 두 원본 프레임의 행이 포함됩니다.

join 변환은 모든 필드를 그대로 유지한다는 점에 유의하세요. 즉 DynamicFrame, 일치하도록 지정한 필드가 중복되고 동일한 키를 포함하는 경우에도 결과에 표시됩니다. 이 예에서는 drop_fields를 사용하여 조인 후 이러한 중복 키를 제거합니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예: 데이터 조인 및 관계화 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

특정 매핑 함수를 기존 DynamicFrame의 모든 기록에 적용한 결과인 새로운 DynamicFrame을 반환합니다.

  • f - DynamicFrame의 모든 레코드에 적용하는 매핑 함수입니다. 함수는 DynamicRecord를 인수로 받아들이고 새로운 DynamicRecord를 반환합니다(필수).

    DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 Apache Spark DataFrame의 열과 비슷합니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 변환에 따른 오류 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생하기 전까지 변환에 따라 생길 수 있는 최대 오류 수입니다(선택 사항). 기본값은 0입니다.

  • totalThreshold - 오류가 진행되기 전까지 생길 수 있는 최대 전체 오류 수입니다(선택 사항). 기본값은 0입니다.

예: map을 사용하여 DynamicFrame의 모든 레코드에 함수를 적용합니다.

이 예제에서는 map 메서드를 사용하여 DynamicFrame의 모든 레코드에 기능을 적용하는 방법을 보여 줍니다. 특히 이 예에서는 여러 주소 필드를 단일 struct 유형으로 병합하기 위해 각 레코드에 MergeAddress라는 함수를 적용합니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예제: ResolveChoice, Lambda 및 를 사용한 데이터 준비 ApplyMapping 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

레코드를 식별하기 위해, 지정된 기본 키를 기반으로 이 DynamicFrame을 스테이징 DynamicFrame과 병합합니다. 중복 레코드(기본 키가 동일한 레코드)는 중복 제거되지 않습니다. 스테이징 프레임에 일치하는 레코드가 없으면 모든 레코드(중복 레코드 포함)가 소스에서 유지됩니다. 스테이징 프레임에 일치하는 레코드가 있으면 스테이징 프레임의 레코드가 AWS Glue의 소스에 있는 레코드를 덮어씁니다.

  • stage_dynamic_frame - 병합할 스테이징 DynamicFrame입니다.

  • primary_keys - 소스 및 스테이징 동적 프레임의 레코드와 일치시킬 기본 키 필드 목록입니다.

  • transformation_ctx - 현재 변환에 대한 메타데이터를 검색하는 데 사용되는 고유한 문자열입니다(선택 사항).

  • options - 이 변환에 대한 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. 이 인수는 현재 사용되지 않습니다.

  • info - String입니다. 이 변환에 따른 오류와 관련된 문자열입니다.

  • stageThreshold - Long입니다. 오류가 발생되는 지정된 변환의 오류 수입니다.

  • totalThreshold - Long입니다. 오류가 발생되는 이 변환의 총 오류 수입니다.

이 메소드는 이 DynamicFrame을 스테이징 DynamicFrame과 병합하여 얻은 새 DynamicFrame을 반환합니다.

다음과 같은 경우 반환된 DynamicFrame에 레코드 A가 포함되어 있습니다.

  • A가 소스 프레임과 스테이징 프레임 모두에 있는 경우에는 스테이징 프레임의 A가 반환됩니다.

  • A가 소스 테이블에 있고 A.primaryKeysstagingDynamicFrame에 없는 경우에는 A는 스테이징 테이블에서 업데이트되지 않습니다.

소스 프레임과 스테이징 프레임이 동일한 스키마를 가질 필요는 없습니다.

예: 기본 키를 DynamicFrames 기반으로 두 개를 병합하는 mergeDynamicFrame 데 사용

다음 코드 예제는 기본 키 id를 기반으로 mergeDynamicFrame 메서드를 사용하여 DynamicFrame를 “스테이징” DynamicFrame과 병합하는 방법을 보여줍니다.

예제 데이터 세트

이 예제에서는 split_rows_collection로 호출된 DynamicFrameCollection에서 두 개의 DynamicFrames을 사용합니다. 다음은 split_rows_collection에서의 키 목록입니다.

dict_keys(['high', 'low'])

예제 코드

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame을 관계형 데이터베이스에 맞는 형식으로 변환합니다. DynamicFrame을 관계형으로 만들면 DynamoDB와 같은 NoSQL 환경에서 MySQL과 같은 관계형 데이터베이스로 데이터를 이동하려는 경우에 특히 유용합니다.

변환은 중첩된 열의 중첩을 해제하고 배열 열을 회전하여 프레임 목록을 생성합니다. 회전 배열 열은 중첩되지 않는 상태에서 생성된 조인 키를 사용하여 루트 테이블에 연결될 수 있습니다.

  • root_table_name - 루트 테이블의 스키마 이름입니다.

  • staging_path - 메소드가 CSV 포맷으로 회전 테이블의 파티션을 저장하는 경로입니다(선택 사항). 회전 테이블은 이 경로부터 다시 읽습니다.

  • options - 선택적 파라미터의 딕셔너리입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: 관계형 만들기를 사용하여 DynamicFrame 내 중첩된 스키마를 플랫하게 만들기

이 코드 예제는 relationalize 메서드를 사용하여 중첩된 스키마를 관계형 데이터베이스에 맞는 형식으로 평면화합니다.

예제 데이터 세트

이 예제에서는 다음 스키마와 함께 legislators_combined를 호출한 DynamicFrame을 사용합니다. legislators_combinedlinks, images, 및 contact_details와 같은 여러 개의 중첩 필드를 가지고, relationalize 변환에 의해 평면화됩니다.

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

예제 코드

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

다음 출력을 통해 contact_details을 호출한 중첩 필드의 스키마를 relationalize 변환이 만든 테이블과 비교할 수 있습니다. 테이블 레코드는 id라는 외래 키와 배열의 위치를 나타내는 index 열을 사용하여 기본 테이블로 다시 연결됩니다.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame에서 필드 이름을 바꾸고 바꾼 필드로 새로운 DynamicFrame을 반환합니다.

  • oldName - 바꾸고자 하는 노드의 전체 경로입니다.

    기존 이름에 점이 있으면 그 주위로 억음 부호(`)를 하지 않는 한 RenameField는 실행되지 않습니다. 예를 들어 this.old.namethisNewName으로 바꾸려면 다음과 같이 rename_field를 불러올 수 있습니다.

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName - 전체 경로로써 새로운 이름입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: rename_field를 사용하여 DynamicFrame의 필드 이름을 변경합니다

이 코드 예제는 rename_field 메서드를 사용하여 DynamicFrame의 필드 이름을 변경합니다. 참고로 이 예제에서는 메서드 체인을 사용하여 여러 필드의 이름을 동시에 바꾸고 있습니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예: 데이터 조인 및 관계화 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

예제 코드

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

DynamicFrame로 선택 유형을 해결하고 새로운 DynamicFrame을 반환합니다.

  • specs - 해결할 특정 모호성 목록이며, 각각은 (field_path, action)의 튜플 형식입니다.

    resolveChoice를 사용하는 방법은 두 가지입니다. 첫 번째는 specs 인수를 사용하여 특정 필드의 시퀀스와 이를 확인하는 방법을 지정하는 것입니다. resolveChoice의 다른 모드는 choice 인수를 사용하여 모든 ChoiceTypes에 대해 단 하나의 해결책을 지정합니다.

    specs의 값은 (field_path, action) 페어로 구성된 튜플로 지정됩니다. field_path 가치는 특정 모호한 요소를 확인하고 action 가치는 관련 해결 방안을 제안합니다. 가능한 작업은 다음과 같습니다.

    • cast:type - 모든 값을 지정된 유형으로 캐스트하는 시도를 합니다. 예를 들면 cast:int입니다.

    • make_cols - 각 고유 유형을 columnName_type이라는 이름을 가진 열로 변환합니다. 데이터를 평면화하여 잠재적 모호성을 해결합니다. 예를 들어, columnAint 혹은 string이면 해결 방안은 DynamicFrame에서 columnA_intcolumnA_string으로 된 두 열을 생산하는 것입니다.

    • make_struct – 데이터를 나타내도록 struct를 사용하여 잠재적 모호성을 해결합니다. 예를 들어, 열에서 데이터가 int 혹은 string이면 make_struct 작업은 결과적인 DynamicFrame의 구조 열을 생성합니다. 각 구조에는 intstring가 모두 포함됩니다.

    • project:type - 모든 데이터를 가능한 데이터 유형 중 하나로 투영하여 잠재적인 모호성을 해결합니다. 예를 들어, 열에서 데이터가 int 혹은 string이면 project:string 작업을 사용하면 모든 int 가치가 문자열로 변환된 결과인 DynamicFrame의 열을 생성합니다.

    만약 field_path가 배열을 확인하면, 빈 대괄호를 배열 이름 다음에 만들어 모호성을 피합니다. 예를 들어, 다음과 같은 구조화된 데이터와 작업한다고 가정합시다.

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    field_path"myList[].price"로 설정하고, action"cast:double"로 설정하여 가격의 문자열 버전보다 숫자 버전을 선택할 수 있습니다.

    참고

    specschoice 파라미터 중 하나만 사용할 수 있습니다. specs 파라미터가 None이 아니면 choice 파라미터는 빈 문자열이어야 합니다. 반대로 choice 파라미터가 빈 문자열이 아니면, specs 파라미터는 None이어야 합니다.

  • choice- 모든 ChoiceTypes에 대해 단일 해상도를 지정합니다. 실행 시간 전에 ChoiceTypes의 완전한 목록을 모르는 경우에 사용할 수 있습니다. 이 인수는 specs에 대해 앞에서 나열한 작업 외에 다음 작업을 지원합니다.

    • match_catalog – 각 ChoiceType을 지정된 Data Catalog 테이블의 해당 유형에 캐스팅해봅니다.

  • database - match_catalog 작업에 사용할 Data Catalog 데이터베이스입니다.

  • table_name - match_catalog 작업에 사용할 Data Catalog 테이블입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 오류가 발생되는 변환을 포함하여 최대 발생하는 오류 수입니다(선택). 기본 값은 0이고 이는 프로세스에서 오류가 발생되지 않음을 나타냅니다.

  • catalog_id - 액세스 중인 Data Catalog의 카탈로그 ID(Data Catalog의 계정 ID)입니다. None(기본값)으로 설정하면 호출 계정의 카탈로그 ID를 사용합니다.

예: resolveChoice를 사용하여 여러 유형이 포함된 열을 처리합니다

이 코드 예제는 resolveChoice 메서드를 사용하여 여러 유형의 값이 포함된 DynamicFrame 열을 처리하는 방법을 지정합니다. 이 예제는 유형이 다른 열을 처리하는 두 가지 일반적인 방법을 보여줍니다.

  • 열을 단일 데이터 유형으로 보내버립니다.

  • 모든 유형을 별도의 열에 보관합니다.

예제 데이터 세트

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예제: ResolveChoice, Lambda 및 를 사용한 데이터 준비 ApplyMapping 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

이 예제에서는 다음 스키마와 함께 medicare를 호출한 DynamicFrame을 사용합니다.

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

예제 코드

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

선택된 필드를 포함한 새로운 DynamicFrame을 반환합니다.

  • paths – 문자열 목록입니다. 각 문자열은 선택하려는 최상위 노드에 대한 경로입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예: select_fields를 사용하여 선택한 필드로 새 DynamicFrame 생성

다음 코드 예제에서는 select_fields 메서드를 사용하여 기존 DynamicFrame에서 선택한 필드 목록으로 새 DynamicFrame을 만드는 방법을 보여줍니다.

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예: 데이터 조인 및 관계화 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

특히 DynamoDB JSON 구조에 DynamicFrame 있는 a의 중첩된 열을 단순화하고 새로 단순화된 열을 반환합니다. DynamicFrame 목록 유형에 여러 유형이나 맵 유형이 있는 경우 목록의 요소가 단순화되지 않습니다. 참고로 이는 일반 unnest 변환과 다르게 동작하는 특정 유형의 변환이며 데이터가 이미 DynamoDB JSON 구조에 있어야 합니다. 자세한 내용은 DynamoDB JSON을 참조하세요.

예를 들어, DynamoDB JSON 구조를 사용하여 내보내기를 읽는 스키마는 다음과 같을 수 있습니다.

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplify_ddb_json() 변환은 이 구조를 다음과 같이 변환합니다.

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

예: simplify_ddb_json을 사용하여 DynamoDB JSON 단순화 호출

이 코드 예제에서는 simplify_ddb_json 메서드를 사용하여 AWS Glue DynamoDB 내보내기 커넥터를 사용하고, DynamoDB JSON을 호출하고, 단순화하고, 파티션 수를 출력합니다.

예제 코드

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

작업에 의해 수행된 변환을 확인하는 데 도움이 되도록 샘플 레코드를 지정된 대상에 기록합니다.

  • path - 작성할 기본 대상 주소 경로입니다(필수).

  • options - 옵션을 지정하는 키-값 페어입니다(선택 사항). "topk" 옵션은 첫 번째 k 기록이 작성되어야 한다는 것을 명시합니다. "prob" 옵션은 특정 레코드를 선택할 가능성(십진수)을 지정합니다. 작성할 레코드를 선택할 때 사용할 수 있습니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

예: spigot을 사용하여 DynamicFrame에서 Amazon S3에 샘플 필드를 작성합니다

이 코드 예제는 select_fields 변환을 적용한 후 spigot 메서드를 사용하여 Amazon S3 버킷에 샘플 레코드를 작성합니다.

예제 데이터 세트

참고

이 예제에서 사용되는 데이터 세트에 액세스하려면 코드 예: 데이터 조인 및 관계화 항목을 참조하고 1단계: Amazon S3 버킷에서 데이터 크롤의 지침을 따르세요.

이 예제에서는 다음 스키마와 함께 persons를 호출한 DynamicFrame을 사용합니다.

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

예제 코드

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

다음은 spigot이 Amazon S3에 작성한 데이터 예제입니다. 예제 코드가 options={"topk": 10}를 지정하였으므로 샘플 데이터에는 처음 10개의 레코드가 포함됩니다.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

두 개의 DynamicFrames이 포함된 새 DynamicFrameCollection을 반환합니다. 첫 번째 DynamicFrame은 분할된 모든 노드를 포함하고 두 번째는 남겨진 노드를 포함합니다.

  • paths - 문자열 목록이며, 각 문자열 목록은 새로운 DynamicFrame으로 스플릿하려는 노드의 전체 경로입니다.

  • name1 - 스플릿된 DynamicFrame의 이름 문자열입니다.

  • name2 - 스플릿된 특정 노드 이후에 남겨진 DynamicFrame을 위한 이름 문자열입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예제: split_fields를 사용하여 선택한 필드를 별도의 DynamicFrame로 분할합니다

이 코드 예제는 split_fields 메서드를 사용하여 지정된 필드 목록을 별도의 DynamicFrame으로 분할합니다.

예제 데이터 세트

이 예제에서는 legislators_relationalized라는 컬렉션의 l_root_contact_details를 호출한 DynamicFrame을 사용합니다.

l_root_contact_details에는 다음 스키마와 엔트리가 포함됩니다.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

예제 코드

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

새로운 DynamicFrame으로 DynamicFrame의 하나 이상의 열을 스플릿합니다.

이 메서드는 2개의 DynamicFrames가 포함된 새 DynamicFrameCollection을 반환합니다. 첫 번째 DynamicFrame은 스플릿된 모든 행을 포함하고 두 번째는 남겨진 열을 포함합니다.

  • comparison_dict - 열까지 경로의 키와 열 값이 비교된 값의 매핑 비교기를 위한 다른 딕셔너리인 값의 딕셔너리입니다. 예를 들어, {"age": {">": 10, "<": 20}}는 나이 열에서 가치가 10 초과 20 미만인 모든 열을 스플릿합니다.

  • name1 - 스플릿된 DynamicFrame의 이름 문자열입니다.

  • name2 - 스플릿된 특정 노드 이후에 남겨진 DynamicFrame을 위한 이름 문자열입니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예제: split_rows를 사용하여 DynamicFrame 행을 분할합니다

이 코드 예제는 split_rows 메서드를 사용하여 id 필드 값을 기준으로 DynamicFrame에서 행을 분할합니다.

예제 데이터 세트

이 예제에서는 legislators_relationalized라는 컬렉션에서 선택한 l_root_contact_details를 호출한 DynamicFrame을 사용합니다.

l_root_contact_details에는 다음 스키마와 엔트리가 포함됩니다.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

예제 코드

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame의 문자열 필드를 열고(포맷 재지정) 개봉된 DynamicRecords를 포함한 새로운 DynamicFrame를 반환합니다.

DynamicRecordDynamicFrame내 논리적 기록입니다. 자기 설명적이고 고정 스키마를 준수하지 않은 데이터에 사용될 수 있다는 점을 제외하면 Apache Spark DataFrame의 열과 비슷합니다.

  • path - 열고자 하는 문자열 노드의 전체 경로입니다.

  • format - 포맷 사양입니다(선택 사항). 여러 포맷을 지원하는 Amazon S3 또는 AWS Glue 연결에 사용할 수 있습니다. 지원되는 포맷은 AWS Glue for Spark에서 입력 및 출력의 데이터 형식 옵션를 참조하세요.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • options - 다음 중 한 개 이상을 수행할 수 있습니다.

    • separator - 구분자 문자를 포함한 문자열입니다.

    • escaper - 에스케이프 문자를 포함한 문자열입니다.

    • skipFirst - 첫 번째 인스턴스를 넘어갈지 여부를 나타내는 부울 값입니다.

    • withSchema - 노드 스키마의 JSON 표현을 포함하는 문자열입니다. 스키마의 JSON 표현 형식은 StructType.json()의 출력에 의해 정의됩니다.

    • withHeader - 헤더가 포함되었는지 여부를 나타내는 부울 값입니다.

예제: 개봉하기를 사용하여 문자열 필드를 구조에 개봉하기

이 코드 예제는 unbox 메서드를 사용하여 DynamicFrame의 문자열 필드를 유형 구조 필드로 개봉하거나 포맷을 다시 지정합니다.

예제 데이터 세트

이 예제에서는 다음과 같은 스키마 및 항목과 함께 mapped_with_string를 호출한 DynamicFrame을 사용합니다.

AddressString로 이름이 지정된 필드를 확인하십시오. 이 필드는 예제에서 구조로 개봉되는 필드입니다.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

예제 코드

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

결합

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

DynamicFrames유니온 투. 두 입력의 모든 레코드를 DynamicFrame 포함하는 DynamicFrames 리턴입니다. 이 변환은 동일한 데이터를 DataFrames 가진 두 개의 합집합에서 다른 결과를 반환할 수 있습니다. Spark DataFrame 유니온 동작이 필요한 경우 사용을 toDF 고려해 보십시오.

  • frame1— 처음으로 DynamicFrame 유니온을 시작했어요.

  • frame2— DynamicFrame 유니온에 이어 두 번째입니다.

  • transformation_ctx – (선택 사항) 통계/상태 정보를 확인하는 데 사용되는 고유 문자열

  • info - (선택 사항) 변환에 따른 오류 관련 문자열

  • stageThreshold— (선택 사항) 처리 오류가 발생할 때까지 변환에서 발생한 최대 오류 수

  • totalThreshold— (선택 사항) 처리 오류가 발생할 때까지의 총 최대 오류 수.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame에서 중첩된 객체를 중첩시키지 않고 상위 객체로 만들어서 새로운 중첩되지 않은 DynamicFrame을 반환합니다.

  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

  • totalThreshold - 프로세스에서 오류가 발생해야 하는 이 변환을 포함하여 최대 발생한 오류 수입니다(선택 사항). 기본값은 0이며, 이는 프로세스에 오류가 발생하지 않아야 함을 나타냅니다.

예제: 중첩 해제를 사용하여 중첩된 필드를 최상위 필드로 전환합니다

이 코드 예제는 unnest 메서드를 사용하여 DynamicFrame의 모든 중첩된 필드를 최상위 필드로 병합합니다.

예제 데이터 세트

이 예제에서는 다음 스키마와 함께 mapped_medicare를 호출하는 DynamicFrame을 사용합니다. Address 필드는 중첩된 데이터를 포함하는 유일한 필드라는 점에 유의하십시오.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

예제 코드

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

DynamoDB JSON 구조의 DynamicFrame에 있는 중첩된 열을 한정해서 중첩 해제하고, 중첩되지 않은 새 DynamicFrame을 반환합니다. 구조체 유형 배열인 열은 중첩 해제되지 않습니다. 이 변환은 일반적인 unnest 변환과는 다르게 작동하는 특정 유형의 중첩 해제 변환이며, 데이터가 이미 DynamoDB JSON 구조에 있어야 합니다. 자세한 내용은 DynamoDB JSON을 참조하세요.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx - 고유 문자열을 통해 상태 정보를 확인합니다(선택 사항).

  • info - 이 변환에 따른 오류 보고 관련 문자열입니다(선택 사항).

  • stageThreshold - 오류가 발생되는 변환 중에 발생하는 오류 수 (선택: 오류가 발생되지 않은 변환을 나타내는 기본값은 0입니다).

  • totalThreshold - 오류가 발생되는 변환을 포함하여 최대 발생하는 오류 수 (선택: 오류가 발생되지 않은 변환을 나타내는 기본값은 0입니다).

예를 들어, DynamoDB JSON 구조를 사용하여 내보내기를 읽는 스키마는 다음과 같을 수 있습니다.

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 변환은 이 구조를 다음과 같이 변환합니다.

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

다음 코드 예제는 AWS Glue DynamoDB 내보내기 커넥터를 사용하고, DynamoDB JSON 언네스트를 호출하고, 파티션 수를 출력하는 방법을 보여줍니다.

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

DynamicFrameGlueContext 클래스에서 지정된 연결 유형의 DataSink(객체)를 얻어 DynamicFrame 내용을 포맷하고 작성하는 데 사용합니다. 명시된 대로 포맷되고 작성된 새로운 DynamicFrame을 반환합니다.

  • connection_type - 사용할 연결 유형입니다. 유효한 값에는 s3, mysql, postgresql, redshift, sqlserveroracle가 있습니다.

  • connection_options - 사용할 연결 옵션입니다(선택 사항). s3connection_type의 경우, Amazon S3 경로가 정의됩니다.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 연결의 경우, 몇 까지 속성이 정의되어야 합니다. 단, 데이터베이스 이름이 URL의 일부여야 합니다. 연결 옵션에 선택적으로 포함될 수 있습니다.

    주의

    스크립트에 암호를 저장하는 것은 권장되지 않습니다. AWS Secrets Manager 또는 AWS Glue 데이터 카탈로그에서 데이터를 검색하는 boto3 데 사용하는 것을 고려해 보십시오.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format - 포맷 사양입니다(선택 사항). 여러 포맷을 지원하는 Amazon Simple Storage Service(Amazon S3) 또는 AWS Glue 연결에 사용됩니다. 지원되는 포맷은 AWS Glue for Spark에서 입력 및 출력의 데이터 형식 옵션를 참조하십시오.

  • format_options - 지정된 포맷에 대한 포맷 옵션입니다. 지원되는 포맷은 AWS Glue for Spark에서 입력 및 출력의 데이터 형식 옵션를 참조하십시오.

  • accumulator_size - (선택 사항) 사용할 누적 가능한 크기(바이트)입니다.

 - 오류 -

assertErrorThreshold

assertErrorThreshold( ) - 이 DynamicFrame을 생성한 변환의 오류에 대한 어설션입니다. 기본 DataFrame으로부터 Exception을 반환합니다.

errorsAsDynamic프레임

errorsAsDynamicFrame( ) - 내부에 중첩된 오류 기록을 보유한 DynamicFrame을 반환합니다.

예: errorsAsDynamic Frame을 사용하여 오류 기록 보기

다음 코드 예제에서는 errorsAsDynamicFrame 메서드를 사용하여 DynamicFrame에 대한 오류 레코드를 보는 방법을 보여 줍니다.

예제 데이터 세트

이 예제에서는 Amazon S3에 JSON으로 업로드할 수 있는 다음 데이터 세트를 사용합니다. 두 번째 레코드의 형식이 잘못되었습니다. 잘못된 형식의 데이터는 일반적으로 SparkSQL을 사용할 때 파일 구문 분석을 중단합니다. 그러나 DynamicFrame은 잘못된 형식 문제를 인식하고 잘못된 형식 행을 개별적으로 처리할 수 있는 오류 레코드로 바꿉니다.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

예제 코드

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) - DynamicFrame의 전체 오류 수를 반환합니다.

stageErrorsCount

stageErrorsCount - 이 DynamicFrame을 생성하는 과정에서 발생한 오류 수를 반환합니다.