Spark 用の AWS Glue を使用して Amazon DocumentDB 内のテーブルに対する読み込みと書き込みを行うことができます。AWS Glue 接続を介して AWS Secrets Manager で保存されている認証情報を使用して Amazon DocumentDB に接続できます。
Amazon DocumentDB の詳細については、「Amazon DocumentDB のドキュメント」を参照してください。
注記
Amazon DocumentDB エラスティッククラスターは、現在 AWS Glue コネクタを使用する場合にはサポートされていません。エラスティッククラスターの詳細については、「Amazon DocumentDB エラスティッククラスターの使用」を参照してください。
Amazon DocumentDB コレクションへの読み取りと書き込み
注記
Amazon DocumentDB に接続する ETL ジョブを作成する際、Connections
ジョブのプロパティにおいて、Amazon DocumentDB が実行されている Virtual Private Cloud (VPC) を特定するための、接続オブジェクトを指定する必要があります。接続オブジェクトの場合、接続タイプは JDBC
で、JDBC URL
は mongo://
である必要があります。<DocumentDB_host>
:27017
注記
これらのコードサンプルは AWS Glue 3.0 用に開発されました。AWS Glue 4.0 への移行については、MongoDB を参照してください。uri
パラメータが変更されました。
注記
Amazon DocumentDB を使用する場合、書かれた文書に _id
が指定されている場合など、特定の状況では、retryWrites
は false に設定する必要があります。詳細については、Amazon DocumentDB ドキュメントの「MongoDB との機能の違い」を参照してください。
次の Python スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
output_path = "s3://some_bucket/output/" + str(time.time()) + "/"
documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017"
documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017"
read_docdb_options = {
"uri": documentdb_uri,
"database": "test",
"collection": "coll",
"username": "username",
"password": "1234567890",
"ssl": "true",
"ssl.domain_match": "false",
"partitioner": "MongoSamplePartitioner",
"partitionerOptions.partitionSizeMB": "10",
"partitionerOptions.partitionKey": "_id"
}
write_documentdb_options = {
"retryWrites": "false",
"uri": documentdb_write_uri,
"database": "test",
"collection": "coll",
"username": "username",
"password": "pwd"
}
# Get DynamicFrame from DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
connection_options=read_docdb_options)
# Write DynamicFrame to MongoDB and DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
connection_options=write_documentdb_options)
job.commit()
次の Scala スクリプトでは、Amazon DocumentDB に対する読み取りと書き込みのための、接続タイプと接続オプションの使用方法を示しています。
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
object GlueApp {
val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017"
lazy val documentDBJsonOption = jsonOptions(DOC_URI)
lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI)
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
// Get DynamicFrame from DocumentDB
val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame()
// Write DynamicFrame to DocumentDB
glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2)
Job.commit()
}
private def jsonOptions(uri: String): JsonOptions = {
new JsonOptions(
s"""{"uri": "${uri}",
|"database":"test",
|"collection":"coll",
|"username": "username",
|"password": "pwd",
|"ssl":"true",
|"ssl.domain_match":"false",
|"partitioner": "MongoSamplePartitioner",
|"partitionerOptions.partitionSizeMB": "10",
|"partitionerOptions.partitionKey": "_id"}""".stripMargin)
}
}
Amazon DocumentDB 接続のオプションのリファレンス
Amazon DocumentDB (MongoDB 互換) への接続を指定します。
接続オプションは、ソース接続とシンク接続とで異なります。
"connectionType": "Documentdb" ソースとする
"connectionType": "documentdb"
をソースとして、次の接続オプションを使用します。
-
"uri"
: (必須) 読み取り元の Amazon DocumentDB ホスト (mongodb://<host>:<port>
形式)。 -
"database"
: (必須) 読み取り元の Amazon DocumentDB データベース。 -
"collection"
: (必須) 読み取り元の Amazon DocumentDB コレクション。 -
"username"
: (必須) Amazon DocumentDB のユーザー名。 -
"password"
: (必須) Amazon DocumentDB のパスワード。 -
"ssl"
: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を"true"
に設定する必要があります。 -
"ssl.domain_match"
: (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を"false"
に設定する必要があります。 -
"batchSize"
: (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。 -
"partitioner"
: (オプション) Amazon DocumentDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。-
MongoDefaultPartitioner
(デフォルト) (AWS Glue 4.0 ではサポートされていません) -
MongoSamplePartitioner
(AWS Glue 4.0 ではサポートされていません) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
(AWS Glue 4.0 ではサポートされていません)
-
-
"partitionerOptions"
(オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
、partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
、partitionSizeMB
これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration
」を参照してください。 -
"connectionType": "documentdb" as Sink
"connectionType": "documentdb"
をシンクとして、次の接続オプションを使用します。
-
"uri"
: (必須) 書き込み先の Amazon DocumentDB ホスト (mongodb://<host>:<port>
形式)。 -
"database"
: (必須) 書き込み先の Amazon DocumentDB データベース。 -
"collection"
: (必須) 書き込み先の Amazon DocumentDB コレクション。 -
"username"
: (必須) Amazon DocumentDB のユーザー名。 -
"password"
: (必須) Amazon DocumentDB のパスワード。 -
"extendedBsonTypes"
: (オプション)true
が指定されている場合、Amazon DocumentDB へのデータ書き込み時に拡張 BSON 型を使用できます。デフォルト:true
。 -
"replaceDocument"
: (オプション)true
の場合、_id
フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false
の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト:true
。 -
"maxBatchSize"
: (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。 -
"retryWrites"
: (オプション): AWS Glue でネットワークエラーが発生した場合、特定の書き込みオペレーションを 1 回自動的に再試行します。