JDBC 연결 - AWS Glue

JDBC 연결

일반적으로 특정 관계형 데이터베이스 유형은 JDBC 표준을 통한 연결을 지원합니다. JDBC에 대한 자세한 내용은 Java JDBC API 설명서를 참조하세요. AWS Glue는 기본적으로 JDBC 커넥터를 통해 특정 데이터베이스 연결을 지원합니다. JDBC 라이브러리는 AWS Glue Spark 작업에서 제공됩니다. AWS Glue 라이브러리를 사용하여 이러한 데이터베이스 유형에 연결할 때 표준 옵션 세트에 액세스할 수 있습니다.

JDBC connectionType 값은 다음과 같습니다.

  • "connectionType": "sqlserver": Microsoft SQL Server 데이터베이스에 연결을 지정합니다.

  • "connectionType": "mysql": MySQL 데이터베이스에 대한 연결을 지정합니다.

  • "connectionType": "oracle": Oracle Database에 연결을 지정합니다.

  • "connectionType": "postgresql": PostgreSQL 데이터베이스에 대한 연결을 지정합니다.

  • "connectionType": "redshift": Amazon Redshift 데이터베이스에 대한 연결을 지정합니다. 자세한 내용은 Redshift 연결 단원을 참조하십시오.

다음 표에는 AWS Glue에서 지원되는 JDBC 드라이버 버전 목록이 있습니다.

제품 Glue 4.0용 JDBC 드라이버 버전 Glue 3.0용 JDBC 드라이버 버전 Glue 0.9, 1.0, 2.0용 JDBC 드라이버 버전
Microsoft SQL Server 9.4.0 7.x 6.x
MySQL 8.0.23 8.0.23 5.1
Oracle Database 21.7 21.1 11.2
PostgreSQL 42.3.6 42.2.18 42.1.x
MongoDB 4.7.2 4.0.0 2.0.0
Amazon Redshift * redshift-jdbc42-2.1.0.16 redshift-jdbc41-1.2.12.1017 redshift-jdbc41-1.2.12.1017

* Amazon Redshift 연결 유형의 경우 형식 지정 옵션을 비롯하여 JDBC 연결에 대한 연결 옵션에 포함된 다른 모든 옵션 이름 및 값 페어는 기본 SparkSQL DataSource에 직접 전달됩니다. AWS Glue 4.0 이상 버전의 Spark 작업을 포함하는 AWS Glue에서 Amazon Redshift용 AWS Glue 네이티브 커넥터는 Apache Spark용 Amazon Redshift 통합을 사용합니다. 자세한 내용은 Apache Spark용 Amazon Redshift 통합을 참조하세요. 이전 버전인 경우 Amazon Redshift data source for Spark를 참조하세요.

JDBC를 사용하여 Amazon RDS 데이터 스토어에 연결하도록 Amazon VPC를 구성하려면 AWS Glue에서 Amazon RDS 데이터 스토어에 대해 JDBC를 연결하도록 Amazon VPC 설정 섹션을 참조하세요.

참고

AWS Glue 작업은 실행 중에 하나의 서브넷에만 연결됩니다. 이로 인해 동일한 작업을 통해 여러 데이터 소스에 연결하는 기능이 영향을 받을 수 있습니다. 이 동작은 JDBC 소스에만 국한되지 않습니다.

JDBC 연결 옵션 참조

이미 JDBC AWS Glue 연결을 정의한 경우 URL, 사용자 및 암호와 같은 JDBC 연결에 정의된 구성 속성을 재사용할 수 있으므로 코드에서 연결 옵션으로 지정할 필요가 없습니다. 이 기능은 AWS Glue 3.0 이상 버전에서 사용할 수 있습니다. 이렇게 하려면 다음 연결 속성을 사용합니다.

  • "useConnectionProperties": 연결에서 구성을 사용하려는 경우 이 값을 'true'로 설정합니다.

  • "connectionName": 구성을 검색할 연결 이름을 입력합니다. 연결은 작업과 동일한 영역에 정의되어 있어야 합니다.

이 연결 옵션을 JDBC 연결에 사용하십시오.

  • "url": (필수) 데이터베이스의 JDBC URL.

  • "dbtable": (필수) 읽을 데이터베이스 테이블입니다. 데이터베이스 내의 스키마를 지원하는 JDBC 데이터 스토어의 경우 schema.table-name에 대해 지정합니다. 스키마가 제공되지 않으면 기본 "퍼블릭" 스키마가 사용됩니다.

  • "user": (필수 사항) 연결할 때 사용할 사용자 이름입니다.

  • "password": (필수) 연결할 때 사용할 암호.

  • (선택 사항) 다음 옵션을 사용하면 사용자 정의 JDBC 드라이버를 제공할 수 있습니다. AWS Glue에서 기본적으로 지원하지 않는 드라이버를 사용해야 한다면 이 옵션을 사용하십시오.

    ETL 작업은 원본과 대상이 같은 데이터베이스 제품이어도 데이터 원본과 대상에 여러 JDBC 드라이버 버전을 사용할 수 있습니다. 따라서 버전이 다른 원본 데이터베이스와 대상 데이터베이스 간에 데이터를 마이그레이션할 수 있습니다. 이 옵션을 사용하려면 먼저 Amazon S3에 JDBC 드라이버의 JAR 파일을 업로드해야 합니다.

    • "customJdbcDriverS3Path": 사용자 정의 JDBC 드라이버의 Amazon S3 경로입니다.

    • "customJdbcDriverClassName": JDBC 드라이버의 클래스 이름입니다.

  • "bulkSize": (선택 사항) JDBC 대상에 신속하게 대량 로드하기 위해 병렬 삽입을 구성하는 데 사용됩니다. 데이터를 쓰거나 삽입할 때 사용할 병렬 처리 수준의 정수 값을 지정합니다. 이 옵션은 Arch User Repository(AUR)와 같은 데이터베이스에 대한 쓰기 성능을 향상시키는 데 유용합니다.

  • "hashfield"(선택 사항) JDBC 테이블에서 병렬로 읽을 때 데이터를 파티션으로 구분하는 데 사용할 JDBC 테이블의 열 이름을 지정하기 위해 사용하는 문자열입니다. 'hashfield' 또는 'hashexpression'을 제공합니다. 자세한 내용은 JDBC 테이블을 병렬로 읽기 단원을 참조하십시오.

  • "hashexpression"(선택 사항) 정수를 반환하는 SQL select 절입니다. JDBC 테이블에서 병렬로 읽을 때 JDBC 테이블의 데이터를 파티션으로 구분하는 데 사용됩니다. 'hashfield' 또는 'hashexpression'을 제공합니다. 자세한 내용은 JDBC 테이블을 병렬로 읽기 단원을 참조하십시오.

  • "hashpartitions"(선택 사항) 양의 정수입니다. JDBC 테이블에서 병렬로 읽을 때 JDBC 테이블의 병렬 읽기 수를 지정하는 데 사용됩니다. 기본값: 7. 자세한 내용은 JDBC 테이블을 병렬로 읽기 단원을 참조하십시오.

  • "sampleQuery": (선택 사항) 사용자 지정 SQL 쿼리 문입니다. 테이블의 정보 하위 세트를 지정하여 테이블 콘텐츠의 샘플을 검색하는 데 사용됩니다. 데이터를 고려하지 않고 구성하면 DynamicFrame 메서드보다 효율성이 떨어져 시간 초과나 메모리 부족 오류가 발생할 수 있습니다. 자세한 내용은 sampleQuery 사용 단원을 참조하십시오.

  • "enablePartitioningForSampleQuery": (선택 사항) 부울입니다. 기본값: false. sampleQuery를 지정할 때 JDBC 테이블에서 병렬 읽기를 활성화하는 데 사용됩니다. true로 설정된 경우 AWS Glue에서 파티셔닝 조건을 추가하려면 sampleQuery가 'where' 또는 'and'로 끝나야 합니다. 자세한 내용은 sampleQuery 사용 단원을 참조하십시오.

  • "sampleSize": (선택 사항) 양의 정수입니다. 샘플 쿼리에서 반환되는 행의 수를 제한합니다. enablePartitioningForSampleQuery가 true인 경우에만 작동합니다. 파티셔닝이 활성화되지 않은 경우 대신 sampleQuery에서 "limit x"를 직접 추가하여 크기를 제한해야 합니다. 자세한 내용은 sampleQuery 사용 단원을 참조하십시오.

sampleQuery 사용

이 섹션에서는 sampleQuery, enablePartitioningForSampleQuerysampleSize 사용 방법을 설명합니다.

sampleQuery는 데이터 세트의 몇 개 행을 샘플링할 때 효율적입니다. 기본적으로 쿼리는 단일 실행기에 의해 실행됩니다. 데이터를 고려하지 않고 구성하면 DynamicFrame 메서드보다 효율성이 떨어져 시간 초과나 메모리 부족 오류가 발생할 수 있습니다. ETL 파이프라인의 일부로 기본 데이터베이스에서 SQL을 실행하는 방법은 일반적으로 성능 목적으로만 필요합니다. 데이터 세트의 몇 개 행을 미리 보려는 경우 show 사용을 고려합니다. SQL을 사용하여 데이터 세트를 변환하려는 경우 DataFrame 양식의 데이터에 대해 SparkSQL 변환을 정의하는 데 toDF 사용을 고려합니다.

쿼리가 다양한 테이블을 조작할 수 있지만 dbtable이 여전히 필요합니다.

sampleQuery를 사용하여 테이블 샘플 검색

기본 sampleQuery 동작을 사용하여 데이터 샘플을 검색할 때 AWS Glue는 뛰어난 처리량을 기대하지 않으므로 단일 실행기에서 쿼리를 실행합니다. 제공하는 데이터를 제한하고 성능 문제를 일으키지 않으려면 SQL에 LIMIT 절을 제공하는 것이 좋습니다.

예 분할 없이 SampleQuery 사용

다음 코드 예제에서는 분할 없이 sampleQuery를 사용하는 방법을 보여줍니다.

//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

대규모 데이터 세트에서 sampleQuery 사용

대규모 데이터 세트를 읽는 경우 JDBC 분할을 활성화하여 테이블을 병렬로 쿼리해야 할 수 있습니다. 자세한 내용은 JDBC 테이블을 병렬로 읽기 단원을 참조하십시오. JDBC 파티셔닝과 함께 sampleQuery를 사용하려면 enablePartitioningForSampleQuery를 true로 설정합니다. 이 기능을 활성화하려면 sampleQuery를 일부 변경해야 합니다.

sampleQuery와 함께 JDBC 파티셔닝을 사용하는 경우 AWS Glue에서 파티셔닝 조건을 추가하려면 쿼리가 'where' 또는 'and'로 끝나야 합니다.

JDBC 테이블에서 병렬로 읽을 때 sampleQuery의 결과를 제한하려면 LIMIT 절을 지정하는 대신, "sampleSize" 파라미터를 설정합니다.

예 JDBC 분할과 함께 sampleQuery 사용

다음 코드 예제는 JDBC 분할과 함께 sampleQuery를 사용하는 방법을 보여줍니다.

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

규칙 및 제한:

샘플 쿼리는 작업 북마크와 함께 사용할 수 없습니다. 두 가지 모두에 대한 구성이 제공되면 북마크 상태는 무시됩니다.

사용자 지정 JDBC 드라이버 사용

다음 코드 예제에서는 사용자 지정 JDBC 드라이버로 JDBC 데이터베이스에서 읽고 쓰는 방법을 보여줍니다. 한 버전의 데이터베이스 제품에서 읽고 같은 제품의 나중 버전에 쓰는 작업을 볼 수 있습니다.

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }