Ejemplos: configuración de tipos y opciones de conexión - AWS Glue

Ejemplos: configuración de tipos y opciones de conexión

El código de muestra de esta sección ilustra cómo establecer tipos de conexión y opciones de conexión cuando se conecta a orígenes y receptores de extracción, transformación y carga (ETL). El código muestra cómo especificar tipos de conexión y opciones de conexión en Python y Scala para conexiones a MongoDB y Amazon DocumentDB (compatible con MongoDB). El código es similar para conectarse a otros almacenes de datos compatibles con AWS Glue.

nota

Cuando crea un trabajo de ETL que se conecta a Amazon DocumentDB, para la propiedad del trabajo Connections, debe designar un objeto de conexión que especifique la nube privada virtual (VPC) en la que se ejecuta Amazon DocumentDB. Para el objeto de conexión, el tipo de conexión debe ser JDBC y el JDBC URL debe ser mongo://<DocumentDB_host>:27017.

Conexión con Python

El siguiente script de Python muestra el uso de tipos de conexión y opciones de conexión para leer y escribir en MongoDB y Amazon DocumentDB.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } read_mongo_options = { "uri": mongo_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "test", "collection": "coll", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } write_documentdb_options = { "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from MongoDB and DocumentDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options) glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()

Conexión con Scala

El siguiente script de Scala muestra el uso de tipos de conexión y opciones de conexión para leer y escribir en MongoDB y Amazon DocumentDB.

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB and DocumentDB val resultFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB and DocumentDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(resultFrame) glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

Para obtener más información, consulte los siguientes: