DynamicFrame クラス - AWS Glue

DynamicFrame クラス

Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame で、これは R と Pandas にある DataFrame 構造に似ています。DataFrame はテーブルと似ており、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

DataFrames は、強力で広く使用されていますが、抽出、変換、ロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。この 1 つ目はスキーマの推定を行い、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。例えば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用してタイプを string として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かくコントロールする必要があるかもしれません。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。

これらの制限に対応するために、AWS Glue では DynamicFrame を導入しています。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。

同様に、DynamicRecordDynamicFrame 内の論理レコードを表します。これは、Spark DataFrame の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。

スキーマの不一致を解決したら、DynamicFramesDataFrames との間で変換することができます。

 — Construction —

__init__

__init__(jdf, glue_ctx, name)

  • jdf - Java 仮想マシン (JVM) 内のデータフレームへの参照。

  • glue_ctx - A GlueContext クラス オブジェクト。

  • name - オプションの名前文字列。デフォルトでは空。

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame フィールドを DynamicRecord に変換することにより、DataFrameDynamicFrame に変換します。新しい DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • dataframe - 変換する Apache Spark SQL DataFrame (必須)。

  • glue_ctx - この変換のコンテキストを指定する GlueContext クラス オブジェクト (必須)。

  • name – 結果の DynamicFrame の名前 (必須)。

toDF

toDF(options)

DynamicRecordsDataFrame フィールドに変換することにより、DynamicFrame を Apache Spark DataFrame に変換します。新しい DataFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • options - オプションのリスト。ProjectCast アクションタイプを選択した場合、ターゲットのタイプを指定します。次に例を示します。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — Information —

count( ) - 基盤となる DataFrame の行数を返します。

schema

schema( ) - この DynamicFrame のスキーマを返します。使用できない場合は、基盤となる DataFrame のスキーマを返します。

printSchema

printSchema( ) - 基盤となる DataFrame のスキーマを表示します。

show

show(num_rows) - 基盤となる DataFrame から、指定された行数を表示します。

repartition

repartition(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

coalesce

coalesce(numPartitions)numPartitions 個のパーティションを含む新しい DynamicFrame を返します。

 — Transforms —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

宣言型のマッピングをこの DynamicFrame に適用し、それらのマッピングが適用された新しい DynamicFrame を返します。

  • mappings - マッピングタプルのリストです。それぞれがソース列、ソースタイプ、ターゲット列、ターゲットタイプで構成されます。必須。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

FlatMap クラス 変換を呼び出して、DynamicFrame からフィールドを削除します。指定されたフィールドが削除された新しい DynamicFrame を返します。

  • paths - それぞれが削除するフィールドノードへのフルパスを含む文字列のリスト。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

フィルター

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定された述語関数 f を満たす入力 DynamicFrame 内のすべての DynamicRecords を選択することで構築された、新しい DynamicFrame を返します。

  • f - DynamicFrame に適用する述語関数。この関数は DynamicRecord を引数として取り、DynamicRecord がフィルター要件を満たす場合は True を返し、そうでない場合は False を返します (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

filter 変換の使用方法の例については、「フィルタクラス」を参照してください。

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

別の DynamicFrame と等価結合を実行し、結果の DynamicFrame を返します。

  • paths1 - 結合するこのフレームのキーのリスト。

  • paths2 - 結合する別のフレームのキーのリスト。

  • frame2 - 結合する他の DynamicFrame

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

指定したマッピング関数を元の DynamicFrame のすべてのレコードに適用した結果の新しい DynamicFrame を返します。

  • f - DynamicFrame 内のすべてのレコードに適用されるマッピング関数。この関数は、DynamicRecord を引数として取り、新しい DynamicRecord を返す必要があります (必須)。

    DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - 変換のエラーに関連付けられた文字列 (オプション)。

  • stageThreshold - エラーが発生する前に変換で発生する可能性のあるエラーの最大数 (オプション、デフォルト値は 0)。

  • totalThreshold - 処理がエラーを出す前に全体的に発生する可能性のあるエラーの最大数 (オプション、デフォルト値は 0)。

map 変換の使用方法の例については、「マップクラス」を参照してください。

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

レコードを識別するために、この DynamicFrame を指定されたプライマリキーに基づくステージング DynamicFrame とマージします。重複レコード(同じプライマリキーを持つレコード)は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。

  • stage_dynamic_frame – マージするステージング DynamicFrame

  • primary_keys – ソースおよびステージング動的フレームからのレコードを照合する、プライマリキーフィールドのリスト。

  • transformation_ctx – 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。

  • options – この変換に関する追加情報を提供する、JSON の名前と値のペアを示す文字列。この引数は現在使用されていません。

  • infoString。この変換でのエラーに関連付けられる任意の文字列。

  • stageThresholdLong。指定された変換で処理がエラーアウトする必要があるエラーの数。

  • totalThresholdLong。この変換までで処理がエラーアウトする必要があるエラーの合計数。

この DynamicFrame をステージング DynamicFrame とマージして取得した新しい DynamicFrame を返します。

以下の場合、返される DynamicFrame にはレコード A が含まれます。

  1. A がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームの A が返されます。

  2. A がソーステーブルにあり、A.primaryKeysstagingDynamicFrame にない場合(これは、ステージングテーブルで A が更新されていないことを意味します)。

ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。

merged_frame = source_frame.mergeDynamicFrame(stage_frame, ["id"])

関係付け

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

ネストされた列をネスト解除し、配列の列をピボットすることによって生成されるフレームのリストを生成することにより、DynamicFrame を関係付けます。ピボットされた配列の列は、フェーズのネスト解除時に生成された結合キーを使用してルートテーブルに結合できます。

  • root_table_name - ルートテーブルの名前。

  • staging_path – ピボットテーブルのパーティションを CSV 形式で保存する先を示すパス (オプション)。ピボットされたテーブルはこのパスから読み取ります。

  • options - オプションのパラメータのディクショナリ。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

この DynamicFrame のフィールドの名前を変更し、フィールドの名前が変更された新しい DynamicFrame を返します。

  • oldName - 名前を変更するノードへのフルパス。

    古い名前にドットが含まれている場合、RenameField はバックティック (`) で囲まなければ機能しません。例えば、this.old.namethisNewName に置き換えるには、rename_field を次のように呼び出します。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName - 完全パスとしての新しい名前。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

この DynamicFrame 内で選択タイプを解決し、新しい DynamicFrame を返します。

  • specs – それぞれがタプルの形式である、解決すべき特定の曖昧要素のリスト: (field_path, action)

    resolveChoice を使用するには 2 つの方法があります。最初の方法では、specs 引数により、特定のフィールドのシーケンスと解決方法を指定します。resolveChoice のもう 1 つのモードでは、choice 引数を使用して、すべての ChoiceTypes に対して単一の解決策を指定します。

    specs の値は、(field_path, action) ペアで構成されたタプルとして指定されます。field_path 値は特定のあいまいな要素を識別し、action 値は対応する解決を識別します。以下のアクションを指定できます。

    • cast:type – すべての値について、指定した型へのキャストを試みます。例えば、cast:int です。

    • make_cols – それぞれの異なるタイプを columnName_type という名前の列に変換します。データを平坦化することで潜在的なあいまいさを解消します。例えば、columnAint または string の場合、解決策は、作成された DynamicFramecolumnA_int および columnA_string という名前の 2 つの列を生成することです。

    • make_structstruct を使用してデータを表現することで、潜在的なあいまいさを解決します。例えば、列のデータが int または string の場合、make_struct アクションを使用すると、作成された DynamicFrame に、それぞれが int および string の両方を含む構造体の列が生成されます。

    • project:type – 可能なデータ型の 1 つにすべてのデータを投影することで、潜在的なあいまいさを解消します。例えば、列のデータが int または string の場合、project:string アクションを使用すると、すべての int 値が文字列に変換されている、作成された DynamicFrame に列が生成されます。

    field_path で配列を識別する場合は、あいまいさを避けるために配列名の後に空の角括弧を置きます。例えば、使用しているデータが次のように構造化されているとします。

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    文字列バージョンではなく、数値バージョンの料金を使用する場合は、field_path"myList[].price" に設定し、action"cast:double" に設定できます。

    注記

    specs パラメータおよび choice パラメータのうち 1 つのみを使用できます。specs パラメータが None ではない場合、choice パラメータは空の文字列である必要があります。逆に、choice が空の文字列ではない場合、specs パラメータは None である必要があります。

  • choice – すべての ChoiceTypes について、単一の解決方法を指定します。このモードは、実行前に ChoiceTypes の完全なリストが不明な場合に使用できます。この引数では、上記の specs 用のアクションに加えて、以下のアクションもサポートされています。

    • match_catalog – 各 ChoiceType について、指定した Data Catalog テーブル内の対応する型へのキャストを試みます。

  • databasematch_catalog アクションで使用する Data Catalog データベース。

  • table_namematch_catalog アクションで使用する Data Catalog テーブル。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • catalog_id – 現在アクセスされている Data Catalog のカタログ ID (Data Catalog のアカウント ID)。None を設定した場合(デフォルト値) では、呼び出し元アカウントのカタログ ID が使用されます。

df1 = df.resolveChoice(choice = "make_cols") df2 = df.resolveChoice(specs = [("myList[].price", "make_struct"), ("columnA", "cast:double")])

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

選択したフィールドを含む新しい DynamicFrame を返します。

  • paths – 文字列のリスト。各文字列は選択する最上位ノードへのパスです。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

スピゴット

spigot(path, options={})

変換中にサンプルレコードを指定した場所に書き込み、追加の書き込みステップで入力 DynamicFrame を返します。

  • path - 書き込み先へのパス (必須)。

  • options - オプションを指定するキーと値のペア (オプション)。"topk" オプションは、最初の k レコードを書き込むことを指定します。"prob" オプションは、書き込むレコードを選択するために使用される、指定されたレコードを選択する可能性を (小数として) 指定します。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目は分割されたすべてのノードを含み、2 つ目には残っているノードが含まれます。

  • paths - 文字列のリスト。各文字列は新しい DynamicFrame に分割するノードのフルパスです。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

split_rows

DynamicFrame の 1 つ以上の行を、新しい DynamicFrame に分割します。

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

2 つの DynamicFrames を含む新しい DynamicFrameCollection を返します。1 つ目には分割されたすべての行が入り、2 つ目には残りの行が入ります。

  • comparison_dict - キーにより列へのパスを示し、値により、列値を比較する値に対しコンパレータをマッピングするための、別のディクショナリを示しているディクショナリ。例えば、{"age": {">": 10, "<": 20}} は、age 列の値が 10 より大きく 20 より小さいすべての行を分割します。

  • name1 - 分割された DynamicFrame の名前文字列。

  • name2 - 指定されたノードが分割された後に残る DynamicFrame の名前文字列。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

アンボックス

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame の文字列フィールドをアンボックスし、アンボックスされた DynamicRecords を含む DynamicFrame を返します。

DynamicRecordDynamicFrame 内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Apache Spark DataFrame の行に似ています。

  • path - アンボックスする文字列ノードへのフルパス。

  • format – 形式の仕様 (オプション)。これは、Amazon Simple Storage Service (Amazon S3)、または複数の形式をサポートする AWS Glue 接続で使用されます。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • options - 次の 1 つ以上。

    • separator - 区切り文字を含む文字列。

    • escaper - エスケープ文字を含む文字列。

    • skipFirst - 最初のインスタンスをスキップするかどうかを示すブール値。

    • withSchema - スキーマを含む文字列。StructType.json( ) を使用して呼び出す必要があります。

    • withHeader - ヘッダーが含まれているかどうかを示すブール値。

例: unbox("a.b.c", "csv", separator="|")

ネスト解除

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しいネストされていない DynamicFrame を返します。

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しいネストされていない DynamicFrame を返します。

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

例: unnest( )

unnest_ddb_json

特に DynamoDB JSON 構造体内の DynamicFrame の入れ子になっている列を展開し、新しく展開された DynamicFrame を返します。構造体型の配列のある列は、入れ子の展開が行われません。これは、通常の unnest 変換とは異なり、特殊なタイプの変換処理です。変換時に、DynamoDB JSON 構造体にデータが格納されている必要があることに注意してください。  詳細については、「DynamoDB JSON」を参照してください。

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

  • transformation_ctx - 状態情報を識別するために使用される一意の文字列 (オプション)。

  • info - この変換のエラー報告に関連付ける文字列 (オプション)。

  • stageThreshold - この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

  • totalThreshold - この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。

例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 変換は、以下のように変換します。

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

書き込み

write(connection_type, connection_options, format, format_options, accumulator_size)

この DynamicFrameGlueContext クラス から指定された接続タイプの DataSink (オブジェクト) を取得し、この DynamicFrame のコンテンツの書式設定および書き込みに使用します。指定されたとおりに書式設定され、書き込まれる新しい DynamicFrame を返します。

  • connection_type - 使用する接続タイプ。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options - 使用する接続オプション (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 形式の仕様 (オプション)。これは、Amazon Simple Storage Service (Amazon S3)、または複数の形式をサポートする AWS Glue 接続で使用されます。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue での ETL 入力および出力の形式オプション」を参照してください。

  • accumulator_size - 使用する累積サイズ (オプション)。

 — Errors —

assertErrorThreshold

assertErrorThreshold( ) - この DynamicFrame を作成した変換エラーに対するアサーション。基盤になる DataFrame から Exception を返します。

errorsAsDynamicFrame

errorsAsDynamicFrame( ) - 内部にネストされたエラーレコードを持つ DynamicFrame を返します。

errorsCount

errorsCount( ) - DynamicFrame 内のエラーの総数を返します。

stageErrorsCount

stageErrorsCount - この DynamicFrame を生成するプロセスで発生したエラーの数を返します。