DynamoDB 接続  - AWS Glue

DynamoDB 接続 

Spark 用の AWS Glue を使用して AWS Glue の DynamoDB 内のテーブルに対する読み込みと書き込みを行うことができます。AWS Glue ジョブにアタッチされている IAM 権限を使用して DynamoDB に接続します。AWS Glue は、別の AWS アカウントの DynamoDB テーブルに対するデータの書き込みをサポートしています。詳細については、「DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス」を参照してください。

AWS Glue DynamoDB ETL コネクターに加えて、DynamoDB エクスポートコネクターを使用して DynamoDB から読み取ることができます。このコネクターは、DynamoDB ExportTableToPointInTime リクエストを呼び出し、これを指定した Amazon S3 の場所に DynamoDB JSON 形式で保存します。次に、AWS Glue は、Amazon S3 のエクスポート場所からデータを読み取ることによって DynamicFrame オブジェクトを作成します。

DynamoDB のライターは、AWS Glue バージョン 1.0 以降で利用可能です。AWS Glue DynamoDB のエクスポートコネクターは、AWS Glue バージョン 2.0 以降で利用可能です。

DynamoDB の詳細については、Amazon DynamoDB のドキュメントを参照してください。

注記

DynamoDB ETL リーダーでは、フィルタまたはプッシュダウン述語は使用できません。

DynamoDB 接続の設定

AWS Glue から DynamoDB に接続するには、AWS Glue ジョブに関連付けられた IAM ロールに DynamoDB と対話する権限を付与します。DynamoDB からの読み取りまたは書き込みに必要な権限の詳細については、「IAM ドキュメント」の DynamoDB のアクション、リソース、および条件キーを参照してください。

次の状況では、追加の設定が必要になる場合があります。 

  • DynamoDB エクスポートコネクターを使用するときは、ジョブが DynamoDB テーブルのエクスポートをリクエストできるように IAM を設定する必要があります。さらに、エクスポート用の Amazon S3 バケットを特定し、DynamoDB がそのバケットに書き込むための適切な権限と、AWS Glue ジョブがそこから読み取るための適切な権限を IAM に付与する必要があります。詳細については、「DynamoDB でテーブルのエクスポートをリクエストする」を参照してください。  

  • AWS Glue ジョブに特定の Amazon VPC 接続要件がある場合は、NETWORK AWS Glue 接続タイプを使用してネットワークオプションを指定します。DynamoDB へのアクセスは IAM によって承認されるため、AWS Glue DynamoDB 接続タイプは必要ありません。 

DynamoDB からの読み込みと書き込み

次のコード例では、DynamoDB テーブルからの読み取り (ETL コネクタ経由) と、DynamoDB のテーブルへの書き込み方法を示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": test_source, "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": test_sink, "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> test_source, "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> test_sink, "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

DynamoDB エクスポートコネクタを使用する

DynamoDB テーブルサイズが 80 GB を超える場合、エクスポートコネクタは ETL コネクタよりもパフォーマンスが向上します。さらに、エクスポートリクエストが AWS Glue ジョブで Spark プロセスの外部で実行される場合、AWS Glue ジョブの自動スケーリングを有効にして、エクスポートリクエスト中の DPU 使用量を節約できます。エクスポートコネクタでは、Spark エグゼキューターの並列処理のためのスプリット数や、DynamoDB スループットの読み取り率を設定する必要がありません。

注記

DynamoDBには、ExportTableToPointInTime リクエストを呼び出すための特定の要件があります。詳細については、「DynamoDB でテーブルのエクスポートをリクエストする」を参照してください。例えば、このコネクタを使用するには、テーブルでポイントインタイムリストア (PITR) を有効にする必要があります。DynamoDB コネクタは、Amazon S3 への DynamoDB エクスポートの AWS KMS 暗号化もサポートします。AWS Glue ジョブの設定でセキュリティ設定を指定することで、DynamoDB エクスポートの AWS KMS 暗号化が有効になります。KMS キーは、Amazon S3 バケットと同じリージョンにある必要があります。

DynamoDB エクスポートと Amazon S3 ストレージのコストに追加料金が適用されることに注意してください。Amazon S3 でエクスポートされたデータは、ジョブ実行の終了後も保持されるため、DynamoDB を追加でエクスポートしなくても再利用できます。このコネクタを使用するための要件として、テーブルに対してポイントインタイムリカバリ (PITR) が有効になっていることが必要です。

DynamoDB ETL コネクタまたはエクスポートコネクタは、DynamoDB ソースに適用されるフィルタまたはプッシュダウン述語をサポートしていません。

次のコード例は、(エクスポートコネクタを経由して)読み取り、パーティションの数を出力する方法を示しています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": test_source, "dynamodb.s3.bucket": bucket_name, "dynamodb.s3.prefix": bucket_prefix, "dynamodb.s3.bucketOwner": account_id_of_bucket, } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> test_source, "dynamodb.s3.bucket" -> bucket_name, "dynamodb.s3.prefix" -> bucket_prefix, "dynamodb.s3.bucketOwner" -> account_id_of_bucket, )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

これらの例は、(エクスポートコネクタを介して)読み取りを行い、dynamodb 分類を持つ AWS Glue データカタログテーブルからパーティションの数を出力する方法を示しています。      

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database=catalog_database, table_name=catalog_table_name, additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": s3_bucket, "dynamodb.s3.prefix": s3_bucket_prefix } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = catalog_database, tableName = catalog_table_name, additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> s3_bucket, "dynamodb.s3.prefix" -> s3_bucket_prefix )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

DynamoDB エクスポート JSON の使用を簡素化

AWS Glue DynamoDB エクスポートコネクタを使用した DynamoDB エクスポートでは、特殊なネスト構造の JSON ファイルが生成されます。詳細については、データオブジェクトを参照してください。 AWS Glue は DynamicFrame 変換を提供します。これを使用して、このような構造をダウンストリームアプリケーションで使いやすい形式に変換できます。

変換は 2 つの方法のいずれかで起動できます。。DynamoDB から読み取るメソッドを呼び出すときに、値 "true" を使用して接続オプション "dynamodb.simplifyDDBJson" を設定できます。また、AWS Glue ライブラリで独立して利用可能なメソッドとして変換を呼び出すこともできます。

DynamoDB エクスポートによって生成された次のスキーマを考えてみましょう。

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

この simplifyDDBJson 変換により、以下のように簡略化されます。

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
注記

simplifyDDBJson は、AWS Glue バージョン 3.0 以降で有効です。unnestDDBJson トランスフォームを使用して DynamoDB のエクスポート JSON を簡略化することもできます。ユーザーには unnestDDBJson から simplifyDDBJson への移行をお勧めします。

DynamoDB オペレーションにおける並列処理の設定

パフォーマンスを向上させるために、DynamoDB コネクタで使用できる特定のパラメーターを調整できます。並列処理パラメーターを調整する際の目標は、プロビジョニングされた AWS Glue ワーカーを最大限に活用することです。そして、さらにパフォーマンスを向上させる必要がある場合は、DPU 数を増加してジョブをスケールアウトすることをお勧めします。

ETL コネクタを使用する場合、dynamodb.splits パラメータを使用して DynamoDB 読み取りオペレーションの並列処理を変更できます。  エクスポートコネクタで読み込みをする場合は、Spark エグゼキュータの並列処理の分割数を構成する必要がありません。DynamoDB 書き込みオペレーションの並列処理は、dynamodb.output.numParallelTasks を使用して変更できます。

DynamoDB ETL コネクターによる読み取り

dynamodb.splitsジョブ構成に設定されている最大ワーカー数と以下 numSlots の計算に基づいて計算することをお勧めします。自動スケーリングの場合、実際に利用可能なワーカー数は、その上限に基づいて変更される可能性があります。最大ワーカー数の設定についての詳細は、「AWS Glue で Spark ジョブに関するジョブプロパティの構成」の「[ワーカー数] (NumberOfWorkers)」を参照してください。

  • numExecutors = NumberOfWorkers - 1

    1 つのエグゼキューターが Spark ドライバー専用になっている場合、他のエグゼキューターはデータの処理に使用されます。

  • numSlotsPerExecutor =

    AWS Glue 3.0 and later versions
    • WorkerTypeG.1X の場合は 4

    • WorkerTypeG.2X の場合は 8

    • WorkerTypeG.4X の場合は 16

    • WorkerTypeG.8X の場合は 32

    AWS Glue 2.0 and legacy versions
    • WorkerTypeG.1X の場合は 8

    • WorkerTypeG.2X の場合は 16

  • numSlots = numSlotsPerExecutor * numExecutors

dynamodb.splits を使用可能なスロット数、numSlots に設定することをおすすめします。

DynamoDB への書き込み 

この dynamodb.output.numParallelTasks パラメータは、以下の計算を使用して Spark タスクごとの WCU を決定するために使用されます。

permittedWcuPerTask = ( TableWCU * dynamodb.throughput.write.percent ) / dynamodb.output.numParallelTasks

DynamoDB ライターは、設定が DynamoDB に書き込まれる Spark タスクの数を正確に表している場合に最適に機能します。場合によっては、書き込みパフォーマンスを向上させるために、デフォルトの計算をオーバーライドする必要がある場合があります。このパラメータを指定しない場合、Spark タスクごとに許可される WCU は、次の式により自動的に計算されます。

    • numPartitions = dynamicframe.getNumPartitions()

    • numSlots (このセクションすでに定義したとおり)

    • numParallelTasks = min(numPartitions, numSlots)

  • 例 1。DPU=10、WorkerType=Standard。入力 DynamicFrame には、100 個の RDD パーティションがあります。

    • numPartitions = 100

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(100, 68) = 68

  • 例 2。DPU=10、WorkerType=Standard。入力 DynamicFrame には、20 個の RDD パーティションがあります。

    • numPartitions = 20

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(20, 68) = 20

注記

従来の AWS Glue バージョンのジョブと Standard Worker を使用するジョブでは、スロット数を計算するために違った方法が必要となります。これらのジョブのパフォーマンスを調整する必要がある場合は、サポートされている AWS Glue バージョンに移行することをおすすめします。

DynamoDB 接続オプションのリファレンス

Amazon DynamoDB への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": ソースとして ETL コネクタを使用する "dynamodb"

AWS Glue DynamoDB ETL コネクタを使用している場合は、"connectionType": "dynamodb" をソースとして次の接続オプションを使用します。

  • "dynamodb.input.tableName": (必須) 読み取り元の DynamoDB テーブル。

  • "dynamodb.throughput.read.percent": (オプション) 使用する読み込みキャパシティーユニット (RCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • 0.5 ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの読み込み容量の半分を消費しようとすることを意味します。上記の値を 0.5 より上に設定すると、AWS Glue は読み取りのリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の読み込みレートは、DynamoDB テーブルに、統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。)

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの読み取り容量を 40000 として処理します。大きなテーブルをエクスポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.splits": (オプション) 読み取り中にこの DynamoDB テーブルを分割するパーティションの数を定義します。デフォルトでは、"1" に設定されます。許容値は "1" から "1,000,000" (これらの値を含む) です。

    1 は並列処理がないことを表します。より良いパフォーマンスを得るためには、より大きな値を (以下の式を使用して) 指定することを強くお勧めします。値を適切に設定する方法の詳細は、「DynamoDB オペレーションにおける並列処理の設定」を参照してください。

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。このパラメータは、AWS Glue 1.0 以降で使用可能です。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。このパラメータは、AWS Glue 1.0 以降で使用可能です。

"connectionType":AWS Glue DynamoDB エクスポートコネクタをソースとする "dynamodb"

AWS Glue バージョン 2.0 以降でのみ使用可能な AWS Glue DynamoDB エクスポートコネクタを使用する場合は、ソースとして "connectionType": "dynamodb" で次の接続オプションを使用します。

  • "dynamodb.export": (必須) 文字列値:

    • ddb に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になり、AWS Glue ジョブ中に新しい ExportTableToPointInTimeRequest が呼び出されます。dynamodb.s3.bucketdynamodb.s3.prefix から渡された場所で新しいエクスポートが生成されます。

    • s3 に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になりますが、新しい DynamoDB エクスポートの作成はスキップされ、代わりに dynamodb.s3.bucketdynamodb.s3.prefix がそのテーブルの過去のエクスポートの Amazon S3 ロケーションとして使用されます。

  • "dynamodb.tableArn": (必須) 読み取り元の DynamoDB テーブル。

  • "dynamodb.unnestDDBJson": (オプション) デフォルト: false。  有効な値: ブール値  true に設定すると、エクスポートに存在する DynamoDB JSON 構造体のネスト解除の変換が実行されます。"dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" を同時に true に設定するとエラーになります。AWS Glue 3.0 以降のバージョンでは、DynamoDB マップタイプを簡略化するときの動作を改善するために "dynamodb.simplifyDDBJson" を使用することをおすすめします。詳細については、「DynamoDB エクスポート JSON の使用を簡素化」を参照してください。

  • "dynamodb.simplifyDDBJson": (オプション) デフォルト: false。  有効な値: ブール値  true に設定すると、エクスポートに存在する DynamoDB JSON 構造のスキーマを簡素化するための変換を実行します。これは "dynamodb.unnestDDBJson" オプションと同じ目的ですが、DynamoDB テーブル内の DynamoDB マップタイプやネストされたマップタイプをより適切にサポートします。このオプションは、AWS Glue バージョン 3.0 以降でのみ有効です。"dynamodb.unnestDDBJson""dynamodb.simplifyDDBJson" を同時に true に設定するとエラーになります。詳細については、「DynamoDB エクスポート JSON の使用を簡素化」を参照してください。

  • "dynamodb.s3.bucket": (オプション) DynamoDB ExportTableToPointInTime プロセスが実行される Simple Storage Service (Amazon S3) バケットの場所を示します。エクスポートファイル形式は DynamoDB JSON です。

    • "dynamodb.s3.prefix": (オプション) DynamoDB ExportTableToPointInTime の読み込みが保存される Simple Storage Service (Amazon S3) バケット内の Amazon S3 プレフィックスの場所を示します。dynamodb.s3.prefixdynamodb.s3.bucket のどちらも指定しなかった場合、これらの値は AWS Glue ジョブの設定で指定された一時ディレクトリの場所がデフォルトとなります。詳細については、「AWS Glue で使用される特別なパラメータ」を参照してください。

    • "dynamodb.s3.bucketOwner": クロスアカウント Simple Storage Service (Amazon S3) アクセスのために必要なバケット所有者を示します。

  • "dynamodb.sts.roleArn": (オプション) DynamoDB テーブルのクロスアカウントアクセスおよび/またはクロスリージョンアクセスのために割り当てる IAM ロール の ARN。注: 同じ IAM ロール のARN を使用して、ExportTableToPointInTime リクエストに指定された Simple Storage Service (Amazon S3) の場所にアクセスします。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。

  • "dynamodb.exportTime" (オプション) 有効な値: ISO-8601 インスタントを表す文字列。  エクスポートが実行されるべき時点。

  • "dynamodb.sts.region": (リージョンのエンドポイントを使用してリージョン間通話を行う場合は必須) 読み取りたい DynamoDB テーブルをホストするリージョン。

"connectionType": ETLコネクタをシンクとして使用する "dynamodb"

"connectionType": "dynamodb" をシンクとして、次の接続オプションを使用します。

  • "dynamodb.output.tableName": (必須) 書き込み先の DynamoDB テーブル。

  • "dynamodb.throughput.write.percent": (オプション) 使用する書き込みキャパシティーユニット (WCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • 0.5 ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの書き込み容量の半分を消費しようとすることを意味します。上記の値を 0.5 より上に設定すると、AWS Glue は書き込みリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の書き込みレートは、統一されたキーのディストリビューションが、DynamoDB テーブルにあるかどうかなどの要因によって変わります)。

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの書き込み容量を 40000 として処理します。大きなテーブルをインポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.output.numParallelTasks": (オプション) 同時に DynamoDB に書き込める並列タスクの数を定義します。Spark タスクごとに許容される WCU を計算するために使用されます。ほとんどの場合、AWS Glue はこの値の妥当なデフォルト値を計算します。  詳細については、「DynamoDB オペレーションにおける並列処理の設定」を参照してください。

  • "dynamodb.output.retry": (オプション) DynamoDB から ProvisionedThroughputExceededException が送られている場合の、再試行の実行回数を定義します。デフォルトでは、"10" に設定されています。

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-write-sts-session」に設定されています。