DynamicFrame クラス - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

DynamicFrame クラス

Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、ロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。この 1 つ目はスキーマの推定を行い、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。例えば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用して型を string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かくコントロールする必要があるかもしれません。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue では DynamicFrame を導入しています。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecordDynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。PySpark で AWS Glue を使用する場合、通常は独立した操作 DynamicRecords は行いません。むしろ、データセットをその DynamicFrame を使ってまとめて変換します。

スキーマの不一致を解決したら、DynamicFramesDataFrames との間で変換することができます。

 — construction —

__init__

__init__(jdf, glue_ctx, name)
  • jdf - Java 仮想マシン (JVM) 内のデータフレームへの参照。

  • glue_ctxGlueContext クラス オブジェクト。

  • name - オプションの名前文字列。デフォルトでは空。

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame フィールドを DynamicRecord に変換することにより、DataFrameDynamicFrame に変換します。新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

この関数は、DataFrame で名前と重複する列がすでに解決されていることを前提としています。

  • dataframe - 変換する Apache Spark SQL DataFrame (必須)。

  • glue_ctx - この変換のコンテキストを指定する GlueContext クラス オブジェクト (必須)。

  • name - 結果 DynamicFrame の名前 (AWS Glue 3.0 以降はオプション)。

toDF

toDF(options)

DynamicRecordsDataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • options - オプションのリスト。ProjectCast アクションタイプを選択した場合、ターゲットのタイプを指定します。次に例を示します。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — information —

count

count( ) - 基盤となる DataFrame の行数を返します。

スキーマ

schema( ) - この DynamicFrame のスキーマを返します。使用できない場合は、基盤となる DataFrame のスキーマを返します。

このスキーマを構成する DynamicFrame タイプの詳細については、「PySpark 拡張子型」を参照してください。

printSchema

printSchema( ) - 基盤となる DataFrame のスキーマを表示します。

show

show(num_rows) - 基盤となる DataFrame から、指定された行数を表示します。

repartition

repartition(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

coalesce

coalesce(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

 — transforms —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame に宣言型のマッピングを適用し、それらのマッピングが適用された新しい DynamicFrame を指定したフィールドに返します。未指定のフィールドは新しい DynamicFrame から除外されます。

  • mappings — マッピングタプルのリスト (必須)。それぞれが (ソース列、ソースタイプ、ターゲット列、ターゲットタイプ) で構成されます。

    ソース列で、名前にドット「.」が含まれている場合、バックティック「``」で囲む必要があります。例えば、this.old.name (文字列) を thisNewName にマッピングするには、次のタプルを使用します。

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: apply_mapping を使用してフィールドの名前を変更し、フィールドタイプを変更する

次のコード例は、apply_mapping メソッドを使用して、選択したフィールドの名前を変更し、フィールドタイプを変更する方法を示しています。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

FlatMap クラス 変換を呼び出して、DynamicFrame からフィールドを削除します。指定されたフィールドが削除された新しい DynamicFrame を返します。

  • paths - 文字列のリスト。それぞれに、ドロップするフィールドノードへのフルパスが含まれます。ドット表記を使用して、ネストされたフィールドを指定できます。例えば、フィールド first がツリー内のフィールド name の子である場合は、パスに "name.first" を指定します。

    フィールドノードの名前にリテラル . が含まれている場合は、その名前をバックティック (`) で囲む必要があります。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: drop_fields を使用して DynamicFrame からフィールドを削除する

このコード例では、drop_fields メソッドを使用して、選択したトップレベルフィールドとネストされたフィールドを DynamicFrame から削除します。

データセットの例

この例では、コード内の EXAMPLE-FRIENDS-DATA テーブルで表される次のデータセットを使用します。

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

コードの例

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

フィルター

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定された述語関数 f を満たす入力 DynamicFrame 内のすべての DynamicRecords を含む、新しい DynamicFrame を返します。

  • f - DynamicFrame に適用する述語関数。この関数は DynamicRecord を引数として取り、DynamicRecord がフィルター要件を満たす場合は True を返し、そうでない場合は False を返します (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: フィルターを使用して、フィールドのフィルタリングされた選択内容を取得する

この例では、filter メソッドを使用して、別の DynamicFrame のフィールドのフィルタリングされた選択内容を含む新しい DynamicFrame を作成します。

map メソッドと同様に、filter は、元の DynamicFrame の各レコードに適用される引数として関数を取ります。この関数はレコードを入力として受け取り、ブール値を返します。戻り値が true の場合、レコードは結果として生じる DynamicFrame に含まれます。false の場合、レコードは除外されます。

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

別の DynamicFrame と等価結合を実行し、結果の DynamicFrame を返します。

  • paths1 - 結合するこのフレームのキーのリスト。

  • paths2 - 結合する別のフレームのキーのリスト。

  • frame2 - 結合する他の DynamicFrame

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: 結合を使用して DynamicFrames を結合する

この例では、join メソッドを使用して 3 つの DynamicFrames で結合を実行します。AWSGlue は、入力されたフィールドキーに基づいて結合を実行します。結果として生じる DynamicFrame には、指定されたキーが一致する元の 2 つのフレームからの行が含まれます。

join 変換では、すべてのフィールドがそのまま保持されることに注意してください。これは、一致するように指定したフィールドが冗長で同じキーが含まれていても、結果として生じる DynamicFrame に表示されることを意味します。この例では、drop_fields を使用して、結合後にこれらの冗長なキーを削除します。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

マップ

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定したマッピング関数を元の DynamicFrame のすべてのレコードに適用した結果の新しい DynamicFrame を返します。

  • f - DynamicFrame 内のすべてのレコードに適用されるマッピング関数。この関数は、DynamicRecord を引数として取り、新しい DynamicRecord を返す必要があります (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - 変換のエラーに関連付けられた文字列 (オプション)。

  • stageThreshold – エラーを出力する前に、変換で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。

  • totalThreshold – エラーの出力を処理する前に、全体で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。

例: マッピングを使用して、DynamicFrame のすべてのレコードに関数を適用する

この例は、map メソッドを使用して、関数を DynamicFrame のすべてのレコードに適用する方法を示しています。具体的には、この例では、複数のアドレスフィールドを単一の struct タイプに結合するため、MergeAddress という名前の関数を各レコードに適用します。

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

レコードを識別するために、この DynamicFrame を指定されたプライマリキーに基づくステージング DynamicFrame とマージします。重複レコード (同じプライマリキーを持つレコード) は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。

  • stage_dynamic_frame – マージするステージング DynamicFrame

  • primary_keys – ソースおよびステージング動的フレームからのレコードを照合する、プライマリキーフィールドのリスト。

  • transformation_ctx – 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。

  • options – この変換に関する追加情報を提供する、JSON の名前と値のペアを示す文字列。この引数は現在使用されていません。

  • infoString。この変換でのエラーに関連付けられる任意の文字列。

  • stageThresholdLong。指定された変換で処理がエラーアウトする必要があるエラーの数。

  • totalThresholdLong。この変換までに発生したエラーのうち、処理でエラーを出力する必要があるエラーの合計数。

このメソッドは、この DynamicFrame をステージング DynamicFrame とマージして取得した新しい DynamicFrame を返します。

以下の場合、返される DynamicFrame にはレコード A が含まれます。

  • A がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームの A が返されます。

  • A がソーステーブルに存在し、A.primaryKeysstagingDynamicFrame に存在しない場合、A はステージングテーブルで更新されていません。

ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。

例: mergeDynamicFrame を使用して、プライマリキーに基づいて 2 つの DynamicFrames をマージする

次のコード例は、mergeDynamicFrame メソッドを使用して、プライマリキー id に基づいて DynamicFrame を「ステージング」DynamicFrame とマージする方法を示しています。

データセットの例

この例では、split_rows_collection と呼ばれる DynamicFrameCollection からの 2 つの DynamicFrames を使用しています。次のリストに示しているのは、split_rows_collection のキーです。

dict_keys(['high', 'low'])

コードの例

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

関係付け

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame をリレーショナルデータベースに適合する形式に変換します。DynamicFrame の関係付けは、DynamoDB などの NoSQL 環境から MySQL などのリレーショナルデータベースにデータを移動する場合に特に便利です。

この変換は、ネストされた列をネスト解除し、配列の列をピボットすることでフレームのリストを生成します。フェーズのネスト解除時に生成された結合キーを使用して、ピボットされた配列の列をルートテーブルに結合できます。

  • root_table_name - ルートテーブルの名前。

  • staging_path – このメソッドがピボットテーブルのパーティションを CSV 形式で保存する保存先のパス (オプション)。ピボットされたテーブルはこのパスから読み取ります。

  • options - オプションのパラメータのディクショナリ。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: relationalize を使用して、DynamicFrame のネストされたスキーマをフラット化する

このコード例では、relationalize メソッドを使用して、ネストされたスキーマをリレーショナルデータベースに適合する形式にフラット化します。

データセットの例

この例では、次のスキーマを持つ legislators_combined と呼ばれる DynamicFrame を使用しています。legislators_combined には、relationalize 変換によってフラット化される、linksimagescontact_details などのネストされた複数のフィールドがあります。

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

コードの例

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

次の出力では、contact_details と呼ばれるネストされたフィールドのスキーマを、relationalize 変換によって作成されたテーブルと比較できます。テーブルのレコードが id と呼ばれる外部キーと配列の位置を表す index 列を使用してメインテーブルにリンクされていることに注意してください。

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

この DynamicFrame のフィールドの名前を変更し、フィールドの名前が変更された新しい DynamicFrame を返します。

  • oldName - 名前を変更するノードへのフルパス。

    古い名前にドットが含まれている場合、RenameField はバックティック (`) で囲まなければ機能しません。例えば、this.old.namethisNewName に置き換えるには、rename_field を次のように呼び出します。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName - 完全パスとしての新しい名前。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: rename_field を使用して、DynamicFrame のフィールドの名前を変更する

このコード例では、rename_field メソッドを使用して DynamicFrame のフィールドの名前を変更します。この例では、メソッドの連鎖を使用して、複数のフィールドの名前を同時に変更していることに注意してください。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

コードの例

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

この DynamicFrame 内で選択タイプを解決し、新しい DynamicFrame を返します。

  • specs – それぞれがタプルの形式である、解決すべき特定の曖昧要素のリスト: (field_path, action)

    resolveChoice を使用するには 2 つの方法があります。最初の方法では、specs 引数により、特定のフィールドのシーケンスと解決方法を指定します。resolveChoice のもう 1 つのモードでは、choice 引数を使用して、すべての ChoiceTypes に対して単一の解決策を指定します。

    specs の値は、(field_path, action) ペアで構成されたタプルとして指定されます。field_path 値は特定のあいまいな要素を識別し、action 値は対応する解決を識別します。以下のアクションを指定できます。

    • cast:type – すべての値について、指定した型へのキャストを試みます。例: cast:int

    • make_cols – それぞれの異なるタイプを columnName_type という名前の列に変換します。データをフラット化することで潜在的なあいまいさを解消します。例えば、columnAint または string の場合、解決策は、作成された DynamicFramecolumnA_int および columnA_string という名前の 2 つの列を生成することです。

    • make_structstruct を使用してデータを表現することで、潜在的なあいまいさを解決します。例えば、列のデータが int または string となる可能性がある場合、make_struct アクションを使用すると、作成された DynamicFrame に構造体の列が生成されます。各構造体には、intstring の両方が含まれています。

    • project:type – 可能なデータ型の 1 つにすべてのデータを投影することで、潜在的なあいまいさを解消します。例えば、列のデータが int または string の場合、project:string アクションを使用すると、すべての int 値が文字列に変換されている、作成された DynamicFrame に列が生成されます。

    field_path で配列を識別する場合は、あいまいさを避けるために配列名の後に空の角括弧を置きます。例えば、使用しているデータが次のように構造化されているとします。

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    field_path"myList[].price" に設定し、action"cast:double" に設定すると、文字列バージョンではなく、数値バージョンの料金を選択できます。

    注記

    specs パラメータおよび choice パラメータのうち 1 つのみを使用できます。specs パラメータが None ではない場合、choice パラメータは空の文字列である必要があります。逆に、choice が空の文字列ではない場合、specs パラメータは None である必要があります。

  • choice – すべての ChoiceTypes について、単一の解決方法を指定します。このモードは、ランタイム前に ChoiceTypes の完全なリストが不明な場合に使用できます。この引数では、上記の specs 用のアクションに加えて、以下のアクションもサポートされています。

    • match_catalog – 各 ChoiceType について、指定した Data Catalog テーブル内の対応する型へのキャストを試みます。

  • databasematch_catalog アクションで使用する Data Catalog データベース。

  • table_namematch_catalog アクションで使用する Data Catalog テーブル。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーを出力する必要があるエラーの数 (オプション)。デフォルトはゼロで、プロセスでエラーを出力しないことを示します。

  • catalog_id – 現在アクセスされている Data Catalog のカタログ ID (Data Catalog のアカウント ID)。None を設定した場合(デフォルト値) では、呼び出し元アカウントのカタログ ID が使用されます。

例: resolveChoice を使用して、複数の型を含む列を処理する

このコード例では、resolveChoice メソッドを使用して、複数の型の値を含む DynamicFrame 列の処理方法を指定します。この例は、型の異なる列を処理する一般的な 2 つの方法を示しています。

  • 列を単一のデータ型にキャストします。

  • すべての型を別々の列に保持します。

データセットの例

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

この例では、次のスキーマを持つ medicare と呼ばれる DynamicFrame を使用しています。

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

コードの例

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

選択したフィールドを含む新しい DynamicFrame を返します。

  • paths - 文字列のリスト。各文字列は、選択する最上位ノードへのパスです。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: select_fields を使用して、選択したフィールドで新しい DynamicFrame を作成する

次のコード例は、select_fields メソッドを使用して、既存の DynamicFrame から選択されたフィールドのリストを使用し、新しい DynamicFrame を作成する方法を示しています。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

特に DynamoDB JSON 構造にある DynamicFrame でネスト化された列を単純化し、新しい単純化された DynamicFrame を返します。リストタイプに複数タイプまたは Map タイプがある場合、リストの要素は単純化されません。これは通常の unnest 変換とは異なって動作する変換の特定タイプであり、データが既に DynamoDB JSON 構造に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。

例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplify_ddb_json() 変換は、以下のように変換します。

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

例: simplify_ddb_json を使用して DynamoDB JSON の単純化を呼び出す

このコード例では simplify_ddb_json メソッドを使用し、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON 単純化を呼び出してパーティションの数を表示します。

コードの例

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

スピゴット

spigot(path, options={})

ジョブで実行された変換が確認しやすくなるように、サンプルレコードを指定した送信先に書き込みます。

  • path – 書き込み先へのパス (必須)。

  • options – オプションを指定するキーと値のペア (オプション)。"topk" オプションは、最初の k レコードを書き込むことを指定します。"prob" オプションは、指定されたレコードを選択する確率(10 進数)を指定します。これを使用して、書き込むレコードを選択できます。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

例: spigot を使用して DynamicFrame のサンプルフィールドを Amazon S3 に書き込む

このコード例では、spigot メソッドを使用して、select_fields 変換を適用した後に Amazon S3 バケットにサンプルレコードを書き込みます。

データセットの例

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

この例では、次のスキーマを持つ persons と呼ばれる DynamicFrame を使用しています。

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

コードの例

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

spigot で Amazon S3 に書き込むデータの例を次に示します。サンプルコードで options={"topk": 10} を指定したため、サンプルデータには最初の 10 個のレコードが含まれます。

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目の DynamicFrame には分割されたすべてのノードが含まれ、2 つ目には残りのノードが含まれています。

  • paths - 文字列のリスト。各文字列は新しい DynamicFrame に分割するノードのフルパスです。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: split_fields を使用して、選択したフィールドを別の DynamicFrame に分割する

このコード例では、split_fields メソッドを使用して、指定したフィールドのリストを別の DynamicFrame に分割します。

データセットの例

この例では、legislators_relationalized という名前のコレクションからの l_root_contact_details と呼ばれる DynamicFrame が使用されています。

l_root_contact_details には、次のスキーマとエントリが含まれています。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

コードの例

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame の 1 つ以上の行を、新しい DynamicFrame に分割します。

このメソッドは、2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目の DynamicFrame には分割されたすべての行が含まれ、2 つ目には残りの行が含まれています。

  • comparison_dict – 列へのパスを示すためのキーと、列の値の比較対象である値に対しコンパレータをマッピングする別のディクショナリを示すための値を持つ、ディクショナリ。例えば、{"age": {">": 10, "<": 20}} は、age 列の値が 10 より大きく 20 より小さいすべての行を分割します。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: split_rows を使用して、DynamicFrame 内の行を分割する

このコード例では、split_rows メソッドを使用して、id フィールド値に基づいて DynamicFrame の行を分割します。

データセットの例

この例では、legislators_relationalized という名前のコレクションから選択された l_root_contact_details と呼ばれる DynamicFrame が使用されています。

l_root_contact_details には、次のスキーマとエントリが含まれています。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

コードの例

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

アンボックス

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame の文字列フィールドをアンボックス (再フォーマット) し、アンボックスされた DynamicRecords を含む新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • path - アンボックスする文字列ノードへのフルパス。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数の形式をサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • options - 次の 1 つ以上。

    • separator – 区切り文字を含む文字列。

    • escaper – エスケープ文字を含む文字列。

    • skipFirst – 最初のインスタンスをスキップするかどうかを示すブール値。

    • withSchema — ノードのスキーマの JSON 表現を含む文字列。スキーマの JSON 表現の形式は、StructType.json() の出力によって定義されます。

    • withHeader – ヘッダーが含まれているかどうかを示すブール値。

例: unbox を使用して、文字列フィールドを構造体にアンボックスする

このコード例では、unbox メソッドを使用して、DynamicFrame の文字列フィールドを struct 型のフィールドにアンボックスまたは再フォーマットします。

データセットの例

この例では、次のスキーマとエントリを持つ mapped_with_string と呼ばれる DynamicFrame を使用しています。

AddressString という名前のフィールドに注目してください。この例では、このフィールドを構造体にアンボックスしています。

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

コードの例

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

union

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

2 つの DynamicFrames を結合します。両方の入力 DynamicFrame からのすべてのレコードを含む DynamicFrames を返します。この変換は、2 つの DataFrames rame を同等のデータで結合した結果とは異なる結果を返す場合があります。Spark DataFrame ユニオンの動作が必要な場合は、toDF の使用を検討してください。

  • frame1 — 最初に結合する DynamicFrame。

  • frame2 — 2 番目に結合する DynamicFrame。

  • transformation_ctx – (オプション) 統計/ステータス情報を識別するために使用される一意の文字列。

  • info – (オプション) 変換のエラーに関連付けられる文字列。

  • stageThreshold — (オプション) 処理がエラーになるまでの変換中の最大エラー数。

  • totalThreshold — (オプション) 処理がエラーになるまでの変換中の最大エラー数。

ネスト解除

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しくネスト解除された DynamicFrame を返します。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: unnest を使用して、ネストされたフィールドを最上位フィールドに変換する

このコード例では、unnest メソッドを使用して、DynamicFrame のネストされたすべてのフィールドを最上位フィールドにフラット化します。

データセットの例

この例では、次のスキーマを持つ mapped_medicare と呼ばれる DynamicFrame を使用しています。Address フィールドは、ネストされたデータを含む唯一のフィールドであることに注意してください。

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

コードの例

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

特に DynamoDB JSON 構造体にある DynamicFrame でネストされた列をネスト解除すると、新しくネスト解除された DynamicFrame が返されます。構造体型の配列のある列は、ネスト解除されません。これは、通常の unnest 変換とは異なる特殊なタイプのネスト解除変換で、データが DynamoDB JSON 構造体に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 変換は、以下のように変換します。

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

書き込み

write(connection_type, connection_options, format, format_options, accumulator_size)

この DynamicFrameGlueContext クラス から指定された接続タイプの DataSink (オブジェクト) を取得し、この DynamicFrame のコンテンツの書式設定および書き込みに使用します。指定されたとおりに書式設定され、書き込まれる新しい DynamicFrame を返します。

  • connection_type - 使用する接続タイプ。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options - 使用する接続オプション (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    警告

    スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、boto3 を使用することを検討してください。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 形式の仕様 (オプション)。これは、Amazon Simple Storage Service (Amazon S3)、または複数の形式をサポートする AWS Glue 接続で使用されます。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • accumulator_size - 使用するバイト単位の累積サイズ (オプション)。

 — errors —

assertErrorThreshold

assertErrorThreshold( ) - この DynamicFrame を作成した変換エラーに対するアサーション。基盤になる DataFrame から Exception を返します。

errorsAsDynamicFrame

errorsAsDynamicFrame( ) - 内部にネストされたエラーレコードを持つ DynamicFrame を返します。

例: errorsAsDynamicFrame を使用してエラーレコードを表示する

次のコード例は、errorsAsDynamicFrame メソッドを使用して DynamicFrame のエラーレコードを表示する方法を示しています。

データセットの例

この例では、JSON として Amazon S3 にアップロードできる次のデータセットを使用します。2 つ目のレコードの形式に誤りがあることに注意してください。通常、SparkSQL を使用すると、不正な形式のデータによってファイルの解析が中断されます。ただし、DynamicFrame は、不正な形式の問題を認識し、不正な行を個別に処理できるエラーレコードに変換します。

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

コードの例

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) - DynamicFrame 内のエラーの総数を返します。

stageErrorsCount

stageErrorsCount - この DynamicFrame を生成するプロセスで発生したエラーの数を返します。