AWS Glue for Spark 中适用于 ETL 的连接类型和选项
在 AWS Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用 connectionType
参数指定连接类型。它们使用 connectionOptions
或 options
参数指定连接选项。
connectionType
参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions
(或 options
)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。
有关展示如何设置和使用连接选项的代码示例,请参阅每种连接类型的主页。
connectionType |
连接到 |
---|---|
dynamodb | Amazon DynamoDB 数据库 |
kinesis | Amazon Kinesis Data Streams |
S3 | Amazon S3 |
documentdb | Amazon DocumentDB (with MongoDB compatibility) 数据库 |
opensearch | Amazon OpenSearch Service。 |
redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | Azure Cosmos for NoSQL。 |
azuresql | Azure SQL。 |
bigquery | Google BigQuery。 |
mongodb | MongoDB |
sqlserver | Microsoft SQL Server 数据库(请参阅JDBC 连接) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA。 |
snowflake | Snowflake |
teradata | Teradata Vantage。 |
vertica | Vertica。 |
custom.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值) |
marketplace.* | Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值) |
AWS Glue 5.0 for Spark 中 ETL 的 DataFrame 选项
DataFrame 是一个类似于表的按命名列组织的数据集,并支持函数式(map/reduce/filter/等)操作和 SQL 操作(select、project、aggregate)。
要为 Glue 支持的数据来源创建 DataFrame,需要以下项:
数据来源连接器
ClassName
数据来源连接
Options
同样,要将 DataFrame 写入 Glue 支持的数据接收器,也需要相同项:
数据接收器连接器
ClassName
数据接收器连接
Options
请注意,DataFrame 不支持 AWS Glue 功能(例如作业书签)和 DynamicFrame 选项(例如 connectionName
)。有关 DataFrame 及其支持的操作的更多详细信息,请参阅 DataFrame
指定连接器类名
要指定数据来源/接收器的 ClassName
,请使用 .format
选项提供定义数据来源/接收器的相应连接器 ClassName
。
JDBC 连接器
对于 JDBC 连接器,请指定 jdbc
作为 .format
选项的值,并在 driver
选项中提供 JDBC 驱动程序 ClassName
。
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
下表列出了 AWS Glue for DataFrames 支持的数据来源的 JDBC 驱动程序 ClassName
。
数据来源 | 驱动程序类名 |
---|---|
PostgreSQL | org.postgresql.Driver |
Oracle | oracle.jdbc.driver.OracleDriver |
SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver |
MySQL | com.mysql.jdbc.Driver |
SAPHana | com.sap.db.jdbc.Driver |
Teradata | com.teradata.jdbc.TeraDriver |
Spark 连接器
对于 Spark 连接器,请将连接器的 ClassName
指定为 .format
选项的值。
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
下表列出了 AWS Glue for DataFrames 中支持的数据来源的 Spark 连接器 ClassName
。
数据来源 | 类名 |
---|---|
MongoDB/DocumentDB | glue.spark.mongodb |
Redshift | io.github.spark_redshift_community.spark.redshift |
AzureCosmos | cosmos.oltp |
AzureSQL | com.microsoft.sqlserver.jdbc.spark |
BigQuery | com.google.cloud.spark.bigquery |
OpenSearch | org.opensearch.spark.sql |
Snowflake | net.snowflake.spark.snowflake |
Vertica | com.vertica.spark.datasource.VerticaSource |
指定连接选项
要指定与数据来源/接收器的连接的 Options
,请使用 .option(<KEY>, <VALUE>)
提供单个选项或使用 .options(<MAP>)
提供多个选项作为键值映射。
每个数据来源/接收器都支持自己的一组连接 Options
。有关可用 Options
的详细信息,请参阅下表中列出的特定数据来源/接收器 Spark 连接器的公共文档。
示例
以下示例从 PostgreSQL 读取并写入 SnowFlake:
Python
例如:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
例如:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
自定义和 AWS Marketplace connectionType 值
这些功能包括:
-
"connectionType": "marketplace.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。 -
"connectionType": "marketplace.spark"
:指定与 Apache Spark 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。 -
"connectionType": "marketplace.jdbc"
:指定与 JDBC 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。 -
"connectionType": "custom.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。 -
"connectionType": "custom.spark"
:指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。 -
"connectionType": "custom.jdbc"
:指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。
适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项
-
className
– 字符串,必需,驱动程序类名称。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
url
– 字符串,必需,用于建立与数据源的连接且带占位符(${}
)的 JDBC URL。占位符${secretKey}
替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。 -
secretId
或user/password
– 字符串,必需,用于检索 URL 的凭证。 -
dbTable
或query
– 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定dbTable
或query
,但不能同时指定两者。 -
partitionColumn
– 字符串,可选,用于分区的整数列的名称。此选项仅在包含lowerBound
、upperBound
和numPartitions
时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库。 lowerBound
和upperBound
值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。注意
使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:
-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用分区列的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用分区列的表达式扩展WHERE
子句,以测试查询。
-
-
lowerBound
– 整数,可选,用于确定分区步长的最小partitionColumn
值。 -
upperBound
– 整数,可选,用于确定分区步长的最大partitionColumn
值。 -
numPartitions
– 整数,可选,分区数。此值以及lowerBound
(包含)和upperBound
(排除)为用于拆分partitionColumn
而生成的WHERE
子句表达式构成分区步长。重要
请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。
-
filterPredicate
– 字符串,可选,用于筛选源数据的额外条件子句。例如:BillingCity='Mountain View'
使用查询(而不是表名称)时,您应验证查询是否适用于指定的
filterPredicate
。例如:-
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用筛选条件谓词的查询结尾附加WHERE
子句,以测试查询。 -
如果您的查询格式为
"SELECT col1 FROM table1 WHERE col2=val"
,则通过AND
和使用筛选条件谓词的表达式扩展WHERE
子句,以测试查询。
-
-
dataTypeMapping
– 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项"dataTypeMapping":{"FLOAT":"STRING"}
会通过调用驱动程序的ResultSet.getString()
方法,将 JDBC 类型FLOAT
的数据字段映射到 JavaString
类型,并将其用于构建 AWS Glue 记录。ResultSet
对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。 -
目前受支持的 AWS Glue 数据类型包括:
-
DATE
-
string
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
支持的 JDBC 数据类型为 Java8 java.sql.types
。 默认数据类型映射(从 JDBC 到 AWS Glue)如下:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
如果将自定义数据类型映射与选项
dataTypeMapping
结合使用,则可以覆盖默认数据类型映射。只有dataTypeMapping
选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 AWS GlueSTRING
数据类型。 -
以下 Python 代码示例演示了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.athena 或 marketplace.athena 的连接选项
-
className
– 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如"com.amazonaws.athena.connectors"
)的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。 -
tableName
– 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称all_log_streams
,这意味着返回的动态数据框将包含日志组中所有日志流的数据。 -
schemaName
– 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output
。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。
有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述
以下 Python 代码示例演示了如何从使用 AWS Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
适用于类型 custom.spark 或 marketplace.spark 的连接选项
-
className
– 字符串,必需,连接器类名称。 -
secretId
– 字符串,可选,用于检索连接器连接的凭证。 -
connectionName
– 字符串,必需,与连接器关联的连接的名称。 -
其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀
es
开头,正如适用于 Apache Hadoop 的 Elasticsearch文档中所述。Spark 与 Snowflake 的连接使用 sfUser
和sfPassword
等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。
以下 Python 代码示例演示了如何从使用 marketplace.spark
连接的 OpenSearch 数据存储读取数据。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
常规选项
本节中的选项以 connection_options
形式提供,但不是专门适用于一个连接器。
配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅 使用作业书签。
jobBookmarkKeys
- 列名称的数组。jobBookmarkKeysSortOrder
- 定义如何根据排序顺序比较值的字符串。有效值:"asc"
、"desc"
。useS3ListImplementation
- 用于在列出 Amazon S3 存储桶内容时管理内存性能。有关更多信息,请参阅 Optimize memory management in AWS Glue。