MongoDB 接続 - AWS Glue

MongoDB 接続

AWS Glue for Spark を使用して、AWS Glue 4.0 以降のバージョンの MongoDB および MongoDB Atlas のテーブルからの読み取りとテーブルへの書き込みを行うことができます。AWS Glue 接続を介して AWS Secrets Manager で保存されているユーザー名およびパスワードの認証情報を使用して MongoDB に接続できます。

MongoDB の詳細については、MongoDB のドキュメントを参照してください。

MongoDB 接続の設定

AWS Glue から MongoDB に接続するには、MongoDB 認証情報、mongodbUser および mongodbPass が必要です。

AWS Glue から MongoDB に接続するには、いくつかの前提条件を満たす必要がある場合があります。

  • MongoDB インスタンスが Amazon VPC 内にある場合は、トラフィックがパブリックインターネットを経由することなく、AWS Glue ジョブが MongoDB インスタンスと通信できるように Amazon VPC を設定します。

    Amazon VPC で、AWS Glue がジョブの実行中に使用する [VPC][サブネット]、および [セキュリティグループ] を特定または作成します。さらに、MongoDB インスタンスとこの場所の間のネットワークトラフィックを許可するように Amazon VPC が設定されているようにする必要があります。ネットワークレイアウトに基づいて、セキュリティグループルール、ネットワーク ACL、NAT ゲートウェイ、およびピアリング接続の変更が必要になる場合があります。

その後、MongoDB で使用できるように AWS Glue を設定する作業に進むことができます。

MongoDB に対する接続を設定するには:
  1. 必要に応じて、AWS Secrets Manager で、MongoDB 認証情報を使用してシークレットを作成します。Secrets Manager でシークレットを作成するには、AWS Secrets Manager ドキュメントの「AWS Secrets Manager シークレットを作成する」にあるチュートリアルに従ってください。シークレットを作成したら、次のステップのためにシークレット名 secretName を保存しておきます。

    • [key/value ペア] を選択する際に、mongodbUser という値を持つキー username のペアを作成します。

      [key/value ペア] を選択する際に、mongodbPass という値を持つキー password のペアを作成します。

  2. AWS Glue コンソールで、「AWS Glue 接続の追加」にあるステップに従って接続を作成します。接続を作成したら、将来的に AWS Glue で使用するために、接続名 connectionName を維持します。

    • [接続タイプ] を選択する際には、[MongoDB] または [MongoDB Atlas] を選択します。

    • [MongoDB URL] または [MongoDB Atlas URL] を選択する場合は、MongoDB インスタンスのホスト名を入力します。

      MongoDB URL は、mongodb://mongoHost:mongoPort/mongoDBname の形式で指定されます。

      MongoDB Atlas URL は、mongodb+srv://mongoHost:mongoPort/mongoDBname の形式で指定されます。

      接続にデフォルトのデータベースを指定する場合、mongoDBname はオプションです。

    • Secrets Manager シークレットを作成することを選択した場合は、AWS Secrets Manager の [認証情報タイプ] を選択します。

      その後、[AWS シークレット]secretName を入力します。

    • [ユーザー名とパスワード] を入力することを選択した場合は、mongodbUser および mongodbPass を入力します。

  3. 次の状況では、追加の設定が必要になる場合があります。

    • Amazon VPC の AWS でホストされている MongoDB インスタンスの場合

      • MongoDB セキュリティ認証情報を定義する AWS Glue 接続に、Amazon VPC 接続に関する情報を提供する必要があります。接続を作成または更新する際に、[ネットワークオプション][VPC][サブネット]、および [セキュリティグループ] を設定します。

AWS Glue MongoDB 接続を作成した後、接続メソッドを呼び出す前に次のアクションを実行する必要があります。

  • Secrets Manager シークレットを作成することを選択した場合は、AWS Glue ジョブに関連付けられた IAM ロールに secretName を読み取るための許可を付与します。

  • AWS Glue ジョブ設定で、追加のネットワーク接続として connectionName を指定します。

AWS Glue for Spark で AWS Glue MongoDB 接続を使用するには、接続メソッド呼び出しで connectionName オプションを指定します。あるいは、ETL ジョブでの MongoDB 接続の操作 のステップに従って、AWS Glue データカタログと組み合わせて接続を使用することもできます。

AWS Glue 接続を使用した MongoDB からの読み取り

前提条件:

  • 読み取り元とする MongoDB コレクション。コレクションの識別情報が必要になります。

    MongoDB コレクションは、データベース名とコレクション名である mongodbName および mongodbCollection によって識別されます。

  • 認証情報を提供するように設定された AWS Glue MongoDB 接続。認証情報を設定するには、前の手順「MongoDB に対する接続を設定するには」のステップを実行します。AWS Glue 接続、connectionName の名前が必要になります。

例:

mongodb_read = glueContext.create_dynamic_frame.from_options( connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id", "disableUpdateUri": "false", } )

MongoDB テーブルへの書き込み

この例では、既存の DynamicFrame である dynamicFrame から MongoDB に情報を書き込みます。

前提条件:

  • 書き込み先とする MongoDB コレクション。コレクションの識別情報が必要になります。

    MongoDB コレクションは、データベース名とコレクション名である mongodbName および mongodbCollection によって識別されます。

  • 認証情報を提供するように設定された AWS Glue MongoDB 接続。認証情報を設定するには、前の手順「MongoDB に対する接続を設定するには」のステップを実行します。AWS Glue 接続、connectionName の名前が必要になります。

例:

glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "disableUpdateUri": "false", "retryWrites": "false", }, )

MongoDB テーブルに対する読み書き

この例では、既存の DynamicFrame である dynamicFrame から MongoDB に情報を書き込みます。

前提条件:

  • 読み取り元とする MongoDB コレクション。コレクションの識別情報が必要になります。

    書き込み先とする MongoDB コレクション。コレクションの識別情報が必要になります。

    MongoDB コレクションは、データベース名とコレクション名である mongodbName および mongodbCollection によって識別されます。

  • MongoDB 認証情報である mongodbUser および mongodbPassword

例:

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_mongo_options = { "uri": mongo_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "mongodbName", "collection": "mongodbCollection", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", } # Get DynamicFrame from MongoDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) # Write DynamicFrame to MongoDB glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"mongodbName", |"collection":"mongodbCollection", |"username": "mongodbUsername", |"password": "mongodbPassword", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

MongoDB 接続オプションのリファレンス

MongoDB への接続を指定します。接続オプションは、ソース接続とシンク接続とで異なります。

これらの接続プロパティは、ソース接続とシンク接続の間で共有されます。

  • connectionName — 読み込み/書き込みに使用されます。認証およびネットワークの情報を接続方法に提供するように設定された AWS Glue MongoDB 接続の名前。前のセクション「MongoDB 接続の設定」で説明したように AWS Glue 接続が設定されている場合、connectionName を入力することで、"uri""username"、および "password" 接続オプションを入力する必要がなくなります。

  • "uri": (必須) 読み込み元の MongoDB ホスト (形式: mongodb://<host>:<port>)。AWS Glue 4.0 より前のバージョンの AWS Glue で使用されます。

  • "connection.uri": (必須) 読み込み元の MongoDB ホスト (形式: mongodb://<host>:<port>)。AWS Glue 4.0 以降のバージョンで使用されます。

  • "username": (必須) MongoDB のユーザー名。

  • "password": (必須) MongoDB のパスワード。

  • "database": (必須) 読み込み元の MongoDB データベース。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

  • "collection": (必須) 読み込み元の MongoDB コレクション。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

"connectionType": "mongodb" ソースとする

"connectionType": "mongodb" をソースとして、次の接続オプションを使用します。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "batchSize": (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。

  • "partitioner": (オプション): MongoDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。

    • MongoDefaultPartitioner (デフォルト) (AWS Glue 4.0 ではサポートされていません)

    • MongoSamplePartitioner (MongoDB 3.2 以降が必要) (AWS Glue 4.0 ではサポートされていません)

    • MongoShardedPartitioner (AWS Glue 4.0 ではサポートされていません)

    • MongoSplitVectorPartitioner (AWS Glue 4.0 ではサポートされていません)

    • MongoPaginateByCountPartitioner (AWS Glue 4.0 ではサポートされていません)

    • MongoPaginateBySizePartitioner (AWS Glue 4.0 ではサポートされていません)

    • com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

  • "partitionerOptions" (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration」を参照してください。

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" をシンクとして、次の接続オプションを使用します。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "extendedBsonTypes": (オプション) true が設定されている場合、MongoDB にデータを書き込む際に拡張 BSON 型を使用することを許可します。デフォルト: true

  • "replaceDocument": (オプション) true の場合、_id フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト: true

  • "maxBatchSize": (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。

  • "retryWrites": (オプション): AWS Glue でネットワークエラーが発生した場合、特定の書き込みオペレーションを 1 回自動的に再試行します。