Connection types and options for ETL in AWS Glue for Spark
In AWS Glue for Spark, various PySpark and Scala methods and transforms specify the connection type using a connectionType parameter. They specify connection options using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except where otherwise noted, the parameters apply when the connection is used as a source or sink.
For sample code that demonstrates setting and using connection options, see the homepage for each connection type.
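The calling pattern is the same across connection types. The following is a minimal sketch (not tied to any one connector) that shows where connection_type and connection_options go in a PySpark job; it uses the s3 connection type, and the S3 path is a placeholder you would replace with your own value.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# connection_type selects the connector; connection_options carries its settings.
frame = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": ["s3://<S3 path>/"]},   # <S3 path> is a placeholder
    format = "json",
    transformation_ctx = "ExampleSource0")

job.commit()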
connectionType | Connects to
---|---
dynamodb | Amazon DynamoDB database
kinesis | Amazon Kinesis Data Streams
s3 | Amazon S3
documentdb | Amazon DocumentDB (with MongoDB compatibility) database
opensearch | Amazon OpenSearch Service
redshift | Amazon Redshift
kafka | Kafka
azurecosmos | Azure Cosmos DB for NoSQL
azuresql | Azure SQL
bigquery | Google BigQuery
mongodb | MongoDB
sqlserver | Microsoft SQL Server database (see JDBC connections)
mysql | MySQL
oracle | Oracle
postgresql | PostgreSQL
saphana | SAP HANA
snowflake | Snowflake
teradata | Teradata Vantage
vertica | Vertica
custom.* | Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)
marketplace.* | Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)
Custom and AWS Marketplace connectionType values
These include the following:

- "connectionType": "marketplace.athena": Designates a connection to an Amazon Athena data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "marketplace.spark": Designates a connection to an Apache Spark data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "marketplace.jdbc": Designates a connection to a JDBC data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "custom.athena": Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to AWS Glue Studio.
- "connectionType": "custom.spark": Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to AWS Glue Studio.
- "connectionType": "custom.jdbc": Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to AWS Glue Studio.
Connection options for type custom.jdbc or marketplace.jdbc
- className – String, required, driver class name.
- connectionName – String, required, name of the connection that is associated with the connector.
- url – String, required, JDBC URL with placeholders (${}) that are used to build the connection to the data source. The placeholder ${secretKey} is replaced with the secret of the same name in AWS Secrets Manager. Refer to the data store documentation for more information about constructing the URL.
- secretId or user/password – String, required, used to retrieve credentials for the URL.
- dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
- partitionColumn – String, optional, the name of an integer column that is used for partitioning. This option works only when it's included with lowerBound, upperBound, and numPartitions. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databases in the Apache Spark SQL, DataFrames and Datasets Guide.

  The lowerBound and upperBound values are used to decide the partition stride, not for filtering the rows in the table. All rows in the table are partitioned and returned.

  Note: When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:

  - If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the partition column.
  - If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the partition column.

- lowerBound – Integer, optional, the minimum value of partitionColumn that is used to decide partition stride.
- upperBound – Integer, optional, the maximum value of partitionColumn that is used to decide partition stride.
- numPartitions – Integer, optional, the number of partitions. This value, together with lowerBound (inclusive) and upperBound (exclusive), forms partition strides for the generated WHERE clause expressions that are used to split the partitionColumn. For a worked illustration of the resulting strides, see the sketch that follows this list.

  Important: Be careful with the number of partitions, because too many partitions might cause problems on your external database systems.

- filterPredicate – String, optional, extra condition clause to filter data from the source. For example: BillingCity='Mountain View'. A short usage sketch appears after the code example that follows this list.

  When using a query instead of a table name, you should validate that the query works with the specified filterPredicate. For example:

  - If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the filter predicate.
  - If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the filter predicate.

- dataTypeMapping – Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to an AWS Glue data type. For example, the option "dataTypeMapping":{"FLOAT":"STRING"} maps data fields of JDBC type FLOAT into the Java String type by calling the ResultSet.getString() method of the driver, and uses it to build AWS Glue records. The ResultSet object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions.
The AWS Glue data types supported currently are:

- DATE
- STRING
- TIMESTAMP
- INT
- FLOAT
- LONG
- BIGDECIMAL
- BYTE
- SHORT
- DOUBLE
The JDBC data types supported are the Java 8 java.sql.Types. The default data type mappings (from JDBC to AWS Glue) are:
- DATE -> DATE
- VARCHAR -> STRING
- CHAR -> STRING
- LONGNVARCHAR -> STRING
- TIMESTAMP -> TIMESTAMP
- INTEGER -> INT
- FLOAT -> FLOAT
- REAL -> FLOAT
- BIT -> BOOLEAN
- BOOLEAN -> BOOLEAN
- BIGINT -> LONG
- DECIMAL -> BIGDECIMAL
- NUMERIC -> BIGDECIMAL
- TINYINT -> SHORT
- SMALLINT -> SHORT
- DOUBLE -> DOUBLE
If you use a custom data type mapping with the option dataTypeMapping, then you can override a default data type mapping. Only the JDBC data types listed in the dataTypeMapping option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the AWS Glue STRING data type by default.
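To illustrate how partitionColumn, lowerBound, upperBound, and numPartitions interact, the following is a rough sketch in plain Python (not an AWS Glue API call) of how Spark-style JDBC partitioning turns those values into per-partition conditions. It uses the same values as the code example below; the open-ended first and last ranges are why all rows are returned rather than filtered.

# Rough sketch: lowerBound=0, upperBound=200, numPartitions=4 on
# partitionColumn "id" gives a stride of 50.
lower_bound, upper_bound, num_partitions = 0, 200, 4
stride = (upper_bound - lower_bound) // num_partitions   # 50

conditions = []
for i in range(num_partitions):
    start = lower_bound + i * stride
    end = start + stride
    if i == 0:
        conditions.append(f"id < {end} OR id IS NULL")    # first partition is open below
    elif i == num_partitions - 1:
        conditions.append(f"id >= {start}")               # last partition is open above
    else:
        conditions.append(f"id >= {start} AND id < {end}")

# ['id < 50 OR id IS NULL', 'id >= 50 AND id < 100',
#  'id >= 100 AND id < 150', 'id >= 150']
print(conditions)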
The following Python code example shows how to read from JDBC databases with AWS Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dataTypeMapping": {"INTEGER": "STRING"},
        "upperBound": "200",
        "query": "select id, name, department from department where id < 200",
        "numPartitions": "4",
        "partitionColumn": "id",
        "lowerBound": "0",
        "connectionName": "test-connection-jdbc"
    },
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
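The following is a minimal, hypothetical sketch of using filterPredicate with dbTable instead of a query. It reuses the glueContext and the test-connection-jdbc connection from the example above; the table name department and the predicate are only illustrative.

# Hypothetical sketch: read a table and let the connector apply filterPredicate
# as an extra condition in the generated WHERE clause.
FilteredSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dbTable": "department",               # table name instead of "query"
        "filterPredicate": "id < 200",         # extra condition pushed to the database
        "connectionName": "test-connection-jdbc"
    },
    transformation_ctx = "FilteredSource0")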
Connection options for type custom.athena or marketplace.athena
- className – String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class name (for example, "com.amazonaws.athena.connectors"). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix.
- tableName – String, required, the name of the CloudWatch log stream to read. This code snippet uses the special view name all_log_streams, which means that the dynamic data frame returned will contain data from all log streams in the log group.
- schemaName – String, required, the name of the CloudWatch log group to read from. For example, /aws-glue/jobs/output.
- connectionName – String, required, name of the connection that is associated with the connector.

For additional options for this connector, see the Amazon Athena CloudWatch Connector README.
The following Python code example shows how to read from an Athena data store using an AWS Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.athena",
    connection_options = {
        "tableName": "all_log_streams",
        "schemaName": "/aws-glue/jobs/output",
        "connectionName": "test-connection-athena"
    },
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
Connection options for type custom.spark or marketplace.spark
- className – String, required, connector class name.
- secretId – String, optional, used to retrieve credentials for the connector connection.
- connectionName – String, required, name of the connection that is associated with the connector.
- Other options depend on the data store. For example, OpenSearch configuration options start with the prefix es, as described in the Elasticsearch for Apache Hadoop documentation. Spark connections to Snowflake use options such as sfUser and sfPassword, as described in Using the Spark Connector in the Connecting to Snowflake guide. A Snowflake-flavored sketch follows the OpenSearch example below.
The following Python code example shows how to read from an OpenSearch data store using a marketplace.spark connection.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {
        "path": "test",
        "es.nodes.wan.only": "true",
        "es.nodes": "https://<AWS endpoint>",
        "connectionName": "test-spark-es",
        "es.port": "443"
    },
    transformation_ctx = "DataSource0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DataSource0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
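As a second illustration, the following is a hypothetical sketch of what a Snowflake read through a marketplace.spark connection might look like. The sf* option names come from the Snowflake Spark connector documentation, and the connection name, account, database, schema, warehouse, and table values are placeholders; it reuses the glueContext from the example above.

# Hypothetical sketch: Snowflake via a marketplace.spark connection. The sf* keys
# are Snowflake Spark connector options; all bracketed values are placeholders.
SnowflakeSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {
        "sfURL": "<account>.snowflakecomputing.com",
        "sfUser": "<user>",
        "sfPassword": "<password>",          # or supply secretId instead
        "sfDatabase": "<database>",
        "sfSchema": "<schema>",
        "sfWarehouse": "<warehouse>",
        "dbtable": "<table>",
        "connectionName": "<snowflake connection name>"
    },
    transformation_ctx = "SnowflakeSource0")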
General options
The options in this section are provided as connection_options, but do not specifically apply to one connector.
The following parameters are used generally when configuring bookmarks. They may apply to Amazon S3 or JDBC workflows. For more information, see Using job bookmarks.
- jobBookmarkKeys — Array of column names.
- jobBookmarkKeysSortOrder — String defining how to compare values based on sort order. Valid values: "asc", "desc".
- useS3ListImplementation — Used to manage memory performance when listing Amazon S3 bucket contents. For more information, see Optimize memory management in AWS Glue.
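The following is a minimal sketch of passing the bookmark-related keys alongside the other connection options, assuming a JDBC-style source like the earlier examples in this topic; the connection name, table name, and bookmark column are placeholders.

# Hypothetical sketch: bookmark keys passed with the other connection options.
BookmarkedSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dbTable": "<table>",
        "connectionName": "<connection name>",
        "jobBookmarkKeys": ["id"],              # column(s) used to track job progress
        "jobBookmarkKeysSortOrder": "asc"       # compare bookmark values in ascending order
    },
    transformation_ctx = "BookmarkedSource0")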