AWS Glue Scala DynamicFrame クラス - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue Scala DynamicFrame クラス

パッケージ: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame は、自己記述型の DynamicRecord オブジェクトの分散コレクションです。

DynamicFrame は、ETL (抽出、変換、ロード) オペレーションの柔軟なデータモデルを提供するように設計されています。これらのオブジェクトを作成するのにスキーマは必要なく、乱雑または不整合な値や型を持つデータの読み取りと変換に使用できます。スキーマは、スキーマを必要とするオペレーションでオンデマンドで計算できます。

DynamicFrame は、データクリーニングと ETL 用の広範な変換を提供します。また、既存のコードと統合するための SparkSQL DataFrames との相互変換や、DataFrames が提供する多くの分析オペレーションをサポートしています。

以下のパラメータは、DynamicFrame を生成する AWS Glue 変換の多くで共有されます。

  • transformationContext - この DynamicFrame の識別子。実行間で保持されるジョブのブックマーク状態のキーとして、transformationContext が使用されます。

  • callSite – エラーレポートのコンテキスト情報を提供します。これらの値は、Python から呼び出すときに、自動的に設定されます。

  • stageThreshold - この DynamicFrame の計算から例外がスローされるまでのエラーレコードの最大許容数。以前の DynamicFrame にあるレコードは除きます。

  • totalThreshold - 例外がスローされるまでの合計エラーレコードの最大数。以前のフレームのレコードも含みます。

Val errorsCount

val errorsCount

この DynamicFrame のエラーレコードの数。以前のオペレーションのエラーも含みます。

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings - 新しい DynamicFrame を生成するための一連のマッピング。

  • caseSensitive – ソース列で大文字と小文字を区別して扱うかどうかを指定。このパラメータを false に設定すると、AWS Glue Data Catalog のように大文字と小文字を区別しないストアと統合する場合に役立ちます。

マッピングのシーケンスに基づく選択列、プロジェクト列、およびキャスト列。

各マッピングは、ソース列/タイプとターゲット列/タイプで構成されます。マッピングは、4 タプル (source_pathsource_type target_pathtarget_type) として指定するか、同じ情報を含む MappingSpec オブジェクトとして指定できます。

マッピングは、シンプルなプロジェクションやキャストに使用できるだけなく、パスのコンポーネントを ''." (ピリオド) で区切ることでフィールドのネストまたはネスト解除にも使用できます。

例えば、以下のスキーマを含む DynamicFrame があるとします。

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

以下の呼び出しを行って、state フィールドと zip フィールドをネスト解除できます。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

その結果、スキーマは以下のようになります。

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

applyMapping を使用して列を再ネストすることもできます。以下の例では、前の変換を反転し、ターゲットに address という名前の構造体を作成します。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

"." (ピリオド) 文字を含むフィールド名は、バッククォート (``) で囲むことで使用できます。

注記

現在、applyMapping メソッドを使用して、配列の下にネストされた列をマップすることはできません。

Def assertErrorThreshold

def assertErrorThreshold : Unit

このアクションでは、計算を適用し、エラーレコード数が stageThresholdtotalThreshold を下回っていることを確認します。いずれかの条件が失敗すると、例外をスローします。

Def count

lazy def count

この DynamicFrame の要素数を返します。

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

指定した列が削除された新しい DynamicFrame を返します。

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

指定した列が削除された新しい DynamicFrame を返します。

このメソッドを使用して、ネストされた列 (配列内の列など) を削除できますが、特定の配列要素を削除することはできません。

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

すべての NULL 列が削除された新しい DynamicFrame を返します。

注記

NullType タイプの列のみが削除されます。他の列にある個別の null 値は削除または変更されません。

Def errorsAsDynamicFrame

def errorsAsDynamicFrame

この DynamicFrame からのエラーレコードを含む新しい DynamicFrame を返します。

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

"f" 関数が true を返すレコードのみを含む新しい DynamicFrame を生成します。フィルター関数 "f" は、入力レコードを変更しないものとします。

Def getName

def getName : String

この DynamicFrame の名前を返します。

Def getNumPartitions

def getNumPartitions

この DynamicFrame のパーティション数を返します。

Def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

計算済みのスキーマを返します。スキーマが計算済みでない場合は、データをスキャンしません。

Def isSchemaComputed

def isSchemaComputed : Boolean

この DynamicFrame に対してスキーマが計算された場合は true を返し、それ以外の場合は false を返します。このメソッドが false を返した場合、schema メソッドを呼び出すには、この DynamicFrame のレコードを再び渡す必要があります。

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 - 結合に使用するこの DynamicFrame の列。

  • keys2 – 結合に使用する frame2 の列。keys1 と同じ長さにする必要があります。

  • frame2 - 結合の対象になる DynamicFrame

指定したキーを使用して frame2 との等結合を行った結果を返します。

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

指定した関数 "f" をこの DynamicFrame の各レコードに適用することで生成された新しい DynamicFrame を返します。

このメソッドは、指定した関数を適用する前に各レコードをコピーするため、レコードを安全に変更できます。特定のレコードでマッピング関数から例外がスローされた場合、そのレコードはエラーとしてマークされ、スタックトレースがエラーレコードの列として保存されます。

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame – マージするステージング DynamicFrame

  • primaryKeys – ソースおよびステージング DynamicFrame からのレコードを照合するプライマリキーフィールドのリスト。

  • transformationContext – 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。

  • options – この変換に関する追加情報を提供する JSON の名前と値のペアを示す文字列。

  • callSite – エラーレポートのコンテキスト情報を提供するために使用します。

  • stageThreshold — A Long。指定された変換で処理がエラーアウトする必要があるエラーの数。

  • totalThreshold — A Long。この変換までで処理がエラーアウトする必要があるエラーの合計数。

レコードを識別するために、この DynamicFrame を指定されたプライマリキーに基づくステージング DynamicFrame とマージします。重複レコード(同じプライマリキーを持つレコード)は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。

以下の場合、返される DynamicFrame にはレコード A が含まれます。

  1. A がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームの A が返されます。

  2. A がソーステーブルにあり、A.primaryKeysstagingDynamicFrame にない場合(これは、ステージングテーブルで A が更新されていないことを意味します)。

ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

この DynamicFrame のスキーマを、人間が判読できる形式で stdout に出力します。

Def recomputeSchema

def recomputeSchema : Schema

スキーマの再計算を強制します。これには、データのスキャンが必要ですが、現在のスキーマの一部のフィールドがデータにない場合、スキーマが "強化" される場合があります。

再計算されたスキーマを返します。

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName – 出力の基本 DynamicFrame に使用する名前。ピボット配列によって作成される DynamicFrame は、この名前をプレフィックスとして使用します。

  • stagingPath – 中間データを書き込む Amazon Simple Storage Service (Amazon S3) のパス。

  • options – Relationalize のオプションと設定。現在使用されていません。

すべてのネストされた構造をフラット化し、配列を個別のテーブルにピボットします。

このオペレーションでは、リレーショナルデータベースに取り込むための深くネストされたデータを準備できます。ネストされた構造体は、ネスト解除 変換と同じ方法でフラット化されます。さらに、配列は個別のテーブルにピボットされ、各配列要素が行になります。例えば、以下のデータを含む DynamicFrame があるとします。

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

以下のコードを実行します。

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

これにより、2 つのテーブルが生成されます。最初のテーブルは "people" という名前で、内容は以下のとおりです。

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

ここで、友人の配列は自動生成された結合キーに置き換えられています。別のテーブルは people.friends という名前で、以下の内容で作成されます。

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

このテーブルで、"id" は配列要素の元のレコードを識別する結合キーです。"index" は元の配列内の位置を参照します。"val" は実際の配列エントリです。

relationalize メソッドは、このプロセスをすべての配列に再帰的に適用することで作成した DynamicFrame のシーケンスを返します。

注記

AWS Glue ライブラリは、新しいテーブルの結合キーを自動的に生成します。ジョブの実行間で結合キーが必ず一意になるように、ジョブのブックマークを有効にする必要があります。

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName – 列の元の名前。

  • newName – 列の新しい名前。

指定したフィールドの名前が変更された新しい DynamicFrame を返します。

このメソッドを使用して、ネストされたフィールドの名前を変更できます。例えば、以下のコードはアドレス構造体内の state の名前を state_code に変更します。

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

numPartitions パーティションを含む新しい DynamicFrame を返します。

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption - 仕様シーケンスに列挙されていない ChoiceType 列に適用するアクション。

  • databasematch_catalog アクションで使用する Data Catalog データベース。

  • tableNamematch_catalog アクションで使用する Data Catalog テーブル。

1 つ以上の ChoiceType をより限定されたタイプに置き換えて新しい DynamicFrame を返します。

resolveChoice を使用するには 2 つの方法があります。最初の方法では、特定の列のシーケンスと解決方法を指定します。これらは (列、アクション) ペアで構成されたタプルとして指定します。

以下のアクションを指定できます。

  • cast:type – すべての値を指定した型にキャストしようとします。

  • make_cols – それぞれの異なるタイプを columnName_type という名前の列に変換します。

  • make_struct – 列を各区別型のキーを持つ構造体に変換します。

  • project:type - 指定した型の値のみを保持します。

resolveChoice のもう 1 つのモードでは、すべての ChoiceType に対して単一の解決策を指定します。このモードは、実行前に ChoiceType の完全なリストが不明な場合に使用できます。このモードでは、上記のアクションに加えて、以下のアクションもサポートされています。

  • match_catalogChoiceType – 指定したカタログテーブルの対応するタイプへの各 のキャストを試行します。

例:

user.id 列を解決するために int にキャストし、address フィールドで構造体のみを保持します。

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

すべての ChoiceType を解決するために、各選択肢を別個の列に変換します。

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

すべての ChoiceType を解決するために、指定したカタログテーブルのタイプにキャストします。

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

この DynamicFrame のスキーマを返します。

返されたスキーマには必ず、この DynamicFrame のレコードにあるすべてのフィールドが含まれます。しかし、その他のフィールドが含まれる場合がまれにあります。ネスト解除 メソッドを使用して、この DynamicFrame のレコードに基づいてスキーマを "強化" できます。

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

単一のフィールドを DynamicFrame として返します。

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths - 選択する列名のシーケンス。

指定した列を含む新しい DynamicFrame を返します。

注記

selectFields メソッドでは、最上位の列のみを選択できます。applyMapping メソッドでは、ネストされた列を選択できます。

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows - 出力する行の数。

この DynamicFrame の行を JSON 形式で出力します。

Def simplifyDDBJson

DynamoDB は AWS Glue DynamoDB エクスポートコネクタを使用してエクスポートし、特殊なネスト構造の JSON ファイルが生成されます。詳細については、「データオブジェクト」を参照してください。simplifyDDBJsonこのタイプのデータの DynamicFrame にあるネスト化された列を単純化し、新しい単純化された DynamicFrame を返します。リストタイプに複数タイプまたは Map タイプが含まれている場合、リストの要素は単純化されません。このメソッドは、DynamoDB エクスポート JSON 形式のデータのみをサポートします。unnest が他の種類のデータにも同様の変更を加えることを考慮してください。

def simplifyDDBJson() : DynamicFrame

このメソッドにはパラメータはありません。

入力例

DynamoDB エクスポートによって生成された次のスキーマを考えてみましょう。

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

コードの例

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

この simplifyDDBJson 変換により、以下のように簡略化されます。

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

同じレコードを返すが、副作用としてレコードのサブセットを書き出すパススルー変換。

  • path – 出力を s3://bucket//path 形式で書き込む先の Amazon S3 のパス。

  • options - サンプリング動作を記述するオプションの JsonOptions マップ。

このレコードと同じレコードを含む DynamicFrame を返します。

デフォルトでは、path で指定した場所に任意の 100 レコードを書き込みます。この動作は、options マップを使用してカスタマイズできます。有効なキーは以下のとおりです。

  • topk – 書き出されるレコードの総数を指定します。デフォルトは 100 です。

  • prob – 各レコードを含める確率を (小数として) 指定します。デフォルトは1です。

例えば、以下の呼び出しでは、データセットをサンプリングするために、20 パーセントの確率で各レコードを選択し、200 レコードを書き出した後で停止します。

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths - 最初の DynamicFrame に含めるパス。

2 つの DynamicFrame のシーケンスを返します。DynamicFrame の 1 つ目には指定したパスが含まれ、2 つ目には他のすべての列が含まれます。

この例では、persons Glue Data Catalog の legislators データベースの AWS テーブルから作成した DynamicFrame を 2 つに分割し、指定したフィールドを 1 つ目の DynamicFrame に、残りのフィールドを 2 つ目の DynamicFrame に配置しています。次に、結果から最初の DynamicFrame を選択します。

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

列と定数を比較する述語に基づいて行を分割します。

  • paths - 比較に使用する列。

  • values - 比較に使用する定数値。

  • operators - 比較に使用する演算子。

2 つの DynamicFrame のシーケンスを返します。1 つ目には述語が true の行を含め、2 つ目には述語が false の行を含めます。

述語を指定するには 3 つのシーケンスを使用します。"paths" には、ネストされている可能性がある列の名前を含め、"'values' には、比較する定数値を含め、"operators" には、比較に使用する演算子を含めます。3 つすべてのシーケンスを同じ長さにする必要があります。n 演算子では、n 番目の列を n 番目の値と比較します。

各演算子は "!="、"="、"<="、"<"、">="、">" のいずれかにする必要があります。

以下の呼び出しの例では、DynamicFrame を分割し、1 つ目の出力フレームには米国の 65 才を超える人々のレコード、2 つ目には他のすべてのレコードを含めています。

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

この DynamicFrame の計算中に生じたエラーレコードの数を返します。この DynamicFrame に入力として渡した以前のオペレーションのエラーは含まれません。

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

この DynamicFrame を、同じスキーマとレコードを含む Apache Spark SQL DataFrame に変換します。

注記

DataFrameChoiceType をサポートしていないため、このメソッドは ChoiceType 列を StructType に自動的に変換します。選択肢の解決の詳細とオプションについては、「resolveChoice」を参照してください。

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path – 解析する列。文字列またはバイナリにする必要があります。

  • format – 解析に使用する形式。

  • optionString – 形式に渡すオプション (CSV 区切り記号など)。

指定した形式に従って、埋め込まれた文字列またはバイナリ列を解析します。解析された列は、元の列名で構造体の下にネストされます。

例えば、JSON 列が埋め込まれた CSV ファイルがあるとします。

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

最初の解析の後、以下のスキーマを含む DynamicFrame を取得します。

{{{ root |-- name: string |-- age: int |-- address: string }}}

アドレス列で unbox を呼び出して、特定のコンポーネントを解析できます。

{{{ df.unbox("address", "json") }}}

これにより、以下のスキーマを含む DynamicFrame を取得します。

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

すべてのネストされた構造体が平坦化された新しい DynamicFrame を返します。名前は "." (ピリオド) 文字を使用して生成されます。

例えば、以下のスキーマを含む DynamicFrame があるとします。

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

次の呼び出しでアドレス構造体がネスト解除されます。

{{{ df.unnest() }}}

その結果、スキーマは以下のようになります。

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

このメソッドはまた、配列内でネストされた構造体をネスト解除しません。しかしこれまでの慣習上、そのようなフィールドの名前の前には、外側の配列の名前と ".val" が付けられます。

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

特に DynamoDB JSON 構造体内にある DynamicFrame でネスト化された列をネスト解除すると、新しくネスト解除された DynamicFrame が返されます。構造体型の配列のある列は、ネスト解除されません。これは、通常の unnest 変換とは異なる特殊なタイプのネスト解除変換で、データが DynamoDB JSON 構造体に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。

例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson() 変換は、以下のように変換します。

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema – 使用するスキーマを返す関数。高価である可能性が高い計算を延期するために、パラメータがゼロである関数として指定します。

この DynamicFrame のスキーマを、指定した値に設定します。高価なスキーマの再計算を回避するために主に内部で使用されます。渡すスキーマには、データ内に存在するすべての列を含める必要があります。

Def withName

def withName( name : String ) : DynamicFrame
  • name - 使用する新しい名前。

この DynamicFrame のコピーを新しい名前で返します。

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

この DynamicFrame のコピーを、指定した変換コンテキストで返します。