示例:设置连接类型和选项 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:设置连接类型和选项

本节中的代码示例展示了如何设置连接到提取、转换、加载 (ETL) 源和接收器时所用的连接类型和连接选项。此代码展示了如何在 Python 和 Scala 中指定与 MongoDB 和 Amazon DocumentDB (with MongoDB compatibility) 的连接的连接类型和连接选项。此代码与用于连接到 AWS Glue 支持的其他数据存储的代码类似。

注意

当您创建连接到 Amazon DocumentDB 的 ETL 任务时,对于 Connections 任务属性,您必须指定一个连接对象,用于指定在其中运行 Amazon DocumentDB 的 Virtual Private Cloud(VPC)。对于该连接对象,连接类型必须为 JDBC,且 JDBC URL 必须为 mongo://<DocumentDB_host>:27017

使用 Python 进行连接

以下 Python 脚本展示了如何使用连接类型和连接选项来读写 MongoDB 和 Amazon DocumentDB。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } read_mongo_options = { "uri": mongo_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "test", "collection": "coll", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } write_documentdb_options = { "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from MongoDB and DocumentDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options) glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()

使用 Scala 进行连接

以下 Scala 脚本展示了如何使用连接类型和连接选项来读写 MongoDB 和 Amazon DocumentDB。

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB and DocumentDB val resultFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB and DocumentDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(resultFrame) glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

有关更多信息,请参阅下列内容: