예: 연결 유형 및 옵션 설정 - AWS Glue

예: 연결 유형 및 옵션 설정

이 섹션의 샘플 코드에서는 ETL(추출, 변환 및 로드) 소스 및 싱크에 연결할 때 연결 유형 및 연결 옵션을 설정하는 방법을 보여 줍니다. 이 코드는 MongoDB 및 Amazon DocumentDB(MongoDB와 호환)에 연결할 때 Python과 Scala 모두의 연결 유형 및 연결 옵션을 지정하는 방법을 보여줍니다. 이 코드는 AWS Glue에서 지원하는 다른 데이터 스토어에 연결하는 것과 유사합니다.

참고

Amazon DocumentDB에 연결하는 ETL 작업을 생성할 때 Connections 작업 속성에 대해 Amazon DocumentDB가 실행 중인 Virtual Private Cloud(VPC)를 지정하는 연결 객체를 지정해야 합니다. 연결 객체의 경우 연결 유형은 JDBC여야 하며 JDBC URLmongo://<DocumentDB_host>:27017이어야 합니다.

Python으로 연결

다음 Python 스크립트는 MongoDB 및 Amazon DocumentDB를 읽고 쓰기 위한 연결 유형 및 연결 옵션을 사용하는 방법을 보여줍니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) output_path = "s3://some_bucket/output/" + str(time.time()) + "/" mongo_uri = "mongodb://<mongo-instanced-ip-address>:27017" mongo_ssl_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_uri = "mongodb://<mongo-instanced-ip-address>:27017" write_uri = "mongodb://<mongo-instanced-ip-address>:27017" documentdb_write_uri = "mongodb://<mongo-instanced-ip-address>:27017" read_docdb_options = { "uri": documentdb_uri, "database": "test", "collection": "coll", "username": "username", "password": "1234567890", "ssl": "true", "ssl.domain_match": "false", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id" } read_mongo_options = { "uri": mongo_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd", "partitioner": "MongoSamplePartitioner", "partitionerOptions.partitionSizeMB": "10", "partitionerOptions.partitionKey": "_id"} ssl_mongo_options = { "uri": mongo_ssl_uri, "database": "test", "collection": "coll", "ssl": "true", "ssl.domain_match": "false" } write_mongo_options = { "uri": write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } write_documentdb_options = { "uri": documentdb_write_uri, "database": "test", "collection": "coll", "username": "username", "password": "pwd" } # Get DynamicFrame from MongoDB and DocumentDB dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb", connection_options=read_mongo_options) dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb", connection_options=read_docdb_options) # Write DynamicFrame to MongoDB and DocumentDB glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options) glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb", connection_options=write_documentdb_options) job.commit()

Scala로 연결

다음 Scala 스크립트는 MongoDB 및 Amazon DocumentDB를 읽고 쓰기 위한 연결 유형 및 연결 옵션을 사용하는 방법을 보여줍니다.

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val DEFAULT_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" val DOC_WRITE_URI: String = "mongodb://<mongo-instanced-ip-address>:27017" lazy val defaultJsonOption = jsonOptions(DEFAULT_URI) lazy val documentDBJsonOption = jsonOptions(DOC_URI) lazy val writeJsonOption = jsonOptions(WRITE_URI) lazy val writeDocumentDBJsonOption = jsonOptions(DOC_WRITE_URI) def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Get DynamicFrame from MongoDB and DocumentDB val resultFrame: DynamicFrame = glueContext.getSource("mongodb", defaultJsonOption).getDynamicFrame() val resultFrame2: DynamicFrame = glueContext.getSource("documentdb", documentDBJsonOption).getDynamicFrame() // Write DynamicFrame to MongoDB and DocumentDB glueContext.getSink("mongodb", writeJsonOption).writeDynamicFrame(resultFrame) glueContext.getSink("documentdb", writeJsonOption).writeDynamicFrame(resultFrame2) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"uri": "${uri}", |"database":"test", |"collection":"coll", |"username": "username", |"password": "pwd", |"ssl":"true", |"ssl.domain_match":"false", |"partitioner": "MongoSamplePartitioner", |"partitionerOptions.partitionSizeMB": "10", |"partitionerOptions.partitionKey": "_id"}""".stripMargin) } }

자세한 내용은 다음 자료를 참조하세요.