Connection types and options for ETL in AWS Glue for Spark

In AWS Glue for Spark, various PySpark and Scala methods and transforms specify the connection type using a connectionType parameter. They specify connection options using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except where otherwise noted, the parameters apply when the connection is used as a source or sink.

For sample code that demonstrates setting and using connection options, see the homepage for each connection type.

connectionType    Connects to
dynamodb          Amazon DynamoDB database
kinesis           Amazon Kinesis Data Streams
s3                Amazon S3
documentdb        Amazon DocumentDB (with MongoDB compatibility) database
opensearch        Amazon OpenSearch Service
redshift          Amazon Redshift database
kafka             Kafka or Amazon Managed Streaming for Apache Kafka
azurecosmos       Azure Cosmos for NoSQL
azuresql          Azure SQL
bigquery          Google BigQuery
mongodb           MongoDB database, including MongoDB Atlas
sqlserver         Microsoft SQL Server database (see JDBC connections)
mysql             MySQL database (see JDBC connections)
oracle            Oracle database (see JDBC connections)
postgresql        PostgreSQL database (see JDBC connections)
saphana           SAP HANA
snowflake         Snowflake data lake
teradata          Teradata Vantage
vertica           Vertica
custom.*          Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)
marketplace.*     Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)

Custom and AWS Marketplace connectionType values

The custom.* and marketplace.* connectionType values include the following:

  • "connectionType": "marketplace.athena": Designates a connection to an Amazon Athena data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "marketplace.spark": Designates a connection to an Apache Spark data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "marketplace.jdbc": Designates a connection to a JDBC data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "custom.athena": Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to AWS Glue Studio.

  • "connectionType": "custom.spark": Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to AWS Glue Studio.

  • "connectionType": "custom.jdbc": Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to AWS Glue Studio.
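
The six values above all follow the pattern origin.kind. As a minimal sketch (the helper name parse_connector_type and the ConnectorType tuple are illustrative and not part of AWS Glue), a job script could check a value before passing it as connection_type:

```python
from typing import NamedTuple

# Origins and connector kinds named in the list above.
ORIGINS = {"custom", "marketplace"}
KINDS = {"athena", "spark", "jdbc"}


class ConnectorType(NamedTuple):
    origin: str  # "custom" (uploaded to AWS Glue Studio) or "marketplace"
    kind: str    # "athena", "spark", or "jdbc"


def parse_connector_type(connection_type: str) -> ConnectorType:
    """Split a value such as "marketplace.jdbc" into its two parts."""
    origin, sep, kind = connection_type.partition(".")
    if not sep or origin not in ORIGINS or kind not in KINDS:
        raise ValueError(
            f"not a custom or Marketplace connectionType: {connection_type!r}"
        )
    return ConnectorType(origin, kind)


print(parse_connector_type("marketplace.jdbc"))
```

Built-in values such as s3 or dynamodb are rejected by this helper on purpose, because it only covers connector-based types.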

Connection options for type custom.jdbc or marketplace.jdbc

  • className – String, required, driver class name.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • url – String, required, JDBC URL with placeholders (${}) which are used to build the connection to the data source. The placeholder ${secretKey} is replaced with the secret of the same name in AWS Secrets Manager. Refer to the data store documentation for more information about constructing the URL.

  • secretId or user/password – String, required, used to retrieve credentials for the URL.

  • dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.

  • partitionColumn – String, optional, the name of an integer column that is used for partitioning. This option works only when it's included with lowerBound, upperBound, and numPartitions. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databases in the Apache Spark SQL, DataFrames and Datasets Guide.

    The lowerBound and upperBound values are used to decide the partition stride, not to filter the rows in the table. All rows in the table are partitioned and returned.

    Note

    When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the partition column.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the partition column.

  • lowerBound – Integer, optional, the minimum value of partitionColumn that is used to decide partition stride.

  • upperBound – Integer, optional, the maximum value of partitionColumn that is used to decide partition stride.

  • numPartitions – Integer, optional, the number of partitions. This value, along with lowerBound (inclusive) and upperBound (exclusive), forms the partition strides for the generated WHERE clause expressions that are used to split the partitionColumn.

    Important

    Be careful with the number of partitions because too many partitions might cause problems on your external database systems.

  • filterPredicate – String, optional, extra condition clause to filter data from source. For example:

    BillingCity='Mountain View'

    When using a query instead of a table name, you should validate that the query works with the specified filterPredicate. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the filter predicate.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the filter predicate.

  • dataTypeMapping – Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to a Glue data type. For example, the option "dataTypeMapping":{"FLOAT":"STRING"} maps data fields of JDBC type FLOAT into the Java String type by calling the ResultSet.getString() method of the driver, and uses it to build AWS Glue records. The ResultSet object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions.

    The AWS Glue data types currently supported are:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    The JDBC data types supported are those defined in the Java 8 java.sql.Types class.

    The default data type mappings (from JDBC to AWS Glue) are:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    If you use a custom data type mapping with the option dataTypeMapping, then you can override a default data type mapping. Only the JDBC data types listed in the dataTypeMapping option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the AWS Glue STRING data type by default.
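
The override rule for dataTypeMapping can be sketched as a lookup with two fallbacks. This is only an illustration of the rule as described above, not the connector's implementation; the function name resolve_glue_type is hypothetical, and the default table is copied from the list in this section.

```python
# Default JDBC -> AWS Glue type mappings, copied from the list above.
DEFAULT_TYPE_MAPPING = {
    "DATE": "DATE",
    "VARCHAR": "STRING",
    "CHAR": "STRING",
    "LONGNVARCHAR": "STRING",
    "TIMESTAMP": "TIMESTAMP",
    "INTEGER": "INT",
    "FLOAT": "FLOAT",
    "REAL": "FLOAT",
    "BIT": "BOOLEAN",
    "BOOLEAN": "BOOLEAN",
    "BIGINT": "LONG",
    "DECIMAL": "BIGDECIMAL",
    "NUMERIC": "BIGDECIMAL",
    "TINYINT": "SHORT",
    "SMALLINT": "SHORT",
    "DOUBLE": "DOUBLE",
}


def resolve_glue_type(jdbc_type, data_type_mapping=None):
    """Return the AWS Glue type for a JDBC type name (hypothetical helper)."""
    custom = data_type_mapping or {}
    if jdbc_type in custom:
        # 1. An entry in dataTypeMapping overrides the default.
        return custom[jdbc_type]
    # 2. Otherwise use the default mapping; 3. unknown types become STRING.
    return DEFAULT_TYPE_MAPPING.get(jdbc_type, "STRING")


override = {"FLOAT": "STRING"}
print(resolve_glue_type("FLOAT", override))    # overridden by dataTypeMapping
print(resolve_glue_type("INTEGER", override))  # default mapping still applies
print(resolve_glue_type("BLOB", override))     # not listed anywhere
```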

The following Python code example shows how to read from JDBC databases with AWS Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read from the JDBC data store through the AWS Marketplace connector.
## @type: DataSource
## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dataTypeMapping": {"INTEGER": "STRING"},
        "upperBound": "200",
        "query": "select id, name, department from department where id < 200",
        "numPartitions": "4",
        "partitionColumn": "id",
        "lowerBound": "0",
        "connectionName": "test-connection-jdbc"
    },
    transformation_ctx = "DataSource0")

# Keep the three columns and their types.
## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

# Write the result to Amazon S3 as JSON.
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
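
To see what lowerBound, upperBound, and numPartitions do in the example above, the following sketch computes the kind of WHERE-clause expressions that a Spark-style JDBC reader derives from them. It is a simplified model based on the behavior of the Spark SQL JDBC reader (an evenly divided integer stride); the function name is hypothetical, and the exact SQL that a given connector generates may differ.

```python
def partition_where_clauses(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause expression per partition (simplified sketch)."""
    if num_partitions < 2:
        raise ValueError("this sketch expects at least two partitions")
    if upper_bound - lower_bound < num_partitions:
        raise ValueError("bounds are too narrow for the requested partitions")

    stride = (upper_bound - lower_bound) // num_partitions
    clauses = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit and the last has no upper
        # limit, so rows outside [lower_bound, upper_bound) are still read.
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower and upper:
            clauses.append(f"{lower} AND {upper}")
        elif upper:
            clauses.append(f"{upper} OR {column} IS NULL")
        else:
            clauses.append(lower)
    return clauses


# Values taken from the example: lowerBound 0, upperBound 200, numPartitions 4.
for clause in partition_where_clauses("id", 0, 200, 4):
    print(clause)
# id < 50 OR id IS NULL
# id >= 50 AND id < 100
# id >= 100 AND id < 150
# id >= 150
```

Because the open-ended first and last expressions cover everything outside the bounds, the bounds shape the stride but do not filter rows. In the example, rows are filtered by the query's own WHERE clause (id < 200).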

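Two of the rules for custom.jdbc and marketplace.jdbc options can be checked before a job runs: dbTable and query are mutually exclusive, and partitionColumn works only together with lowerBound, upperBound, and numPartitions. The following is a minimal sketch with a hypothetical helper name. It treats the four partitioning options as all-or-nothing, which is an assumption that is slightly stricter than the text, and it does not check className, url, or credentials, because the example above passes only connectionName for those.

```python
PARTITION_KEYS = ("partitionColumn", "lowerBound", "upperBound", "numPartitions")


def check_jdbc_options(options):
    """Return a list of problems found in a JDBC connection_options dict."""
    problems = []

    # Exactly one of dbTable or query must be present.
    if ("dbTable" in options) == ("query" in options):
        problems.append("specify exactly one of dbTable or query")

    # Partitioning options are only meaningful as a complete set.
    missing = [key for key in PARTITION_KEYS if key not in options]
    if missing and len(missing) < len(PARTITION_KEYS):
        problems.append("partitioning also needs: " + ", ".join(missing))

    return problems


# The options from the example in this section pass both checks.
example_options = {
    "dataTypeMapping": {"INTEGER": "STRING"},
    "upperBound": "200",
    "query": "select id, name, department from department where id < 200",
    "numPartitions": "4",
    "partitionColumn": "id",
    "lowerBound": "0",
    "connectionName": "test-connection-jdbc",
}
print(check_jdbc_options(example_options))  # []
```
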
Connection options for type custom.athena or marketplace.athena

  • className – String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class name (for example, "com.amazonaws.athena.connectors"). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix.

  • tableName – String, required, the name of the CloudWatch log stream to read. The code example in this section uses the special view name all_log_streams, which means that the dynamic frame returned contains data from all log streams in the log group.

  • schemaName – String, required, the name of the CloudWatch log group to read from. For example, /aws-glue/jobs/output.

  • connectionName – String, required, name of the connection that is associated with the connector.

For additional options for this connector, see the Amazon Athena CloudWatch Connector README file on GitHub.
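
Because schemaName holds a log group and tableName holds a log stream, it can help to build the options from names that say so. The following is a sketch only; the function and parameter names are hypothetical, and the className default is the prefix quoted above for the Athena-CloudWatch connector.

```python
def cloudwatch_athena_options(
    log_group,
    connection_name,
    log_stream="all_log_streams",
    class_name="com.amazonaws.athena.connectors",
):
    """Build connection options for the Athena-CloudWatch connector (sketch)."""
    return {
        "className": class_name,        # common prefix of the two handler classes
        "schemaName": log_group,        # CloudWatch log group
        "tableName": log_stream,        # log stream, or the all_log_streams view
        "connectionName": connection_name,
    }


options = cloudwatch_athena_options("/aws-glue/jobs/output", "test-connection-athena")
print(options["schemaName"], options["tableName"])
```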

The following Python code example shows how to read from an Athena data store using an AWS Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read all log streams in the log group through the AWS Marketplace connector.
## @type: DataSource
## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output","connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.athena",
    connection_options = {
        "tableName": "all_log_streams",
        "schemaName": "/aws-glue/jobs/output",
        "connectionName": "test-connection-athena"
    },
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")
    ],
    transformation_ctx = "Transform0")

# Write the result to Amazon S3 as JSON.
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

Connection options for type custom.spark or marketplace.spark

  • className – String, required, connector class name.

  • secretId – String, optional, used to retrieve credentials for the connector connection.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • Other options depend on the data store. For example, OpenSearch configuration options start with the prefix es, as described in the Elasticsearch for Apache Hadoop documentation. Spark connections to Snowflake use options such as sfUser and sfPassword, as described in Using the Spark Connector in the Connecting to Snowflake guide.
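
As an illustration of how connector-level options and data-store options share one dictionary, the following sketch merges connectionName with OpenSearch settings and applies the es prefix. The helper name is hypothetical, the path key is taken from the example in this section, and the endpoint shown is a placeholder. Values are converted to strings because the examples in this section pass every option as a string.

```python
def opensearch_spark_options(connection_name, path, es_settings):
    """Merge connector options with es.* data store options (sketch)."""
    options = {"connectionName": connection_name, "path": path}
    for key, value in es_settings.items():
        # Accept "port" as shorthand for "es.port".
        name = key if key.startswith("es.") else f"es.{key}"
        if isinstance(value, bool):
            value = "true" if value else "false"
        options[name] = str(value)
    return options


options = opensearch_spark_options(
    "test-spark-es",
    "test",
    {"nodes": "https://search.example.com", "port": 443, "nodes.wan.only": True},
)
for name in sorted(options):
    print(name, "=", options[name])
```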

The following Python code example shows how to read from an OpenSearch data store using a marketplace.spark connection.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read from OpenSearch through the AWS Marketplace Spark connector.
## @type: DataSource
## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>","connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {
        "path": "test",
        "es.nodes.wan.only": "true",
        "es.nodes": "https://<AWS endpoint>",
        "connectionName": "test-spark-es",
        "es.port": "443"
    },
    transformation_ctx = "DataSource0")

# Write the result to Amazon S3 as JSON.
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DataSource0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

General options

The options in this section are provided as connection_options, but do not specifically apply to one connector.

The following parameters are used generally when configuring bookmarks. They may apply to Amazon S3 or JDBC workflows. For more information, see Using job bookmarks.

  • jobBookmarkKeys – Array of column names.

  • jobBookmarkKeysSortOrder – String defining how to compare values based on sort order. Valid values: "asc", "desc".

  • useS3ListImplementation – Used to manage memory performance when listing Amazon S3 bucket contents. For more information, see Optimize memory management in AWS Glue.
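
The bookmark options are ordinary entries in the connection_options dictionary. A minimal sketch of adding them to an existing set of options follows; the helper name with_bookmark_options is hypothetical, and the table and column names are placeholders.

```python
VALID_SORT_ORDERS = ("asc", "desc")


def with_bookmark_options(connection_options, keys, sort_order="asc"):
    """Return a copy of connection_options with job bookmark settings added."""
    if sort_order not in VALID_SORT_ORDERS:
        raise ValueError(f"jobBookmarkKeysSortOrder must be one of {VALID_SORT_ORDERS}")
    if not keys:
        raise ValueError("jobBookmarkKeys needs at least one column name")
    merged = dict(connection_options)  # leave the caller's dict untouched
    merged["jobBookmarkKeys"] = list(keys)
    merged["jobBookmarkKeysSortOrder"] = sort_order
    return merged


base = {"connectionName": "test-connection-jdbc", "dbTable": "department"}
print(with_bookmark_options(base, ["id"]))
```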