翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
AWS Glue Scala DynamicFrame クラス
パッケージ: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
は、自己記述型の DynamicRecord オブジェクトの分散コレクションです。
DynamicFrame
は、ETL (抽出、変換、ロード) オペレーションの柔軟なデータモデルを提供するように設計されています。これらのオブジェクトを作成するのにスキーマは必要なく、乱雑または不整合な値や型を持つデータの読み取りと変換に使用できます。スキーマは、スキーマを必要とするオペレーションでオンデマンドで計算できます。
DynamicFrame
は、データクリーニングと ETL 用の広範な変換を提供します。また、既存のコードと統合するための SparkSQL DataFrames との相互変換や、DataFrames が提供する多くの分析オペレーションをサポートしています。
以下のパラメータは、DynamicFrame
を生成する AWS Glue 変換の多くで共有されます。
transformationContext
- このDynamicFrame
の識別子。実行間で保持されるジョブのブックマーク状態のキーとして、transformationContext
が使用されます。callSite
– エラーレポートのコンテキスト情報を提供します。これらの値は、Python から呼び出すときに、自動的に設定されます。stageThreshold
- このDynamicFrame
の計算から例外がスローされるまでのエラーレコードの最大許容数。以前のDynamicFrame
にあるレコードは除きます。totalThreshold
- 例外がスローされるまでの合計エラーレコードの最大数。以前のフレームのレコードも含みます。
Val errorsCount
val errorsCount
この DynamicFrame
のエラーレコードの数。以前のオペレーションのエラーも含みます。
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
- 新しいDynamicFrame
を生成するための一連のマッピング。caseSensitive
– ソース列で大文字と小文字を区別して扱うかどうかを指定。このパラメータを false に設定すると、AWS Glue Data Catalog のように大文字と小文字を区別しないストアと統合する場合に役立ちます。
マッピングのシーケンスに基づく選択列、プロジェクト列、およびキャスト列。
各マッピングは、ソース列/タイプとターゲット列/タイプで構成されます。マッピングは、4 タプル (source_path
、source_type
、 target_path
、target_type
) として指定するか、同じ情報を含む MappingSpec オブジェクトとして指定できます。
マッピングは、シンプルなプロジェクションやキャストに使用できるだけなく、パスのコンポーネントを ''.
" (ピリオド) で区切ることでフィールドのネストまたはネスト解除にも使用できます。
例えば、以下のスキーマを含む DynamicFrame
があるとします。
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
以下の呼び出しを行って、state
フィールドと zip
フィールドをネスト解除できます。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
その結果、スキーマは以下のようになります。
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
applyMapping
を使用して列を再ネストすることもできます。以下の例では、前の変換を反転し、ターゲットに address
という名前の構造体を作成します。
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
".
" (ピリオド) 文字を含むフィールド名は、バッククォート (``
) で囲むことで使用できます。
注記
現在、applyMapping
メソッドを使用して、配列の下にネストされた列をマップすることはできません。
Def assertErrorThreshold
def assertErrorThreshold : Unit
このアクションでは、計算を適用し、エラーレコード数が stageThreshold
と totalThreshold
を下回っていることを確認します。いずれかの条件が失敗すると、例外をスローします。
Def count
lazy
def count
この DynamicFrame
の要素数を返します。
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
指定した列が削除された新しい DynamicFrame
を返します。
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
指定した列が削除された新しい DynamicFrame
を返します。
このメソッドを使用して、ネストされた列 (配列内の列など) を削除できますが、特定の配列要素を削除することはできません。
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
すべての NULL 列が削除された新しい DynamicFrame
を返します。
注記
NullType
タイプの列のみが削除されます。他の列にある個別の null 値は削除または変更されません。
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
この DynamicFrame
からのエラーレコードを含む新しい DynamicFrame
を返します。
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
"f
" 関数が true
を返すレコードのみを含む新しい DynamicFrame
を生成します。フィルター関数 "f
" は、入力レコードを変更しないものとします。
Def getName
def getName : String
この DynamicFrame
の名前を返します。
Def getNumPartitions
def getNumPartitions
この DynamicFrame
のパーティション数を返します。
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
計算済みのスキーマを返します。スキーマが計算済みでない場合は、データをスキャンしません。
Def isSchemaComputed
def isSchemaComputed : Boolean
この DynamicFrame
に対してスキーマが計算された場合は true
を返し、それ以外の場合は false
を返します。このメソッドが false を返した場合、schema
メソッドを呼び出すには、この DynamicFrame
のレコードを再び渡す必要があります。
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
- 結合に使用するこのDynamicFrame
の列。keys2
– 結合に使用するframe2
の列。keys1
と同じ長さにする必要があります。frame2
- 結合の対象になるDynamicFrame
。
指定したキーを使用して frame2
との等結合を行った結果を返します。
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
指定した関数 "f
" をこの DynamicFrame
の各レコードに適用することで生成された新しい DynamicFrame
を返します。
このメソッドは、指定した関数を適用する前に各レコードをコピーするため、レコードを安全に変更できます。特定のレコードでマッピング関数から例外がスローされた場合、そのレコードはエラーとしてマークされ、スタックトレースがエラーレコードの列として保存されます。
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
– マージするステージングDynamicFrame
。primaryKeys
– ソースおよびステージングDynamicFrame
からのレコードを照合するプライマリキーフィールドのリスト。transformationContext
– 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。options
– この変換に関する追加情報を提供する JSON の名前と値のペアを示す文字列。callSite
– エラーレポートのコンテキスト情報を提供するために使用します。stageThreshold
— ALong
。指定された変換で処理がエラーアウトする必要があるエラーの数。totalThreshold
— ALong
。この変換までで処理がエラーアウトする必要があるエラーの合計数。
レコードを識別するために、この DynamicFrame
を指定されたプライマリキーに基づくステージング DynamicFrame
とマージします。重複レコード(同じプライマリキーを持つレコード)は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。
以下の場合、返される DynamicFrame
にはレコード A が含まれます。
A
がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームのA
が返されます。A
がソーステーブルにあり、A.primaryKeys
がstagingDynamicFrame
にない場合(これは、ステージングテーブルでA
が更新されていないことを意味します)。
ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
この DynamicFrame
のスキーマを、人間が判読できる形式で stdout
に出力します。
Def recomputeSchema
def recomputeSchema : Schema
スキーマの再計算を強制します。これには、データのスキャンが必要ですが、現在のスキーマの一部のフィールドがデータにない場合、スキーマが "強化" される場合があります。
再計算されたスキーマを返します。
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
– 出力の基本DynamicFrame
に使用する名前。ピボット配列によって作成されるDynamicFrame
は、この名前をプレフィックスとして使用します。stagingPath
– 中間データを書き込む Amazon Simple Storage Service (Amazon S3) のパス。options
– Relationalize のオプションと設定。現在使用されていません。
すべてのネストされた構造をフラット化し、配列を個別のテーブルにピボットします。
このオペレーションでは、リレーショナルデータベースに取り込むための深くネストされたデータを準備できます。ネストされた構造体は、ネスト解除 変換と同じ方法でフラット化されます。さらに、配列は個別のテーブルにピボットされ、各配列要素が行になります。例えば、以下のデータを含む DynamicFrame
があるとします。
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
以下のコードを実行します。
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
これにより、2 つのテーブルが生成されます。最初のテーブルは "people" という名前で、内容は以下のとおりです。
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
ここで、友人の配列は自動生成された結合キーに置き換えられています。別のテーブルは people.friends
という名前で、以下の内容で作成されます。
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
このテーブルで、"id
" は配列要素の元のレコードを識別する結合キーです。"index
" は元の配列内の位置を参照します。"val
" は実際の配列エントリです。
relationalize
メソッドは、このプロセスをすべての配列に再帰的に適用することで作成した DynamicFrame
のシーケンスを返します。
注記
AWS Glue ライブラリは、新しいテーブルの結合キーを自動的に生成します。ジョブの実行間で結合キーが必ず一意になるように、ジョブのブックマークを有効にする必要があります。
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
– 列の元の名前。newName
– 列の新しい名前。
指定したフィールドの名前が変更された新しい DynamicFrame
を返します。
このメソッドを使用して、ネストされたフィールドの名前を変更できます。例えば、以下のコードはアドレス構造体内の state
の名前を state_code
に変更します。
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
numPartitions
パーティションを含む新しい DynamicFrame
を返します。
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
- 仕様シーケンスに列挙されていないChoiceType
列に適用するアクション。database
–match_catalog
アクションで使用する Data Catalog データベース。tableName
–match_catalog
アクションで使用する Data Catalog テーブル。
1 つ以上の ChoiceType
をより限定されたタイプに置き換えて新しい DynamicFrame
を返します。
resolveChoice
を使用するには 2 つの方法があります。最初の方法では、特定の列のシーケンスと解決方法を指定します。これらは (列、アクション) ペアで構成されたタプルとして指定します。
以下のアクションを指定できます。
cast:type
– すべての値を指定した型にキャストしようとします。make_cols
– それぞれの異なるタイプをcolumnName_type
という名前の列に変換します。make_struct
– 列を各区別型のキーを持つ構造体に変換します。project:type
- 指定した型の値のみを保持します。
resolveChoice
のもう 1 つのモードでは、すべての ChoiceType
に対して単一の解決策を指定します。このモードは、実行前に ChoiceType
の完全なリストが不明な場合に使用できます。このモードでは、上記のアクションに加えて、以下のアクションもサポートされています。
match_catalog
ChoiceType
– 指定したカタログテーブルの対応するタイプへの各 のキャストを試行します。
例:
user.id
列を解決するために int にキャストし、address
フィールドで構造体のみを保持します。
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
すべての ChoiceType
を解決するために、各選択肢を別個の列に変換します。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
すべての ChoiceType
を解決するために、指定したカタログテーブルのタイプにキャストします。
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
この DynamicFrame
のスキーマを返します。
返されたスキーマには必ず、この DynamicFrame
のレコードにあるすべてのフィールドが含まれます。しかし、その他のフィールドが含まれる場合がまれにあります。ネスト解除 メソッドを使用して、この DynamicFrame
のレコードに基づいてスキーマを "強化" できます。
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
単一のフィールドを DynamicFrame
として返します。
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
- 選択する列名のシーケンス。
指定した列を含む新しい DynamicFrame
を返します。
注記
selectFields
メソッドでは、最上位の列のみを選択できます。applyMapping メソッドでは、ネストされた列を選択できます。
Def show
def show( numRows : Int = 20 ) : Unit
numRows
- 出力する行の数。
この DynamicFrame
の行を JSON 形式で出力します。
Def simplifyDDBJson
DynamoDB は AWS Glue DynamoDB エクスポートコネクタを使用してエクスポートし、特殊なネスト構造の JSON ファイルが生成されます。詳細については、「データオブジェクト」を参照してください。simplifyDDBJson
このタイプのデータの DynamicFrame にあるネスト化された列を単純化し、新しい単純化された DynamicFrame を返します。リストタイプに複数タイプまたは Map タイプが含まれている場合、リストの要素は単純化されません。このメソッドは、DynamoDB エクスポート JSON 形式のデータのみをサポートします。unnest
が他の種類のデータにも同様の変更を加えることを考慮してください。
def simplifyDDBJson() : DynamicFrame
このメソッドにはパラメータはありません。
入力例
DynamoDB エクスポートによって生成された次のスキーマを考えてみましょう。
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
コードの例
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
この simplifyDDBJson
変換により、以下のように簡略化されます。
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
同じレコードを返すが、副作用としてレコードのサブセットを書き出すパススルー変換。
path
– 出力をs3://bucket//path
形式で書き込む先の Amazon S3 のパス。options
- サンプリング動作を記述するオプションのJsonOptions
マップ。
このレコードと同じレコードを含む DynamicFrame
を返します。
デフォルトでは、path
で指定した場所に任意の 100 レコードを書き込みます。この動作は、options
マップを使用してカスタマイズできます。有効なキーは以下のとおりです。
topk
– 書き出されるレコードの総数を指定します。デフォルトは 100 です。prob
– 各レコードを含める確率を (小数として) 指定します。デフォルトは1です。
例えば、以下の呼び出しでは、データセットをサンプリングするために、20 パーセントの確率で各レコードを選択し、200 レコードを書き出した後で停止します。
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
- 最初のDynamicFrame
に含めるパス。
2 つの DynamicFrame
のシーケンスを返します。DynamicFrame
の 1 つ目には指定したパスが含まれ、2 つ目には他のすべての列が含まれます。
例
この例では、persons
Glue Data Catalog の legislators
データベースの AWS テーブルから作成した DynamicFrame を 2 つに分割し、指定したフィールドを 1 つ目の DynamicFrame に、残りのフィールドを 2 つ目の DynamicFrame に配置しています。次に、結果から最初の DynamicFrame を選択します。
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
列と定数を比較する述語に基づいて行を分割します。
paths
- 比較に使用する列。values
- 比較に使用する定数値。operators
- 比較に使用する演算子。
2 つの DynamicFrame
のシーケンスを返します。1 つ目には述語が true の行を含め、2 つ目には述語が false の行を含めます。
述語を指定するには 3 つのシーケンスを使用します。"paths
" には、ネストされている可能性がある列の名前を含め、"'values
' には、比較する定数値を含め、"operators
" には、比較に使用する演算子を含めます。3 つすべてのシーケンスを同じ長さにする必要があります。n
演算子では、n
番目の列を n
番目の値と比較します。
各演算子は "!=
"、"=
"、"<=
"、"<
"、">=
"、">
" のいずれかにする必要があります。
以下の呼び出しの例では、DynamicFrame
を分割し、1 つ目の出力フレームには米国の 65 才を超える人々のレコード、2 つ目には他のすべてのレコードを含めています。
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
この DynamicFrame
の計算中に生じたエラーレコードの数を返します。この DynamicFrame
に入力として渡した以前のオペレーションのエラーは含まれません。
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
この DynamicFrame
を、同じスキーマとレコードを含む Apache Spark SQL DataFrame
に変換します。
注記
DataFrame
は ChoiceType
をサポートしていないため、このメソッドは ChoiceType
列を StructType
に自動的に変換します。選択肢の解決の詳細とオプションについては、「resolveChoice」を参照してください。
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
– 解析する列。文字列またはバイナリにする必要があります。format
– 解析に使用する形式。optionString
– 形式に渡すオプション (CSV 区切り記号など)。
指定した形式に従って、埋め込まれた文字列またはバイナリ列を解析します。解析された列は、元の列名で構造体の下にネストされます。
例えば、JSON 列が埋め込まれた CSV ファイルがあるとします。
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
最初の解析の後、以下のスキーマを含む DynamicFrame
を取得します。
{{{ root |-- name: string |-- age: int |-- address: string }}}
アドレス列で unbox
を呼び出して、特定のコンポーネントを解析できます。
{{{ df.unbox("address", "json") }}}
これにより、以下のスキーマを含む DynamicFrame
を取得します。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
すべてのネストされた構造体が平坦化された新しい DynamicFrame
を返します。名前は ".
" (ピリオド) 文字を使用して生成されます。
例えば、以下のスキーマを含む DynamicFrame
があるとします。
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
次の呼び出しでアドレス構造体がネスト解除されます。
{{{ df.unnest() }}}
その結果、スキーマは以下のようになります。
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
このメソッドはまた、配列内でネストされた構造体をネスト解除しません。しかしこれまでの慣習上、そのようなフィールドの名前の前には、外側の配列の名前と ".val
" が付けられます。
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
特に DynamoDB JSON 構造体内にある DynamicFrame
でネスト化された列をネスト解除すると、新しくネスト解除された DynamicFrame
が返されます。構造体型の配列のある列は、ネスト解除されません。これは、通常の unnest
変換とは異なる特殊なタイプのネスト解除変換で、データが DynamoDB JSON 構造体に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。
例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnestDDBJson()
変換は、以下のように変換します。
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
– 使用するスキーマを返す関数。高価である可能性が高い計算を延期するために、パラメータがゼロである関数として指定します。
この DynamicFrame
のスキーマを、指定した値に設定します。高価なスキーマの再計算を回避するために主に内部で使用されます。渡すスキーマには、データ内に存在するすべての列を含める必要があります。
Def withName
def withName( name : String ) : DynamicFrame
name
- 使用する新しい名前。
この DynamicFrame
のコピーを新しい名前で返します。
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
この DynamicFrame
のコピーを、指定した変換コンテキストで返します。