DynamicFrame クラス - AWS Glue

DynamicFrame クラス

Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、ロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。この 1 つ目はスキーマの推定を行い、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。例えば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用して型を string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かくコントロールする必要があるかもしれません。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue では DynamicFrame を導入しています。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecordDynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。

スキーマの不一致を解決したら、DynamicFramesDataFrames との間で変換することができます。

 — construction —

__init__

__init__(jdf, glue_ctx, name)

  • jdf - Java 仮想マシン (JVM) 内のデータフレームへの参照。

  • glue_ctxGlueContext クラス オブジェクト。

  • name - オプションの名前文字列。デフォルトでは空。

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame フィールドを DynamicRecord に変換することにより、DataFrameDynamicFrame に変換します。新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • dataframe - 変換する Apache Spark SQL DataFrame (必須)。

  • glue_ctx - この変換のコンテキストを指定する GlueContext クラス オブジェクト (必須)。

  • name – 結果の DynamicFrame の名前 (必須)。

toDF

toDF(options)

DynamicRecordsDataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • options - オプションのリスト。ProjectCast アクションタイプを選択した場合、ターゲットのタイプを指定します。次に例を示します。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — information —

count( ) - 基盤となる DataFrame の行数を返します。

schema

schema( ) - この DynamicFrame のスキーマを返します。使用できない場合は、基盤となる DataFrame のスキーマを返します。

printSchema

printSchema( ) - 基盤となる DataFrame のスキーマを表示します。

show

show(num_rows) - 基盤となる DataFrame から、指定された行数を表示します。

repartition

repartition(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

coalesce

coalesce(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

 — transforms —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame に宣言型のマッピングを適用し、それらのマッピングが適用された新しい DynamicFrame を指定したフィールドに返します。未指定のフィールドは新しい DynamicFrame から除外されます。

  • mappings — マッピングタプルのリスト (必須)。それぞれが (ソース列、ソースタイプ、ターゲット列、ターゲットタイプ) で構成されます。

    ソース列で、名前にドット「.」が含まれている場合、バックティック「``」で囲む必要があります。例えば、this.old.name (文字列) を thisNewName にマッピングするには、次のタプルを使用します。

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: apply_mapping を使用してフィールドの名前を変更し、フィールドタイプを変更する

次のコード例は、apply_mapping メソッドを使用して、選択したフィールドの名前を変更し、フィールドタイプを変更する方法を示しています。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

FlatMap クラス 変換を呼び出して、DynamicFrame からフィールドを削除します。指定されたフィールドが削除された新しい DynamicFrame を返します。

  • paths - 文字列のリスト。それぞれに、ドロップするフィールドノードへのフルパスが含まれます。ドット表記を使用して、ネストされたフィールドを指定できます。例えば、フィールド first がツリー内のフィールド name の子である場合は、パスに "name.first" を指定します。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: drop_fields を使用して DynamicFrame からフィールドを削除する

このコード例では、drop_fields メソッドを使用して、選択したトップレベルフィールドとネストされたフィールドを DynamicFrame から削除します。

データセットの例

この例では、コード内の EXAMPLE-FRIENDS-DATA テーブルで表される次のデータセットを使用します。

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

コードの例

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

フィルター

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定された述語関数 f を満たす入力 DynamicFrame 内のすべての DynamicRecords を含む、新しい DynamicFrame を返します。

  • f - DynamicFrame に適用する述語関数。この関数は DynamicRecord を引数として取り、DynamicRecord がフィルター要件を満たす場合は True を返し、そうでない場合は False を返します (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: フィルターを使用して、フィールドのフィルタリングされた選択内容を取得する

この例では、filter メソッドを使用して、別の DynamicFrame のフィールドのフィルタリングされた選択内容を含む新しい DynamicFrame を作成します。

map メソッドと同様に、filter は、元の DynamicFrame の各レコードに適用される引数として関数を取ります。この関数はレコードを入力として受け取り、ブール値を返します。戻り値が true の場合、レコードは結果として生じる DynamicFrame に含まれます。false の場合、レコードは除外されます。

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

別の DynamicFrame と等価結合を実行し、結果の DynamicFrame を返します。

  • paths1 - 結合するこのフレームのキーのリスト。

  • paths2 - 結合する別のフレームのキーのリスト。

  • frame2 - 結合する他の DynamicFrame

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: 結合を使用して DynamicFrames を結合する

この例では、join メソッドを使用して 3 つの DynamicFrames で結合を実行します。AWSGlue は、入力されたフィールドキーに基づいて結合を実行します。結果として生じる DynamicFrame には、指定されたキーが一致する元の 2 つのフレームからの行が含まれます。

join 変換では、すべてのフィールドがそのまま保持されることに注意してください。これは、一致するように指定したフィールドが冗長で同じキーが含まれていても、結果として生じる DynamicFrame に表示されることを意味します。この例では、drop_fields を使用して、結合後にこれらの冗長なキーを削除します。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定したマッピング関数を元の DynamicFrame のすべてのレコードに適用した結果の新しい DynamicFrame を返します。

  • f - DynamicFrame 内のすべてのレコードに適用されるマッピング関数。この関数は、DynamicRecord を引数として取り、新しい DynamicRecord を返す必要があります (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - 変換のエラーに関連付けられた文字列 (オプション)。

  • stageThreshold – エラーを出力する前に、変換で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。

  • totalThreshold – エラーの出力を処理する前に、全体で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。

例: マッピングを使用して、DynamicFrame のすべてのレコードに関数を適用する

この例は、map メソッドを使用して、関数を DynamicFrame のすべてのレコードに適用する方法を示しています。具体的には、この例では、複数のアドレスフィールドを単一の struct タイプに結合するため、MergeAddress という名前の関数を各レコードに適用します。

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

レコードを識別するために、この DynamicFrame を指定されたプライマリキーに基づくステージング DynamicFrame とマージします。重複レコード (同じプライマリキーを持つレコード) は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。

  • stage_dynamic_frame – マージするステージング DynamicFrame

  • primary_keys – ソースおよびステージング動的フレームからのレコードを照合する、プライマリキーフィールドのリスト。

  • transformation_ctx – 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。

  • options – この変換に関する追加情報を提供する、JSON の名前と値のペアを示す文字列。この引数は現在使用されていません。

  • infoString。この変換でのエラーに関連付けられる任意の文字列。

  • stageThresholdLong。指定された変換で処理がエラーアウトする必要があるエラーの数。

  • totalThresholdLong。この変換までに発生したエラーのうち、処理でエラーを出力する必要があるエラーの合計数。

このメソッドは、この DynamicFrame をステージング DynamicFrame とマージして取得した新しい DynamicFrame を返します。

以下の場合、返される DynamicFrame にはレコード A が含まれます。

  • A がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームの A が返されます。

  • A がソーステーブルに存在し、A.primaryKeysstagingDynamicFrame に存在しない場合、A はステージングテーブルで更新されていません。

ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。

例: mergeDynamicFrame を使用して、プライマリキーに基づいて 2 つの DynamicFrames をマージする

次のコード例は、mergeDynamicFrame メソッドを使用して、プライマリキー id に基づいて DynamicFrame を「ステージング」DynamicFrame とマージする方法を示しています。

データセットの例

この例では、split_rows_collection と呼ばれる DynamicFrameCollection からの 2 つの DynamicFrames を使用しています。次のリストに示しているのは、split_rows_collection のキーです。

dict_keys(['high', 'low'])

コードの例

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

関係付け

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame をリレーショナルデータベースに適合する形式に変換します。DynamicFrame の関係付けは、DynamoDB などの NoSQL 環境から MySQL などのリレーショナルデータベースにデータを移動する場合に特に便利です。

この変換は、ネストされた列をネスト解除し、配列の列をピボットすることでフレームのリストを生成します。フェーズのネスト解除時に生成された結合キーを使用して、ピボットされた配列の列をルートテーブルに結合できます。

  • root_table_name - ルートテーブルの名前。

  • staging_path – このメソッドがピボットテーブルのパーティションを CSV 形式で保存する保存先のパス (オプション)。ピボットされたテーブルはこのパスから読み取ります。

  • options - オプションのパラメータのディクショナリ。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: relationalize を使用して、DynamicFrame のネストされたスキーマをフラット化する

このコード例では、relationalize メソッドを使用して、ネストされたスキーマをリレーショナルデータベースに適合する形式にフラット化します。

データセットの例

この例では、次のスキーマを持つ legislators_combined と呼ばれる DynamicFrame を使用しています。legislators_combined には、relationalize 変換によってフラット化される、linksimagescontact_details などのネストされた複数のフィールドがあります。

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

コードの例

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

次の出力では、contact_details と呼ばれるネストされたフィールドのスキーマを、relationalize 変換によって作成されたテーブルと比較できます。テーブルのレコードが id と呼ばれる外部キーと配列の位置を表す index 列を使用してメインテーブルにリンクされていることに注意してください。

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

この DynamicFrame のフィールドの名前を変更し、フィールドの名前が変更された新しい DynamicFrame を返します。

  • oldName - 名前を変更するノードへのフルパス。

    古い名前にドットが含まれている場合、RenameField はバックティック (`) で囲まなければ機能しません。例えば、this.old.namethisNewName に置き換えるには、rename_field を次のように呼び出します。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName - 完全パスとしての新しい名前。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: rename_field を使用して、DynamicFrame のフィールドの名前を変更する

このコード例では、rename_field メソッドを使用して DynamicFrame のフィールドの名前を変更します。この例では、メソッドの連鎖を使用して、複数のフィールドの名前を同時に変更していることに注意してください。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

コードの例

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

この DynamicFrame 内で選択タイプを解決し、新しい DynamicFrame を返します。

  • specs – それぞれがタプルの形式である、解決すべき特定の曖昧要素のリスト: (field_path, action)

    resolveChoice を使用するには 2 つの方法があります。最初の方法では、specs 引数により、特定のフィールドのシーケンスと解決方法を指定します。resolveChoice のもう 1 つのモードでは、choice 引数を使用して、すべての ChoiceTypes に対して単一の解決策を指定します。

    specs の値は、(field_path, action) ペアで構成されたタプルとして指定されます。field_path 値は特定のあいまいな要素を識別し、action 値は対応する解決を識別します。以下のアクションを指定できます。

    • cast:type – すべての値について、指定した型へのキャストを試みます。例: cast:int

    • make_cols – それぞれの異なるタイプを columnName_type という名前の列に変換します。データをフラット化することで潜在的なあいまいさを解消します。例えば、columnAint または string の場合、解決策は、作成された DynamicFramecolumnA_int および columnA_string という名前の 2 つの列を生成することです。

    • make_structstruct を使用してデータを表現することで、潜在的なあいまいさを解決します。例えば、列のデータが int または string となる可能性がある場合、make_struct アクションを使用すると、作成された DynamicFrame に構造体の列が生成されます。各構造体には、intstring の両方が含まれています。

    • project:type – 可能なデータ型の 1 つにすべてのデータを投影することで、潜在的なあいまいさを解消します。例えば、列のデータが int または string の場合、project:string アクションを使用すると、すべての int 値が文字列に変換されている、作成された DynamicFrame に列が生成されます。

    field_path で配列を識別する場合は、あいまいさを避けるために配列名の後に空の角括弧を置きます。例えば、使用しているデータが次のように構造化されているとします。

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    field_path"myList[].price" に設定し、action"cast:double" に設定すると、文字列バージョンではなく、数値バージョンの料金を選択できます。

    注記

    specs パラメータおよび choice パラメータのうち 1 つのみを使用できます。specs パラメータが None ではない場合、choice パラメータは空の文字列である必要があります。逆に、choice が空の文字列ではない場合、specs パラメータは None である必要があります。

  • choice – すべての ChoiceTypes について、単一の解決方法を指定します。このモードは、ランタイム前に ChoiceTypes の完全なリストが不明な場合に使用できます。この引数では、上記の specs 用のアクションに加えて、以下のアクションもサポートされています。

    • match_catalog – 各 ChoiceType について、指定した Data Catalog テーブル内の対応する型へのキャストを試みます。

  • databasematch_catalog アクションで使用する Data Catalog データベース。

  • table_namematch_catalog アクションで使用する Data Catalog テーブル。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーを出力する必要があるエラーの数 (オプション)。デフォルトはゼロで、プロセスでエラーを出力しないことを示します。

  • catalog_id – 現在アクセスされている Data Catalog のカタログ ID (Data Catalog のアカウント ID)。None を設定した場合(デフォルト値) では、呼び出し元アカウントのカタログ ID が使用されます。

例: resolveChoice を使用して、複数の型を含む列を処理する

このコード例では、resolveChoice メソッドを使用して、複数の型の値を含む DynamicFrame 列の処理方法を指定します。この例は、型の異なる列を処理する一般的な 2 つの方法を示しています。

  • 列を単一のデータ型にキャストします。

  • すべての型を別々の列に保持します。

データセットの例

注記

この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

この例では、次のスキーマを持つ medicare と呼ばれる DynamicFrame を使用しています。

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

コードの例

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

選択したフィールドを含む新しい DynamicFrame を返します。

  • paths - 文字列のリスト。各文字列は、選択する最上位ノードへのパスです。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: select_fields を使用して、選択したフィールドで新しい DynamicFrame を作成する

次のコード例は、select_fields メソッドを使用して、既存の DynamicFrame から選択されたフィールドのリストを使用し、新しい DynamicFrame を作成する方法を示しています。

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

スピゴット

spigot(path, options={})

ジョブで実行された変換が確認しやすくなるように、サンプルレコードを指定した送信先に書き込みます。

  • path – 書き込み先へのパス (必須)。

  • options – オプションを指定するキーと値のペア (オプション)。"topk" オプションは、最初の k レコードを書き込むことを指定します。"prob" オプションは、指定されたレコードを選択する確率(10 進数)を指定します。これを使用して、書き込むレコードを選択できます。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

例: spigot を使用して DynamicFrame のサンプルフィールドを Amazon S3 に書き込む

このコード例では、spigot メソッドを使用して、select_fields 変換を適用した後に Amazon S3 バケットにサンプルレコードを書き込みます。

データセットの例

注記

この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。

この例では、次のスキーマを持つ persons と呼ばれる DynamicFrame を使用しています。

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

コードの例

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

spigot で Amazon S3 に書き込むデータの例を次に示します。サンプルコードで options={"topk": 10} を指定したため、サンプルデータには最初の 10 個のレコードが含まれます。

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目の DynamicFrame には分割されたすべてのノードが含まれ、2 つ目には残りのノードが含まれています。

  • paths - 文字列のリスト。各文字列は新しい DynamicFrame に分割するノードのフルパスです。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: split_fields を使用して、選択したフィールドを別の DynamicFrame に分割する

このコード例では、split_fields メソッドを使用して、指定したフィールドのリストを別の DynamicFrame に分割します。

データセットの例

この例では、legislators_relationalized という名前のコレクションからの l_root_contact_details と呼ばれる DynamicFrame が使用されています。

l_root_contact_details には、次のスキーマとエントリが含まれています。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

コードの例

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame の 1 つ以上の行を、新しい DynamicFrame に分割します。

このメソッドは、2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目の DynamicFrame には分割されたすべての行が含まれ、2 つ目には残りの行が含まれています。

  • comparison_dict – 列へのパスを示すためのキーと、列の値の比較対象である値に対しコンパレータをマッピングする別のディクショナリを示すための値を持つ、ディクショナリ。例えば、{"age": {">": 10, "<": 20}} は、age 列の値が 10 より大きく 20 より小さいすべての行を分割します。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: split_rows を使用して、DynamicFrame 内の行を分割する

このコード例では、split_rows メソッドを使用して、id フィールド値に基づいて DynamicFrame の行を分割します。

データセットの例

この例では、legislators_relationalized という名前のコレクションから選択された l_root_contact_details と呼ばれる DynamicFrame が使用されています。

l_root_contact_details には、次のスキーマとエントリが含まれています。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

コードの例

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

アンボックス

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame の文字列フィールドをアンボックス (再フォーマット) し、アンボックスされた DynamicRecords を含む新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • path - アンボックスする文字列ノードへのフルパス。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数の形式をサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • options - 次の 1 つ以上。

    • separator – 区切り文字を含む文字列。

    • escaper – エスケープ文字を含む文字列。

    • skipFirst – 最初のインスタンスをスキップするかどうかを示すブール値。

    • withSchema – スキーマを含む文字列。呼び出す場合は StructType.json( ) を使用する必要があります。

    • withHeader – ヘッダーが含まれているかどうかを示すブール値。

例: unbox を使用して、文字列フィールドを構造体にアンボックスする

このコード例では、unbox メソッドを使用して、DynamicFrame の文字列フィールドを struct 型のフィールドにアンボックスまたは再フォーマットします。

データセットの例

この例では、次のスキーマとエントリを持つ mapped_with_string と呼ばれる DynamicFrame を使用しています。

AddressString という名前のフィールドに注目してください。この例では、このフィールドを構造体にアンボックスしています。

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

コードの例

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

ネスト解除

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しくネスト解除された DynamicFrame を返します。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。

例: unnest を使用して、ネストされたフィールドを最上位フィールドに変換する

このコード例では、unnest メソッドを使用して、DynamicFrame のネストされたすべてのフィールドを最上位フィールドにフラット化します。

データセットの例

この例では、次のスキーマを持つ mapped_medicare と呼ばれる DynamicFrame を使用しています。Address フィールドは、ネストされたデータを含む唯一のフィールドであることに注意してください。

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

コードの例

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

特に DynamoDB JSON 構造体にある DynamicFrame でネストされた列をネスト解除すると、新しくネスト解除された DynamicFrame が返されます。構造体型の配列のある列は、ネスト解除されません。これは、通常の unnest 変換とは異なる特殊なタイプのネスト解除変換で、データが DynamoDB JSON 構造体に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 変換は、以下のように変換します。

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

書き込み

write(connection_type, connection_options, format, format_options, accumulator_size)

この DynamicFrameGlueContext クラス から指定された接続タイプの DataSink (オブジェクト) を取得し、この DynamicFrame のコンテンツの書式設定および書き込みに使用します。指定されたとおりに書式設定され、書き込まれる新しい DynamicFrame を返します。

  • connection_type - 使用する接続タイプ。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options - 使用する接続オプション (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 形式の仕様 (オプション)。これは、Amazon Simple Storage Service (Amazon S3)、または複数の形式をサポートする AWS Glue 接続で使用されます。サポートされる形式については、「AWS Glue での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue での入出力のデータ形式に関するオプション」を参照してください。

  • accumulator_size - 使用する累積サイズ (オプション)。

 — errors —

assertErrorThreshold

assertErrorThreshold( ) - この DynamicFrame を作成した変換エラーに対するアサーション。基盤になる DataFrame から Exception を返します。

errorsAsDynamicFrame

errorsAsDynamicFrame( ) - 内部にネストされたエラーレコードを持つ DynamicFrame を返します。

例: errorsAsDynamicFrame を使用してエラーレコードを表示する

次のコード例は、errorsAsDynamicFrame メソッドを使用して DynamicFrame のエラーレコードを表示する方法を示しています。

データセットの例

この例では、JSON として Amazon S3 にアップロードできる次のデータセットを使用します。2 つ目のレコードの形式に誤りがあることに注意してください。通常、SparkSQL を使用すると、不正な形式のデータによってファイルの解析が中断されます。ただし、DynamicFrame は、不正な形式の問題を認識し、不正な行を個別に処理できるエラーレコードに変換します。

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

コードの例

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) - DynamicFrame 内のエラーの総数を返します。

stageErrorsCount

stageErrorsCount - この DynamicFrame を生成するプロセスで発生したエラーの数を返します。