JDBC 连接 - AWS Glue

JDBC 连接

某些(通常是关系型)数据库类型支持通过 JDBC 标准进行连接。有关 JDBC 的更多信息,请参阅 Java JDBC API 文档。AWSGlue 原生支持通过 JDBC 连接器连接到某些数据库,JDBC 库在 AWS Glue Spark 作业中提供。使用 AWS Glue 库连接到这些数据库类型时,您可以访问一组标准选项。

JDBC connectionType 值包括:

  • "connectionType": "sqlserver":指定与 Microsoft SQL Server 数据库的连接。

  • "connectionType": "mysql":指定与 MySQL 数据库的连接。

  • "connectionType": "oracle":指定与 Oracle 数据库的连接。

  • "connectionType": "postgresql":指定与 PostgreSQL 数据库的连接。

  • "connectionType": "redshift":指定与 Amazon Redshift 数据库的连接。有关更多信息,请参阅 Redshift 连接

下表列出了 AWS Glue 支持的 JDBC 驱动程序版本。

产品 Glue 4.0 的 JDBC 驱动程序版本 Glue 3.0 的 JDBC 驱动程序版本 Glue 0.9、1.0、2.0 的 JDBC 驱动程序版本
Microsoft SQL Server 9.4.0 7.x 6.x
MySQL 8.0.23 8.0.23 5.1
Oracle Database 21.7 21.1 11.2
PostgreSQL 42.3.6 42.2.18 42.1.x
MongoDB 4.7.2 4.0.0 2.0.0
Amazon Redshift * redshift-jdbc42-2.1.0.16 redshift-jdbc41-1.2.12.1017 redshift-jdbc41-1.2.12.1017

* 如果是 Amazon Redshift 连接类型,用于 JDBC 连接的连接选项中包含的所有其他选项名称/值对(包括格式选项)将直接传递到底层 SparkSQL DataSource。在 AWS Glue 4.0 及更高版本的 AWS Glue with Spark 作业中,Amazon Redshift 的 AWS Glue 原生连接器使用适用于 Apache Spark 的 Amazon Redshift 集成。有关更多信息,请参阅 Amazon Redshift integration for Apache Spark。在之前的版本中,请参阅 Amazon Redshift data source for Spark

要将您的 Amazon VPC 配置为使用 JDBC 连接到 Amazon RDS 数据存储,请参阅 设置 Amazon VPC 以建立从 AWS Glue 到 Amazon RDS 数据存储的 JDBC 连接

注意

AWS Glue 作业在一次运行期间仅与一个子网关联。这可能会影响您使用同一作业连接到多个数据来源。此行为不仅限于 JDBC 源。

JDBC 连接选项参考

如果您已经定义了 JDBC AWS Glue 连接,则可以重复使用其中定义的配置属性,例如:url、用户和密码;不必在代码中将它们指定为连接选项。此功能只在 AWS Glue 3.0 及更高版本中提供。为此,请使用以下连接属性:

  • "useConnectionProperties":设置为 "true" 以表示您要使用连接中的配置。

  • "connectionName":输入要从中检索配置的连接名称,必须在与作业相同的区域中定义连接。

将这些连接选项与 JDBC 连接结合使用:

  • "url":(必填)数据库的 JDBC URL。

  • "dbtable":(必需)要从中读取数据的数据库表。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

  • "user":(必需)在连接时使用的用户名。

  • "password":(必填)在连接时使用的密码。

  • (可选)以下选项允许您提供自定义 JDBC 驱动程序。如果必须使用 AWS Glue 本身不支持的驱动程序,请使用这些选项。

    ETL 作业可以为数据源和目标使用不同的 JDBC 驱动程序版本,即使源和目标是相同的数据库产品也是如此。这允许您在不同版本的源数据库和目标数据库之间迁移数据。要使用这些选项,您必须首先将 JDBC 驱动程序的 JAR 文件上传到 Amazon S3。

    • "customJdbcDriverS3Path":自定义 JDBC 驱动程序的 Amazon S3 路径。

    • "customJdbcDriverClassName":JDBC 驱动程序的类名。

  • "bulkSize":(可选)用于配置并行插入以加速批量加载到 JDBC 目标。为写入或插入数据时要使用的并行度指定整数值。此选项有助于提高写入数据库(如 Arch User Repository (AUR))的性能。

  • "hashfield"(可选)一个字符串,用于指定 JDBC 表中一列的名称,用于在并行读取 JDBC 表时将数据划分为多个分区。提供“哈希字段”或“哈希表达式”。有关更多信息,请参阅 从 JDBC 表并行读取

  • "hashexpression"(可选)返回整数的 SQL 选择子句。用于在并行读取 JDBC 表时将 JDBC 表中的数据划分为多个分区。提供“哈希字段”或“哈希表达式”。有关更多信息,请参阅 从 JDBC 表并行读取

  • "hashpartitions"(可选)正整数。用于指定并行读取 JDBC 表时对 JDBC 表进行并行读取的次数。默认值:7。有关更多信息,请参阅 从 JDBC 表并行读取

  • "sampleQuery":(可选)自定义 SQL 查询语句。用于指定表中信息的子集以检索表格内容的样本。如果不考虑您的数据进行配置,则其效率可能低于 DynamicFrame 方法,从而导致超时或内存不足错误。有关更多信息,请参阅 使用 sampleQuery

  • "enablePartitioningForSampleQuery":(可选)布尔值。默认值:false。用于在指定 sampleQuery 时并行读取 JDBC 表。如果设置为 true,sampleQuery 必须以“where”或“and”结尾,以便于 AWS Glue 追加分区条件。有关更多信息,请参阅 使用 sampleQuery

  • "sampleSize":(可选)正整数。限制示例查询返回的行数。仅当 enablePartitioningForSampleQuery 为 true 时有用。如果未启用分区,则应直接在 sampleQuery 中添加 "limit x" 以限制大小。有关更多信息,请参阅 使用 sampleQuery

使用 sampleQuery

本部分介绍如何设置 sampleQuerysampleSizeenablePartitioningForSampleQuery

sampleQuery 是对数据集的几行进行采样的有效方法。默认情况下,查询由单个执行程序执行。如果不考虑您的数据进行配置,则其效率可能低于 DynamicFrame 方法,从而导致超时或内存不足错误。通常仅出于性能考虑,才需要在底层数据库上运行 SQL 作为 ETL 管道的一部分。如果试图预览数据集的几行,请考虑使用 show。如果您试图使用 SQL 转换数据集,请考虑使用 toDF,在 DataFrame 表单中针对您的数据定义 SparkSQL 转换。

虽然您的查询可能会处理各种表,但 dbtable 仍然是必需的。

使用 sampleQuery 检索表的样本

当使用默认 sampleQuery 行为检索数据样本时,AWS Glue 预计吞吐量不会很大,因此它会在单个执行程序上运行查询。为了限制您提供的数据,以便不会导致性能问题,我们建议您为 SQL 提供一个 LIMIT 子句。

例 在不进行分区的情况下使用 samplQuery

以下代码示例演示了如何在不进行分区的情况下使用 sampleQuery

//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

对较大的数据集使用 sampleQuery

如果您读取的是大型数据集,则可能需要启用 JDBC 分区才能并行查询表。有关更多信息,请参阅 从 JDBC 表并行读取。要将 sampleQuery 与 JDBC 分区一同使用,请将 enablePartitioningForSampleQuery 设置为 true。启用此功能需要您对自己的 sampleQuery 进行一些更改。

将 JDBC 分区与 sampleQuery 一起使用时,查询必须以“where”或“and”结尾,以便于 AWS Glue 附加分区条件。

如果您想在并行读取 JDBC 表时限制 sampleQuery 的结果,请设置 "sampleSize" 参数而不是指定 LIMIT 子句。

例 将 sampleQuery 与 JDBC 分区一起使用

以下代码示例演示了如何将 sampleQuery 与 JDBC 分区一起使用。

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

注释和限制:

示例查询不可以与作业书签一起使用。提供两者的配置时,书签状态将会被忽略。

使用自定义 JDBC 驱动程序

以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }