MongoDB 連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

MongoDB 連線

您可以使用 AWS Glue for Spark 從 AWS Glue 4.0 及更新版本中的 MongoDB 和 MongoDB Atlas 讀取和寫入資料表。您可以使用透過 AWS Glue 連線儲存於 AWS Secrets Manager 的使用者名稱和密碼憑證來連線至 MongoDB。

如需有關 MongoDB 的詳細資訊,請參閱 MongoDB 文件

設定 MongoDB 連線

若要從 AWS 連線至 MongoDB,您將需要 MongoDB 憑證 mongodbUsermongodbPass

若要從 AWS Glue 連線至 MongoDB,您可能需要部分先決條件:

  • 如果 Mongo DB 執行個體位於 Amazon VPC 中,請設定 Amazon VPC 以允許 AWS Glue 任務與 MongoDB 執行個體通訊,使流量不會周遊公有網際網路。

    在 Amazon VPC 中,識別或建立 AWS Glue 將在執行任務時使用的 VPC子網路安全群組。此外,您也需要確保 Amazon VPC 已完成設定,以允許 MongoDB 執行個體與此位置之間的網路流量。根據您的網路配置,這可能需要變更安全群組規則、網路 ACL、NAT 閘道及對等連線。

然後,您可以繼續設定 AWS Glue 以搭配 MongoDB 使用。

設定連至 MongoDB 的連線:
  1. 或者,在 AWS Secrets Manager 中,使用 MongoDB 憑證建立密碼。若要在 Secrets Manager 中建立機密,請遵循 AWS Secrets Manager 文件中建立 AWS Secrets Manager 機密中提供的教學課程。建立機密之後,請保留機密名稱 secretName,以便進行下一個步驟。

    • 在選取鍵/值組時,請使用 mongodbUser 值來建立 username 金鑰對。

      在選取鍵/值組時,請使用 mongodbPass 值來建立 password 金鑰對。

  2. 在 AWS Glue 主控台中,依照 新增 AWS Glue 連線 中的步驟建立連線。建立連線之後,請保留連線名稱 connectionName,以便未來在 AWS Glue 中使用。

    • 選取連線類型時,請選取 MongoDBMongoDB Atlas

    • 選取 MongoDB URLMongoDB Atlas URL 時,請提供 MongoDB 執行個體的主機名稱。

      MongoDB URL 會以 mongodb://mongoHost:mongoPort/mongoDBname 格式提供。

      MongoDB Atlas URL 會以 mongodb+srv://mongoHost:mongoPort/mongoDBname 格式提供。

      您可選用 mongoDBname 針對連線提供預設資料庫。

    • 如果您選擇建立 Secrets Manager 密碼,請選擇 AWS Secrets Manager 憑證類型

      然後,在 AWS 密碼中提供 secretName

    • 如果您選擇提供使用者名稱和密碼,請提供 mongodbUsermongodbPass

  3. 在下列情況中,您可能需要其他組態:

    • Amazon VPC 中託管於 AWS 的 MongoDB 執行個體

      • 您將需要向定義 MongoDB 安全憑證的 AWS Glue 連線提供 Amazon VPC 連線資訊。建立或更新連線時,請在網路選項中設定 VPC子網路安全群組

建立 AWS Glue MongoDB 連線後,您將需要執行下列動作,才能呼叫連線方法:

  • 如果您選擇建立 Secrets Manager 密碼,請授予與 AWS Glue 任務權限相關聯的 IAM 角色,以讀取 secretName

  • 在您的 AWS Glue 任務組態中,提供 connectionName 作為其他網路連線

若要在 AWS Glue for Spark 中使用 AWS Glue MongoDB 連線,請在連線方法呼叫中提供 connectionName 選項。或者,您也可依照 在 ETL 任務中使用 MongoDB 連線 中的步驟執行,以搭配 AWS Glue Data Catalog 使用連線。

使用 AWS Glue 連線從 MongoDB 讀取

先決條件:

  • 您想要讀取的 MongoDB 集合。您將需要集合的識別資訊。

    MongoDB 集合由資料庫名稱與集合名稱 mongodbNamemongodbCollection 識別。

  • 完成設定的 AWS Glue MongoDB 連線,可提供驗證資訊。完成上一個程序設定連至 MongoDB 的連線的步驟,以設定驗證資訊。您將會需要 AWS Glue 連線的名稱,connectionName

例如:

mongodb_read = glueContext.create_dynamic_frame.from_options( connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id", "disableUpdateUri": "false", } )

寫入 MongoDB 資料表

此範例會從現有的 DynamicFrame dynamicFrame 將資訊寫入 MongoDB。

先決條件:

  • 您想要寫入的 MongoDB 集合。您將需要集合的識別資訊。

    MongoDB 集合由資料庫名稱與集合名稱 mongodbNamemongodbCollection 識別。

  • 完成設定的 AWS Glue MongoDB 連線,可提供驗證資訊。完成上一個程序設定連至 MongoDB 的連線的步驟,以設定驗證資訊。您將會需要 AWS Glue 連線的名稱,connectionName

例如:

glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="mongodb", connection_options={ "connectionName": "connectionName", "database": "mongodbName", "collection": "mongodbCollection", "disableUpdateUri": "false", "retryWrites": "false", }, )

讀取和寫入 MongoDB 資料表

此範例會從現有的 DynamicFrame dynamicFrame 將資訊寫入 MongoDB。

先決條件:

  • 您想要讀取的 MongoDB 集合。您將需要集合的識別資訊。

    您想要寫入的 MongoDB 集合。您將需要集合的識別資訊。

    MongoDB 集合由資料庫名稱與集合名稱 mongodbNamemongodbCollection 識別。

  • MongoDB 驗證資訊 mongodbUsermongodbPassword

例如:

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_mongo_options = { "uri": mongo_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "mongodbName", "collection": "mongodbCollection", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "mongodbName", "collection": "mongodbCollection", "username": "mongodbUsername", "password": "mongodbPassword", } # Get DynamicFrame from MongoDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) # Write DynamicFrame to MongoDB glueContext.write_dynamic_frame.from_options(dynamicFrame, connection_type="mongodb", connection_options=write_mongo_options) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB val dynamicFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(dynamicFrame) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"mongodbName", |"collection":"mongodbCollection", |"username": "mongodbUsername", |"password": "mongodbPassword", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

MongoDB 連線選項參考

指定與 MongoDB 的連線。來源連線和接收器連線的連線選項不同。

以下連線屬性可在來源連線和接收器連線之間共用:

  • connectionName:用於讀取/寫入。完成設定之 AWS Glue MongoDB 連線的名稱,可向連線方法提供驗證和網路資訊。當 AWS Glue 連線完成設定 (如上一節 設定 MongoDB 連線 中所述) 後,提供 connectionName 將會取代提供 "uri""username""password" 連線選項的需要。

  • "uri":(必要) 讀取的 MongoDB 主機,格式為 mongodb://<host>:<port>。在 AWS Glue 4.0 之前的 AWS Glue 版本中使用。

  • "connection.uri":(必要) 讀取的 MongoDB 主機,格式為 mongodb://<host>:<port>。在 AWS Glue 4.0 及更新版本中使用。

  • "username":(必要) MongoDB 使用者名稱。

  • "password":(必要) MongoDB 密碼。

  • "database":(必要) 讀取的 MongoDB 資料庫。在任務指令碼中呼叫 glue_context.create_dynamic_frame_from_catalog 時,也可在 additional_options 中傳遞此選項。

  • "collection":(必要) 讀取的 MongoDB 集合。在任務指令碼中呼叫 glue_context.create_dynamic_frame_from_catalog 時,也可在 additional_options 中傳遞此選項。

"connectionType": "mongodb" as Source

使用下列有 "connectionType": "mongodb" 的連線選項作為來源:

  • "ssl":(選用) 如果 true,則啟用 SSL 連線。預設值為 false

  • "ssl.domain_match":(選用) 如果 truessltrue,則執行網域符合檢查。預設值為 true

  • "batchSize":(選用):每個批次傳回的文件數目,於內部批次的游標內使用。

  • "partitioner":(選用):從 MongoDB 讀取輸入資料的分割區類別名稱。連接器提供下列分割區:

    • MongoDefaultPartitioner (預設) (不支援 AWS Glue 4.0)

    • MongoSamplePartitioner (需要 MongoDB 3.2 或更新版本) (不支援 AWS Glue 4.0)

    • MongoShardedPartitioner (不支援 AWS Glue 4.0)

    • MongoSplitVectorPartitioner (不支援 AWS Glue 4.0)

    • MongoPaginateByCountPartitioner (不支援 AWS Glue 4.0)

    • MongoPaginateBySizePartitioner (不支援 AWS Glue 4.0)

    • com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

    • com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

  • "partitionerOptions" (選用):指定分割區的選項。每個分割區都支援下列選項:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    如需有關這些選項的詳細資訊,請參閱 MongoDB 文件中的分割區組態

"connectionType": "mongodb" as Sink

使用下列有 "connectionType": "mongodb" 的連線選項作為接收器:

  • "ssl":(選用) 如果 true,則啟用 SSL 連線。預設值為 false

  • "ssl.domain_match":(選用) 如果 truessltrue,則執行網域符合檢查。預設值為 true

  • "extendedBsonTypes":(選用) 如果 true,則在寫入資料至 MongoDB 時允許延伸的 BSON 類型。預設值為 true

  • "replaceDocument":(選用) 如果 true,則在儲存包含 _id 欄位的資料集時取代整個文件。若為 false,則文件中僅有與資料集中欄位相符的欄位會更新。預設值為 true

  • "maxBatchSize":(選用):儲存資料時大量操作的批次大小上限。預設為 512。

  • "retryWrites":(選用) 如果 AWS Glue 發生網路錯誤,系統會自動重試特定寫入操作一次。