AWS Glue
開発者ガイド

DynamicFrame クラス

Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルタ/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、およびロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データ — に対してパスを 2 つ作ることでこれを解決します。1 つ目はスキーマを推測し、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。たとえば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用してタイプを string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かく制御する必要があります。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue により DynamicFrame が導入されました。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecordDynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。

スキーマの不一致を解決したら、DynamicFramesDataFrames との間で変換することができます。

 — 構造 —

__init__

__init__(jdf, glue_ctx, name)

  • jdf – Java 仮想マシン (JVM) 内のデータフレームへの参照。

  • glue_ctxGlueContext クラス オブジェクト。

  • name – オプションの名前文字列。デフォルトでは空。

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame フィールドを DynamicRecord に変換することにより、DataFrameDynamicFrame に変換します。新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • dataframe – 変換する Apache Spark SQL DataFrame (必須)。

  • glue_ctx – この変換のコンテキストを指定する GlueContext クラス オブジェクト (必須)。

  • name – 結果の DynamicFrame の名前 (必須)。

toDF

toDF(options)

DynamicRecordsDataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • options – オプションのリスト。ProjectCast アクションタイプを選択した場合、ターゲットのタイプを指定します。次に例を示します。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — 情報 —

count

count( ) – 基盤となる DataFrame の行数を返します。

schema

schema( ) – この DynamicFrame のスキーマを返します。使用できない場合は、基盤となる DataFrame のスキーマを返します。

printSchema

printSchema( ) – 基盤となる DataFrame のスキーマを表示します。

show

show(num_rows) – 基盤となる DataFrame から、指定された行数を表示します。

 — 変換 —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

宣言型のマッピングをこの DynamicFrame に適用し、それらのマッピングが適用された新しい DynamicFrame を返します。

  • mappings – マッピングタプルのリストで、それぞれが (ソース列、ソースタイプ、ターゲット列、ターゲットタイプ) で構成されます。必須。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

FlatMap クラス 変換を呼び出して、DynamicFrame からフィールドを削除します。指定されたフィールドが削除された新しい DynamicFrame を返します。

  • paths – それぞれが削除するフィールドノードへのフルパスを含む文字列のリスト。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定された述語関数 f を満たす入力 DynamicFrame 内のすべての DynamicRecords を選択することで構築された、新しい DynamicFrame を返します。

  • fDynamicFrame に適用する述語関数。この関数は DynamicRecord を引数として取り、DynamicRecord がフィルタ要件を満たす場合は True を返し、そうでない場合は False を返します (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

filter 変換の使用方法の例については、「フィルタクラス」を参照してください。

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

別の DynamicFrame と等価結合を実行し、結果の DynamicFrame を返します。

  • paths1 – 結合するこのフレームのキーのリスト。

  • paths2 – 結合する別のフレームのキーのリスト。

  • frame2 – 結合する他の DynamicFrame

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定したマッピング関数を元の DynamicFrame のすべてのレコードに適用した結果の新しい DynamicFrame を返します。

  • fDynamicFrame 内のすべてのレコードに適用されるマッピング関数。この関数は、DynamicRecord を引数として取り、新しい DynamicRecord を返す必要があります (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – 変換のエラーに関連付けられた文字列 (オプション)。

  • stageThreshold – エラーが発生する前に変換で発生する可能性のあるエラーの最大数 (オプション、デフォルト値は 0)。

  • totalThreshold – エラーを処理する前に発生する可能性のあるエラーの最大数 (オプション、デフォルト値は 0)。

map 変換の使用方法の例については、「マップクラス」を参照してください。

関係付け

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

ネストされた列をネスト解除し、配列の列をピボットすることによって生成されるフレームのリストを生成することにより、DynamicFrame を関係付けます。ピボットされた配列の列は、フェーズのネスト解除時に生成された結合キーを使用してルートテーブルに結合できます。

  • root_table_name – ルートテーブルの名前。

  • staging_path – ピボットテーブルのパーティションを CSV 形式で保存するパス (オプション)。ピボットされたテーブルはこのパスから読み取ります。

  • options – オプションのパラメータのディクショナリ。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

この DynamicFrame のフィールドの名前を変更し、フィールドの名前が変更された新しい DynamicFrame を返します。

  • oldName – 名前を変更するノードへのフルパス。

    古い名前にドットが含まれている場合、RenameField はバックティック (`) で囲まなければ機能しません。たとえば、this.old.namethisNewName に置き換えるには、rename_field を次のように呼び出します。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – 完全パスとしての新しい名前。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

resolveChoice

resolveChoice(specs = None, option="", transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

この DynamicFrame 内で選択タイプを解決し、新しい DynamicFrame を返します。

  • specs – それぞれがタプルの形式の、解決する特定のあいまいさのリスト (path, action)path 値は特定のあいまいな要素を識別し、action 値は対応する解決を識別します。specs パラメータおよび option パラメータのうち 1 つのみを使用できます。spec パラメータが None ではない場合、option パラメータは空の文字列である必要があります。逆に、option が空の文字列ではない場合、spec パラメータは None である必要があります。どちらのパラメータも指定されていない場合、AWS Glue はスキーマを解析し、それを使用してあいまいさを解決します。

    specs タプルの action 部分は、次の 4 つの解決策のうちの 1 つを指定できます。

    • cast: キャストするタイプを指定できます (例: cast:int)。

    • make_cols: データを平坦化することで潜在的なあいまいさを解消します。たとえば、columnAint または string の場合、解決策は、作成された DynamicFramecolumnA_int および columnA_string という名前の 2 つの列を生成することです。

    • make_struct: 構造体を使用してデータを表現することで、潜在的なあいまいさを解決します。たとえば、列のデータが int または string の場合、make_struct アクションを使用すると、作成された DynamicFrame に、それぞれが int および string の両方を含む構造体の列が生成されます。

    • project: 可能なデータ型の 1 つにすべてのデータを投影することで潜在的なあいまいさを解消します。たとえば、列のデータが int または string の場合、project:string アクションを使用すると、すべての int 値が文字列に変換されている、作成された DynamicFrame に列が生成されます。

    path で配列を識別する場合は、あいまいさを避けるために配列名の後に空の角括弧を置きます。たとえば、使用しているデータが次のように構造化されているとします。

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    文字列バージョンではなく、数値バージョンの料金を使用する場合は、path"myList[].price" に設定し、action"cast:double" に設定できます。

  • optionspecs パラメータが None の場合のデフォルトの解決アクションです。specs パラメータが None ではない場合、空の文字列以外に設定することはできません。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

df1 = df.resolveChoice(option = "make_cols") df2 = df.resolveChoice(specs = [("a.b", "make_struct"), ("c.d", "cast:double")])

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

選択したフィールドを含む新しい DynamicFrame を返します。

  • paths – 文字列のリスト。各文字列は選択する最上位ノードへのパスです。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

スピゴット

spigot(path, options={})

変換中にサンプルレコードを指定した場所に書き込み、追加の書き込みステップで入力 DynamicFrame を返します。

  • path – 書き込み先へのパス (必須)。

  • options – オプションを指定するキーと値のペア (オプション)。"topk" オプションは、最初の k レコードを書き込むことを指定します。"prob" オプションは、書き込むレコードを選択するために使用される、指定されたレコードを選択する可能性を (小数として) 指定します。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目は分割されたすべてのノードを含み、2 つ目には残っているノードが含まれます。

  • paths – 文字列のリスト。各文字列は新しい DynamicFrame に分割するノードのフルパスです。

  • name1 – 分割された DynamicFrame の名前文字列。

  • name2 – 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

split_rows

DynamicFrame の 1 つ以上の行を、新しい DynamicFrame に分割します。

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目には分割されたすべての行が入り、2 つ目には残りの行が入ります。

  • comparison_dict – キーが列へのパスであり、その値が、列値が比較される値にコンパレータをマッピングするための別のディクショナリであるディクショナリ。たとえば、{"age": {">": 10, "<": 20}} は、age 列の値が 10 より大きく 20 より小さいすべての行を分割します。

  • name1 – 分割された DynamicFrame の名前文字列。

  • name2 – 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

アンボックス

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame の文字列フィールドをアンボックスし、アンボックスされた DynamicRecords を含む DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • path – アンボックスする文字列ノードへのフルパス。

  • format – 形式の仕様 (オプション)。これは、複数の形式をサポートする Amazon Simple Storage Service (Amazon S3) または AWS Glue 接続に使用します。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • options – 次の 1 つ以上。

    • separator – 区切り文字を含む文字列。

    • escaper – エスケープ文字を含む文字列。

    • skipFirst – 最初のインスタンスをスキップするかどうかを示すブール値。

    • withSchema – スキーマを含む文字列。StructType.json( ) を使用して呼び出す必要があります。

    • withHeader – ヘッダーが含まれているかどうかを示すブール値。

以下に例を示します。unbox("a.b.c", "csv", separator="|")

ネスト解除

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しいネストされていない DynamicFrame を返します。

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しいネストされていない DynamicFrame を返します。

  • transformation_ctx – 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info – この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold – この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold – この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

以下に例を示します。unnest( )

書き込み

write(connection_type, connection_options, format, format_options, accumulator_size)

この DynamicFrameGlueContext クラス から指定された接続タイプの DataSink (オブジェクト) を取得し、この DynamicFrame のコンテンツの書式設定および書き込みに使用します。指定されたとおりに書式設定され、書き込まれる新しい DynamicFrame を返します。

  • connection_type – 使用する接続タイプ。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options – 使用する接続オプション (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 形式の仕様 (オプション)。これは、複数の形式をサポートする Amazon Simple Storage Service (Amazon S3) または AWS Glue 接続に使用します。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • format_options – 指定した形式の形式オプション。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • accumulator_size – 使用する累積サイズ (オプション)。

 — エラー —

assertErrorThreshold

assertErrorThreshold( ) – この DynamicFrame を作成した変換のエラーのアサーション。基盤になる DataFrame から Exception を返します。

errorsAsDynamicFrame

errorsAsDynamicFrame( ) – 内部にネストされたエラーレコードを持つ DynamicFrame を返します。

errorsCount

errorsCount( )DynamicFrame 内のエラーの総数を返します。

stageErrorsCount

stageErrorsCount – この DynamicFrame を生成するプロセスで発生したエラーの数を返します。