Connection Types and Options for ETL in AWS Glue

In AWS Glue, various PySpark and Scala methods and transforms specify the connection type using a connectionType parameter. They specify connection options using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except where otherwise noted, the parameters apply when the connection is used as a source or sink.

For sample code that demonstrates setting and using connection options, see Examples: Setting Connection Types and Options.

connectionType Connects To
custom.* Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)
documentdb Amazon DocumentDB (with MongoDB compatibility) database
dynamodb Amazon DynamoDB database
kafka Kafka or Amazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
marketplace.* Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values)
mongodb MongoDB database
mysql MySQL database (see JDBC connectionType Values)
oracle Oracle database (see JDBC connectionType Values)
orc Files stored in Amazon Simple Storage Service (Amazon S3) in the Apache Hive Optimized Row Columnar (ORC) file format
parquet Files stored in Amazon S3 in the Apache Parquet file format
postgresql PostgreSQL database (see JDBC connectionType Values)
redshift Amazon Redshift database (see JDBC connectionType Values)
s3 Amazon S3
sqlserver Microsoft SQL Server database (see JDBC connectionType Values)

"connectionType": "documentdb"

Designates a connection to Amazon DocumentDB (with MongoDB compatibility).

Connection options differ for a source connection and a sink connection.

"connectionType": "documentdb" as Source

Use the following connection options with "connectionType": "documentdb" as a source:

  • "uri": (Required) The Amazon DocumentDB host to read from, formatted as mongodb://<host>:<port>.

  • "database": (Required) The Amazon DocumentDB database to read from.

  • "collection": (Required) The Amazon DocumentDB collection to read from.

  • "username": (Required) The Amazon DocumentDB user name.

  • "password": (Required) The Amazon DocumentDB password.

  • "ssl": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "true".

  • "ssl.domain_match": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "false".

  • "batchSize": (Optional) The number of documents to return per batch, used within the cursor of internal batches.

  • "partitioner": (Optional) The class name of the partitioner for reading input data from Amazon DocumentDB. The connector provides the following partitioners:

    • MongoDefaultPartitioner (default)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions": (Optional) Options for the designated partitioner. The following options are supported for each partitioner:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting Connection Types and Options.
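As a rough illustration of how the source options above fit together, the following sketch builds a connection options dictionary and checks partitioner options against the lists in this section. The helper name documentdb_source_options and the host name in the usage note are hypothetical. Passing partitioner options as dotted keys (for example, "partitionerOptions.partitionKey") is an assumption that is not stated in this section, so verify it against the linked examples before relying on it.

```python
# Partitioner option names, copied from the lists in this section.
PARTITIONER_OPTIONS = {
    "MongoDefaultPartitioner": set(),
    "MongoSamplePartitioner": {"partitionKey", "partitionSizeMB", "samplesPerPartition"},
    "MongoShardedPartitioner": {"shardkey"},
    "MongoSplitVectorPartitioner": {"partitionKey", "partitionSizeMB"},
    "MongoPaginateByCountPartitioner": {"partitionKey", "numberOfPartitions"},
    "MongoPaginateBySizePartitioner": {"partitionKey", "partitionSizeMB"},
}


def documentdb_source_options(host, port, database, collection, username,
                              password, use_ssl=True, partitioner=None,
                              partitioner_options=None):
    """Build connection options for "connectionType": "documentdb" as a source.

    Hypothetical helper: it only assembles and validates a dictionary.
    """
    partitioner_options = partitioner_options or {}
    options = {
        "uri": f"mongodb://{host}:{port}",
        "database": database,
        "collection": collection,
        "username": username,
        "password": password,
    }
    if use_ssl:
        # Both options are required together when the connection uses SSL.
        options["ssl"] = "true"
        options["ssl.domain_match"] = "false"
    if partitioner is None:
        if partitioner_options:
            raise ValueError("partitioner_options given without a partitioner")
        return options
    if partitioner not in PARTITIONER_OPTIONS:
        raise ValueError(f"unknown partitioner: {partitioner}")
    unknown = set(partitioner_options) - PARTITIONER_OPTIONS[partitioner]
    if unknown:
        raise ValueError(f"{partitioner} does not list options {sorted(unknown)}")
    options["partitioner"] = partitioner
    for name, value in partitioner_options.items():
        # Assumption: partitioner options are passed as dotted keys.
        options[f"partitionerOptions.{name}"] = str(value)
    return options
```

In a job script, the resulting dictionary would be passed as connection_options to glue_context.create_dynamic_frame.from_options with connection_type="documentdb", for example documentdb_source_options("docdb.example.com", 27017, "test", "coll", "user", "secret", partitioner="MongoSamplePartitioner", partitioner_options={"partitionKey": "_id"}).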

"connectionType": "documentdb" as Sink

Use the following connection options with "connectionType": "documentdb" as a sink:

  • "uri": (Required) The Amazon DocumentDB host to write to, formatted as mongodb://<host>:<port>.

  • "database": (Required) The Amazon DocumentDB database to write to.

  • "collection": (Required) The Amazon DocumentDB collection to write to.

  • "username": (Required) The Amazon DocumentDB user name.

  • "password": (Required) The Amazon DocumentDB password.

  • "extendedBsonTypes": (Optional) If true, allows extended BSON types when writing data to Amazon DocumentDB. The default is true.

  • "replaceDocument": (Optional) If true, replaces the whole document when saving datasets that contain an _id field. If false, only fields in the document that match the fields in the dataset are updated. The default is true.

  • "maxBatchSize": (Optional) The maximum batch size for bulk operations when saving data. The default is 512.

For sample code, see Examples: Setting Connection Types and Options.

"connectionType": "dynamodb"

Designates a connection to Amazon DynamoDB.

Connection options differ for a source connection and a sink connection.

"connectionType": "dynamodb" with the ETL connector as Source

Use the following connection options with "connectionType": "dynamodb" as a source, when using the AWS Glue DynamoDB ETL connector:

  • "dynamodb.input.tableName": (Required) The DynamoDB table to read from.

  • "dynamodb.throughput.read.percent": (Optional) The percentage of read capacity units (RCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.

    • 0.5 represents the default read rate, meaning that AWS Glue will attempt to consume half of the read capacity of the table. If you increase the value above 0.5, AWS Glue increases the request rate; decreasing the value below 0.5 decreases the read request rate. (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)

    • When the DynamoDB table is in on-demand mode, AWS Glue handles the read capacity of the table as 40000. For exporting a large table, we recommend switching your DynamoDB table to on-demand mode.

  • "dynamodb.splits": (Optional) Defines how many splits we partition this DynamoDB table into while reading. The default is set to "1". Acceptable values are from "1" to "1,000,000", inclusive.

    • A value of 1 means that there is no parallelism. We strongly recommend specifying a larger value for better performance.

    • We recommend calculating numSlots with the following formula and using it as the value of dynamodb.splits. If you need more performance, scale out your job by increasing the number of DPUs.

      • numExecutors =

        • (DPU - 1) * 2 - 1 if WorkerType is Standard

        • (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

      • numSlotsPerExecutor =

        • 4 if WorkerType is Standard

        • 8 if WorkerType is G.1X

        • 16 if WorkerType is G.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn": (Optional) The IAM role ARN to be assumed for cross-account access. This parameter is available in AWS Glue 1.0 or later.

  • "dynamodb.sts.roleSessionName": (Optional) STS session name. The default is set to "glue-dynamodb-read-sts-session". This parameter is available in AWS Glue 1.0 or later.
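The numSlots formula given for "dynamodb.splits" can be sketched as a small helper. The function names below are hypothetical; the constants and formulas are taken from the list above, and the result is clamped to the documented range of 1 to 1,000,000.

```python
# Slots per executor for each worker type, from the formula above.
SLOTS_PER_EXECUTOR = {"Standard": 4, "G.1X": 8, "G.2X": 16}


def num_executors(worker_type, capacity):
    """capacity is the DPU count for Standard, NumberOfWorkers for G.1X or G.2X."""
    if worker_type == "Standard":
        return (capacity - 1) * 2 - 1
    if worker_type in ("G.1X", "G.2X"):
        return capacity - 1
    raise ValueError(f"unsupported worker type: {worker_type}")


def recommended_splits(worker_type, capacity):
    """Return numSlots, clamped to the accepted range for dynamodb.splits."""
    executors = num_executors(worker_type, capacity)
    slots = SLOTS_PER_EXECUTOR[worker_type] * executors
    return max(1, min(slots, 1_000_000))


# The option value is passed as a string in the examples in this section.
splits_option = {"dynamodb.splits": str(recommended_splits("G.1X", 10))}
```

For a Standard job with 10 DPUs this gives (10 - 1) * 2 - 1 = 17 executors and 4 * 17 = 68 slots.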

The following code examples show how to read from (via the ETL connector) and write to DynamoDB tables. They demonstrate reading from one table and writing to another table.

Python
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read from the source table with the ETL connector.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "test_source",
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100"
    }
)
print(dyf.getNumPartitions())

# Write the same records to the sink table.
glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read from the source table with the ETL connector.
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.input.tableName" -> "test_source",
        "dynamodb.throughput.read.percent" -> "1.0",
        "dynamodb.splits" -> "100"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    // Write the same records to the sink table.
    val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> "test_sink",
        "dynamodb.throughput.write.percent" -> "1.0"
      ))
    ).asInstanceOf[DynamoDbDataSink]

    dynamoDbSink.writeDynamicFrame(dynamicFrame)

    Job.commit()
  }
}
Note

AWS Glue supports reading data from another AWS account's DynamoDB table. For more information, see Cross-Account Cross-Region Access to DynamoDB Tables.

Note

The DynamoDB ETL reader does not support filters or pushdown predicates.

"connectionType": "dynamodb" with the AWS Glue DynamoDB export connector as Source

In addition to the AWS Glue DynamoDB ETL connector, AWS Glue offers a DynamoDB export connector. It invokes a DynamoDB ExportTableToPointInTime request and stores the export in an Amazon S3 location that you supply, in DynamoDB JSON format. AWS Glue then creates a DynamicFrame object by reading the data from the Amazon S3 export location.

The export connector performs better than the ETL connector when the DynamoDB table size is larger than 80 GB. In addition, because the export request runs outside the Spark processes in an AWS Glue job, you can enable auto scaling of AWS Glue jobs to save DPU usage during the export request. With the export connector, you also do not need to configure the number of splits for Spark executor parallelism or the DynamoDB throughput read percentage.

Use the following connection options with "connectionType": "dynamodb" as a source, when using the AWS Glue DynamoDB export connector, which is available only for AWS Glue version 2.0 onwards:

  • "dynamodb.export": (Required) A string value:

    • If set to ddb, enables the AWS Glue DynamoDB export connector and invokes a new ExportTableToPointInTimeRequest during the AWS Glue job. The new export is written to the location given by dynamodb.s3.bucket and dynamodb.s3.prefix.

    • If set to s3, enables the AWS Glue DynamoDB export connector but skips the creation of a new DynamoDB export. Instead, it uses dynamodb.s3.bucket and dynamodb.s3.prefix as the Amazon S3 location of a past export of that table.

  • "dynamodb.tableArn": (Required) The ARN of the DynamoDB table to read from.

  • "dynamodb.unnestDDBJson": (Optional) Takes a boolean value. If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is set to false.

  • "dynamodb.s3.bucket": (Optional) Indicates the Amazon S3 bucket location in which the DynamoDB ExportTableToPointInTime process is to be conducted. The file format for the export is DynamoDB JSON.

    • "dynamodb.s3.prefix": (Optional) Indicates the Amazon S3 prefix location inside the Amazon S3 bucket in which the DynamoDB ExportTableToPointInTime loads are to be stored. If neither dynamodb.s3.prefix nor dynamodb.s3.bucket are specified, these values will default to the Temporary Directory location specified in the AWS Glue job configuration. For more information, see Special Parameters Used by AWS Glue.

    • "dynamodb.s3.bucketOwner": Indicates the bucket owner needed for cross-account Amazon S3 access.

  • "dynamodb.sts.roleArn": (Optional) The IAM role ARN to be assumed for cross-account access and/or cross-region access for the DynamoDB table. Note: The same IAM role ARN will be used to access the Amazon S3 location specified for the ExportTableToPointInTime request.

  • "dynamodb.sts.roleSessionName": (Optional) STS session name. The default is set to "glue-dynamodb-read-sts-session".
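To make the relationship between these options concrete, here is a minimal sketch that assembles export connector options and rejects an unsupported "dynamodb.export" value. The helper name dynamodb_export_options and the ARN used in the usage note are hypothetical; only the option keys come from the list above.

```python
def dynamodb_export_options(table_arn, mode="ddb", bucket=None, prefix=None,
                            bucket_owner=None, unnest=False, role_arn=None):
    """Build connection options for the AWS Glue DynamoDB export connector.

    Hypothetical helper: it only assembles a dictionary from the options
    described above and performs no AWS calls.
    """
    if mode not in ("ddb", "s3"):
        # "ddb" starts a new export; "s3" reuses a past export.
        raise ValueError('dynamodb.export must be "ddb" or "s3"')
    options = {"dynamodb.export": mode, "dynamodb.tableArn": table_arn}
    if unnest:
        options["dynamodb.unnestDDBJson"] = True
    # Optional keys are added only when a value is supplied, so omitted
    # bucket and prefix values fall back to the job's temporary directory.
    optional = {
        "dynamodb.s3.bucket": bucket,
        "dynamodb.s3.prefix": prefix,
        "dynamodb.s3.bucketOwner": bucket_owner,
        "dynamodb.sts.roleArn": role_arn,
    }
    options.update({key: value for key, value in optional.items() if value is not None})
    return options
```

A call such as dynamodb_export_options("arn:aws:dynamodb:us-east-1:111122223333:table/test_source", mode="s3", bucket="my-bucket", prefix="exports/") would describe reading a past export instead of triggering a new one.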

Note

DynamoDB has specific requirements for invoking ExportTableToPointInTime requests. For example, point-in-time recovery (PITR) must be enabled on the table to use this connector. For more information, see Requesting a table export in DynamoDB. The DynamoDB connector also supports KMS encryption for DynamoDB exports to Amazon S3. Supplying your security configuration in the AWS Glue job configuration enables KMS encryption for a DynamoDB export. The KMS key must be in the same Region as the Amazon S3 bucket.

Additional charges for DynamoDB export and Amazon S3 storage apply. Exported data in Amazon S3 persists after a job run finishes, so you can reuse it without additional DynamoDB exports.

Neither the DynamoDB ETL connector nor the export connector supports filters or pushdown predicates applied at the DynamoDB source.

The following code examples show how to read from a DynamoDB table (via the export connector) and print the number of partitions.

Python
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# "ddb" triggers a new export before the data is read from Amazon S3.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "<test_source>",
        "dynamodb.s3.bucket": "<bucket name>",
        "dynamodb.s3.prefix": "<bucket prefix>",
        "dynamodb.s3.bucketOwner": "<account_id>",
    }
)
print(dyf.getNumPartitions())

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // "ddb" triggers a new export before the data is read from Amazon S3.
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.tableArn" -> "<test_source>",
        "dynamodb.s3.bucket" -> "<bucket name>",
        "dynamodb.s3.prefix" -> "<bucket prefix>",
        "dynamodb.s3.bucketOwner" -> "<account_id of bucket>"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    Job.commit()
  }
}

The following code examples show how to read (via the export connector) from an AWS Glue Data Catalog table that has a dynamodb classification and print the number of partitions:

Python
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# The table is resolved from the Data Catalog; export options are passed
# as additional options.
dynamicFrame = glue_context.create_dynamic_frame.from_catalog(
    database="<catalog_database>",
    table_name="<catalog_table_name>",
    additional_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": "<s3_bucket>",
        "dynamodb.s3.prefix": "<s3_bucket_prefix>"
    }
)
print(dynamicFrame.getNumPartitions())

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // The table is resolved from the Data Catalog; export options are
    // passed as additional options.
    val dynamicFrame = glueContext.getCatalogSource(
      database = "<catalog_database>",
      tableName = "<catalog_table_name>",
      additionalOptions = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.s3.bucket" -> "<s3_bucket>",
        "dynamodb.s3.prefix" -> "<s3_bucket_prefix>"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    Job.commit()
  }
}

Traversing the DynamoDB JSON structure

The DynamoDB exports with the AWS Glue DynamoDB export connector can result in JSON files of specific nested structures. For more information, see Data objects. AWS Glue supplies a DynamicFrame transformation, which can unnest such structures into an easier-to-use form for downstream applications.

You can invoke the transform in one of two ways: by passing a Boolean flag with the AWS Glue DynamoDB export connector, or by calling the transform function itself.

The following code examples show how to use the AWS Glue DynamoDB export connector, invoke an unnest, and print the number of partitions:

Python
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# dynamodb.unnestDDBJson asks the connector to unnest the DynamoDB JSON
# structure while reading the export.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "<test_source>",
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "<bucket name>",
        "dynamodb.s3.prefix": "<bucket prefix>",
        "dynamodb.s3.bucketOwner": "<account_id>",
    }
)
print(dyf.getNumPartitions())

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // dynamodb.unnestDDBJson asks the connector to unnest the DynamoDB JSON
    // structure while reading the export.
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.export" -> "ddb",
        "dynamodb.tableArn" -> "<test_source>",
        "dynamodb.unnestDDBJson" -> true,
        "dynamodb.s3.bucket" -> "<bucket name>",
        "dynamodb.s3.prefix" -> "<bucket prefix>",
        "dynamodb.s3.bucketOwner" -> "<account_id of bucket>"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    Job.commit()
  }
}

The other invocation of the transform is through a separate DynamicFrame function call. For more information, see DynamicFrame Class for Python and AWS Glue Scala DynamicFrame Class for Scala.
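To show what unnesting means for a single exported record, the following plain-Python sketch converts one line of DynamoDB JSON into an ordinary dictionary. It is only an illustration of the nested structure, not the AWS Glue transform itself: the function names are hypothetical, only the common type descriptors (S, N, BOOL, NULL, L, M, SS, NS) are handled, and the column layout that AWS Glue actually produces may differ.

```python
import json
from decimal import Decimal


def unmarshal(value):
    """Convert one DynamoDB-typed attribute value, such as {"S": "a1"}."""
    type_tag, payload = next(iter(value.items()))
    if type_tag == "S":
        return payload
    if type_tag == "N":
        # Numbers are exported as strings; Decimal keeps them exact.
        return Decimal(payload)
    if type_tag == "BOOL":
        return payload
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(item) for item in payload]
    if type_tag == "M":
        return {key: unmarshal(item) for key, item in payload.items()}
    if type_tag == "SS":
        return set(payload)
    if type_tag == "NS":
        return {Decimal(number) for number in payload}
    raise ValueError(f"type descriptor not handled in this sketch: {type_tag}")


def unnest_export_line(line):
    """Flatten one exported line of the form {"Item": {...}}."""
    item = json.loads(line)["Item"]
    return {name: unmarshal(value) for name, value in item.items()}


record = unnest_export_line('{"Item": {"id": {"S": "a1"}, "qty": {"N": "3"}}}')
```

Here record holds the attribute names mapped directly to their values, without the intermediate type descriptors.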

"connectionType": "dynamodb" with the ETL connector as Sink

Use the following connection options with "connectionType": "dynamodb" as a sink:

  • "dynamodb.output.tableName": (Required) The DynamoDB table to write to.

  • "dynamodb.throughput.write.percent": (Optional) The percentage of write capacity units (WCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.

    • 0.5 represents the default write rate, meaning that AWS Glue will attempt to consume half of the write capacity of the table. If you increase the value above 0.5, AWS Glue increases the request rate; decreasing the value below 0.5 decreases the write request rate. (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table).

    • When the DynamoDB table is in on-demand mode, AWS Glue handles the write capacity of the table as 40000. For importing a large table, we recommend switching your DynamoDB table to on-demand mode.

  • "dynamodb.output.numParallelTasks": (Optional) Defines how many parallel tasks write into DynamoDB at the same time. Used to calculate permissive WCU per Spark task. If you do not want to control these details, you do not need to specify this parameter.

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • If you do not specify this parameter, the permissive WCU per Spark task will be automatically calculated by the following formula:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1 if WorkerType is Standard

        • (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

      • numSlotsPerExecutor =

        • 4 if WorkerType is Standard

        • 8 if WorkerType is G.1X

        • 16 if WorkerType is G.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • Example 1. DPU=10, WorkerType=Standard. Input DynamicFrame has 100 RDD partitions.

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • Example 2. DPU=10, WorkerType=Standard. Input DynamicFrame has 20 RDD partitions.

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry": (Optional) Defines how many retries we perform when there is a ProvisionedThroughputExceededException from DynamoDB. The default is set to "10".

  • "dynamodb.sts.roleArn": (Optional) The IAM role ARN to be assumed for cross-account access.

  • "dynamodb.sts.roleSessionName": (Optional) STS session name. The default is set to "glue-dynamodb-write-sts-session".
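The automatic calculation described for "dynamodb.output.numParallelTasks" can be reproduced with a short sketch, which is useful for estimating the write budget of each Spark task before a job runs. The function names are hypothetical; the formulas and the two worked examples come from the list above.

```python
# Slots per executor for each worker type, from the formula above.
SLOTS_PER_EXECUTOR = {"Standard": 4, "G.1X": 8, "G.2X": 16}


def num_slots(worker_type, capacity):
    """capacity is the DPU count for Standard, NumberOfWorkers for G.1X or G.2X."""
    if worker_type == "Standard":
        executors = (capacity - 1) * 2 - 1
    elif worker_type in ("G.1X", "G.2X"):
        executors = capacity - 1
    else:
        raise ValueError(f"unsupported worker type: {worker_type}")
    return SLOTS_PER_EXECUTOR[worker_type] * executors


def num_parallel_tasks(num_partitions, worker_type, capacity):
    """numParallelTasks = min(numPartitions, numSlots)."""
    return min(num_partitions, num_slots(worker_type, capacity))


def permissive_wcu_per_task(table_wcu, write_percent, parallel_tasks):
    """permissiveWcuPerTask = TableWCU * write percent / numParallelTasks."""
    return table_wcu * write_percent / parallel_tasks


# Example 2 from the text: DPU=10, Standard workers, 20 RDD partitions.
tasks = num_parallel_tasks(20, "Standard", 10)
# An on-demand table is handled as 40000 WCU; 0.5 is the default percentage.
wcu_per_task = permissive_wcu_per_task(40000, 0.5, tasks)
```

With these inputs, each of the 20 parallel tasks is allowed 40000 * 0.5 / 20 = 1000 WCU.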

Note

The DynamoDB writer is supported in AWS Glue version 1.0 or later.

Note

AWS Glue supports writing data into another AWS account's DynamoDB table. For more information, see Cross-Account Cross-Region Access to DynamoDB Tables.

The following code examples show how to read from and write to DynamoDB tables. They demonstrate reading from one table and writing to another table.

Python
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read from the source table with the ETL connector.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "test_source",
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100"
    }
)
print(dyf.getNumPartitions())

# Write the same records to the sink table.
glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

job.commit()
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamoDbDataSink
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(SparkContext.getOrCreate())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read from the source table with the ETL connector.
    val dynamicFrame = glueContext.getSourceWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.input.tableName" -> "test_source",
        "dynamodb.throughput.read.percent" -> "1.0",
        "dynamodb.splits" -> "100"
      ))
    ).getDynamicFrame()

    print(dynamicFrame.getNumPartitions())

    // Write the same records to the sink table.
    val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat(
      connectionType = "dynamodb",
      options = JsonOptions(Map(
        "dynamodb.output.tableName" -> "test_sink",
        "dynamodb.throughput.write.percent" -> "1.0"
      ))
    ).asInstanceOf[DynamoDbDataSink]

    dynamoDbSink.writeDynamicFrame(dynamicFrame)

    Job.commit()
  }
}

"connectionType": "kafka"

Designates a connection to a Kafka cluster or an Amazon Managed Streaming for Apache Kafka cluster.

You can use the following methods under the GlueContext object to consume records from a Kafka streaming source:

  • getCatalogSource

  • getSource

  • getSourceWithFormat

  • createDataFrameFromOptions

If you use getCatalogSource, then the job has the Data Catalog database and table name information, and can use that to obtain some basic parameters for reading from the Apache Kafka stream. If you use getSource, getSourceWithFormat, or createDataFrameFromOptions, you must explicitly specify these parameters.

You can specify these options using connectionOptions with getSource or createDataFrameFromOptions, options with getSourceWithFormat, or additionalOptions with getCatalogSource.

Use the following connection options with "connectionType": "kafka":

  • "bootstrap.servers": (Required) A list of bootstrap server URLs, for example, b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. This option must be specified in the API call or defined in the table metadata in the Data Catalog.

  • "security.protocol": (Required) The protocol used to communicate with brokers. The possible values are "SSL" or "PLAINTEXT".

  • "topicName": (Required) A comma-separated list of topics to subscribe to. You must specify one and only one of "topicName", "assign" or "subscribePattern".

  • "assign": (Required) A JSON string specifying the specific TopicPartitions to consume. You must specify one and only one of "topicName", "assign" or "subscribePattern".

    Example: '{"topicA":[0,1],"topicB":[2,4]}'

  • "subscribePattern": (Required) A Java regex string that identifies the topic list to subscribe to. You must specify one and only one of "topicName", "assign" or "subscribePattern".

    Example: 'topic.*'

  • classification (Optional)

  • delimiter (Optional)

  • "startingOffsets": (Optional) The starting position in the Kafka topic to read data from. The possible values are "earliest" or "latest". The default value is "latest".

  • "endingOffsets": (Optional) The end point when a batch query is ended. Possible values are either "latest" or a JSON string that specifies an ending offset for each TopicPartition.

    For the JSON string, the format is {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. The value -1 as an offset represents "latest".

  • "pollTimeoutMs": (Optional) The timeout in milliseconds to poll data from Kafka in Spark job executors. The default value is 512.

  • "numRetries": (Optional) The number of times to retry before failing to fetch Kafka offsets. The default value is 3.

  • "retryIntervalMs": (Optional) The time in milliseconds to wait before retrying to fetch Kafka offsets. The default value is 10.

  • "maxOffsetsPerTrigger": (Optional) The rate limit on the maximum number of offsets that are processed per trigger interval. The specified total number of offsets is proportionally split across topicPartitions of different volumes. The default value is null, which means that the consumer reads all offsets until the known latest offset.

  • "minPartitions": (Optional) The desired minimum number of partitions to read from Kafka. The default value is null, which means that the number of spark partitions is equal to the number of Kafka partitions.

  • "includeHeaders": (Optional) Whether to include the Kafka headers. When the option is set to "true", the data output will contain an additional column named "glue_streaming_kafka_headers" with type Array[Struct(key: String, value: String)]. The default value is "false". This option is available in AWS Glue version 3.0 only.

  • "schema": (Required when inferSchema set to false) The schema to use to process the payload. If classification is avro the provided schema must be in the Avro schema format. If the classification is not avro the provided schema must be in the DDL schema format.

    The following are schema examples.

    Example in DDL schema format
    `column1` INT, `column2` STRING, `column3` FLOAT
    Example in Avro schema format
    {
      "type": "array",
      "items": {
        "type": "record",
        "name": "test",
        "fields": [
          { "name": "_id", "type": "string" },
          { "name": "index", "type": ["int", "string", "float"] }
        ]
      }
    }
  • "inferSchema": (Optional) The default value is 'false'. If set to 'true', the schema will be detected at runtime from the payload within foreachbatch.

  • "avroSchema": (Deprecated) Parameter used to specify a schema of Avro data when Avro format is used. This parameter is now deprecated. Use the schema parameter.
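The rule that exactly one of "topicName", "assign", or "subscribePattern" must be set, and the JSON-string form of "assign" and "endingOffsets", can be illustrated with a small builder. The helper name kafka_options and the broker address in the usage line are hypothetical; the option keys and allowed values are those listed above.

```python
import json


def kafka_options(bootstrap_servers, security_protocol, topic_name=None,
                  assign=None, subscribe_pattern=None, ending_offsets=None):
    """Build connection options for "connectionType": "kafka".

    Hypothetical helper: it only assembles and validates a dictionary.
    """
    if security_protocol not in ("SSL", "PLAINTEXT"):
        raise ValueError('security.protocol must be "SSL" or "PLAINTEXT"')
    selectors = {
        "topicName": topic_name,
        # "assign" is a JSON string such as '{"topicA":[0,1],"topicB":[2,4]}'.
        "assign": json.dumps(assign) if assign is not None else None,
        "subscribePattern": subscribe_pattern,
    }
    chosen = {key: value for key, value in selectors.items() if value is not None}
    if len(chosen) != 1:
        raise ValueError("specify exactly one of topicName, assign, subscribePattern")
    options = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": security_protocol,
    }
    options.update(chosen)
    if ending_offsets is not None:
        # Either the string "latest" or a per-TopicPartition mapping, where
        # an offset of -1 represents "latest".
        if isinstance(ending_offsets, str):
            options["endingOffsets"] = ending_offsets
        else:
            options["endingOffsets"] = json.dumps(ending_offsets)
    return options


opts = kafka_options("b-1.example.com:9094", "SSL",
                     assign={"topicA": [0, 1], "topicB": [2, 4]},
                     ending_offsets={"topicA": {"0": 23, "1": -1}})
```

The resulting dictionary would then be passed as the connection options of one of the methods listed at the start of this section, for example createDataFrameFromOptions.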

"connectionType": "kinesis"

Designates connection options for Amazon Kinesis Data Streams.

You can read from an Amazon Kinesis data stream using information stored in a Data Catalog table, or by providing information to directly access the data stream. If you directly access the data stream, use these options to provide the information about how to access the data stream.

If you use getCatalogSource or create_data_frame_from_catalog to consume records from a Kinesis streaming source, the job has the Data Catalog database and table name information, and can use that to obtain some basic parameters for reading from the Kinesis streaming source. If you use getSource, getSourceWithFormat, createDataFrameFromOptions or create_data_frame_from_options, you must specify these basic parameters using the connection options described here.

You can specify the connection options for Kinesis using the following arguments for the specified methods in the GlueContext class.

  • Scala

    • connectionOptions: Use with getSource, createDataFrameFromOptions

    • additionalOptions: Use with getCatalogSource

    • options: Use with getSourceWithFormat

  • Python

    • connection_options: Use with create_data_frame_from_options

    • additional_options: Use with create_data_frame_from_catalog

    • options: Use with getSource

Use the following connection options for Kinesis streaming data sources:

  • streamARN (Required) The ARN of the Kinesis data stream.

  • classification (Optional)

  • delimiter (Optional)

  • "startingPosition": (Optional) The starting position in the Kinesis data stream to read data from. The possible values are "latest", "trim_horizon", or "earliest". The default value is "latest".

  • "awsSTSRoleARN": (Optional) The Amazon Resource Name (ARN) of the role to assume using AWS Security Token Service (AWS STS). This role must have permissions for describe or read record operations for the Kinesis data stream. You must use this parameter when accessing a data stream in a different account. Used in conjunction with "awsSTSSessionName".

  • "awsSTSSessionName": (Optional) An identifier for the session assuming the role using AWS STS. You must use this parameter when accessing a data stream in a different account. Used in conjunction with "awsSTSRoleARN".

  • "maxFetchTimeInMs": (Optional) The maximum time spent in the job executor to fetch a record from the Kinesis data stream per shard, specified in milliseconds (ms). The default value is 1000.

  • "maxFetchRecordsPerShard": (Optional) The maximum number of records to fetch per shard in the Kinesis data stream. The default value is 100000.

  • "maxRecordPerRead": (Optional) The maximum number of records to fetch from the Kinesis data stream in each getRecords operation. The default value is 10000.

  • "addIdleTimeBetweenReads": (Optional) Adds a time delay between two consecutive getRecords operations. The default value is "False". This option is only configurable for Glue version 2.0 and above.

  • "idleTimeBetweenReadsInMs": (Optional) The minimum time delay between two consecutive getRecords operations, specified in ms. The default value is 1000. This option is only configurable for Glue version 2.0 and above.

  • "describeShardInterval": (Optional) The minimum time interval between two ListShards API calls for your script to consider resharding. For more information, see Strategies for Resharding in Amazon Kinesis Data Streams Developer Guide. The default value is 1s.

  • "numRetries": (Optional) The maximum number of retries for Kinesis Data Streams API requests. The default value is 3.

  • "retryIntervalMs": (Optional) The cool-off time period (specified in ms) before retrying the Kinesis Data Streams API call. The default value is 1000.

  • "maxRetryIntervalMs": (Optional) The maximum cool-off time period (specified in ms) between two retries of a Kinesis Data Streams API call. The default value is 10000.

  • "avoidEmptyBatches": (Optional) Avoids creating an empty microbatch job by checking for unread data in the Kinesis data stream before the batch is started. The default value is "False".

  • "schema": (Required when inferSchema is set to false) The schema to use to process the payload. If the classification is avro, the provided schema must be in the Avro schema format. If the classification is not avro, the provided schema must be in the DDL schema format.

    The following are schema examples.

    Example in DDL schema format

    `column1` INT, `column2` STRING, `column3` FLOAT

    Example in Avro schema format

    {
      "type": "array",
      "items": {
        "type": "record",
        "name": "test",
        "fields": [
          { "name": "_id", "type": "string" },
          { "name": "index", "type": ["int", "string", "float"] }
        ]
      }
    }

  • "inferSchema": (Optional) The default value is 'false'. If set to 'true', the schema will be detected at runtime from the payload within foreachbatch.

  • "avroSchema": (Deprecated) Parameter used to specify a schema of Avro data when Avro format is used. This parameter is now deprecated. Use the schema parameter.
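
As an illustration only, the following Python sketch assembles some of the Kinesis connection options listed above into a dictionary and checks two of the documented rules: a schema is required when inferSchema is false, and the two AWS STS options are used together. The helper name build_kinesis_options, the stream ARN, and the account ID are hypothetical and are not part of the AWS Glue API. The final call is left as a comment because it needs the AWS Glue runtime.

```python
def build_kinesis_options(stream_arn, schema=None, infer_schema=False,
                          classification=None, starting_position="latest",
                          sts_role_arn=None, sts_session_name=None):
    """Assemble Kinesis connection options (hypothetical helper, not a Glue API)."""
    if starting_position not in ("latest", "trim_horizon", "earliest"):
        raise ValueError(f"unsupported startingPosition: {starting_position}")
    # A schema is required whenever inferSchema is left at its default of false.
    if not infer_schema and schema is None:
        raise ValueError("schema is required when inferSchema is false")
    # The two STS options are documented as being used in conjunction.
    if (sts_role_arn is None) != (sts_session_name is None):
        raise ValueError("set awsSTSRoleARN and awsSTSSessionName together")

    options = {
        "streamARN": stream_arn,
        "startingPosition": starting_position,
        "inferSchema": "true" if infer_schema else "false",
    }
    if classification is not None:
        options["classification"] = classification
    if schema is not None:
        options["schema"] = schema
    if sts_role_arn is not None:
        options["awsSTSRoleARN"] = sts_role_arn
        options["awsSTSSessionName"] = sts_session_name
    return options


# Placeholder ARN for illustration; replace it with your own stream.
kinesis_options = build_kinesis_options(
    stream_arn="arn:aws:kinesis:us-east-1:111122223333:stream/example-stream",
    classification="json",
    schema="`column1` INT, `column2` STRING, `column3` FLOAT",
)
print(kinesis_options)

# Inside a Glue streaming job, the dictionary would be passed along these lines:
# data_frame = glueContext.create_data_frame.from_options(
#     connection_type="kinesis", connection_options=kinesis_options)
```

Building the dictionary in one place keeps the option names, which are case sensitive strings, away from the rest of the job script.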

"connectionType": "mongodb"

Designates a connection to MongoDB. Connection options differ for a source connection and a sink connection.

"connectionType": "mongodb" as Source

Use the following connection options with "connectionType": "mongodb" as a source:

  • "uri": (Required) The MongoDB host to read from, formatted as mongodb://<host>:<port>.

  • "database": (Required) The MongoDB database to read from. This option can also be passed in additional_options when calling glue_context.create_dynamic_frame_from_catalog in your job script.

  • "collection": (Required) The MongoDB collection to read from. This option can also be passed in additional_options when calling glue_context.create_dynamic_frame_from_catalog in your job script.

  • "username": (Required) The MongoDB user name.

  • "password": (Required) The MongoDB password.

  • "ssl": (Optional) If true, initiates an SSL connection. The default is false.

  • "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.

  • "batchSize": (Optional) The number of documents to return per batch, used within the cursor of internal batches.

  • "partitioner": (Optional) The class name of the partitioner for reading input data from MongoDB. The connector provides the following partitioners:

    • MongoDefaultPartitioner (default)

    • MongoSamplePartitioner (Requires MongoDB 3.2 or later)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions": (Optional) Options for the designated partitioner. The following options are supported for each partitioner:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting Connection Types and Options.
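
The following Python sketch shows one way to combine a partitioner with its options when building MongoDB source options. It checks the option names against the per-partitioner lists above. The helper name, the host, and the credentials are hypothetical, and the dotted key form "partitionerOptions.<name>" is an assumption about how the options are passed; confirm it against the sample code referenced above before relying on it.

```python
# Option names supported by each partitioner, as listed in this section.
# No options are listed here for MongoDefaultPartitioner.
SUPPORTED_PARTITIONER_OPTIONS = {
    "MongoDefaultPartitioner": set(),
    "MongoSamplePartitioner": {"partitionKey", "partitionSizeMB", "samplesPerPartition"},
    "MongoShardedPartitioner": {"shardkey"},
    "MongoSplitVectorPartitioner": {"partitionKey", "partitionSizeMB"},
    "MongoPaginateByCountPartitioner": {"partitionKey", "numberOfPartitions"},
    "MongoPaginateBySizePartitioner": {"partitionKey", "partitionSizeMB"},
}


def build_mongodb_source_options(uri, database, collection, username, password,
                                 partitioner=None, partitioner_options=None):
    """Assemble MongoDB source options (hypothetical helper, not a Glue API)."""
    options = {
        "uri": uri,
        "database": database,
        "collection": collection,
        "username": username,
        "password": password,
    }
    if partitioner is None:
        return options
    if partitioner not in SUPPORTED_PARTITIONER_OPTIONS:
        raise ValueError(f"unknown partitioner: {partitioner}")
    options["partitioner"] = partitioner
    for name, value in (partitioner_options or {}).items():
        if name not in SUPPORTED_PARTITIONER_OPTIONS[partitioner]:
            raise ValueError(f"{partitioner} does not support option {name}")
        # Assumed key form: a "partitionerOptions." prefix on each option name.
        options[f"partitionerOptions.{name}"] = str(value)
    return options


mongo_options = build_mongodb_source_options(
    uri="mongodb://<host>:<port>",
    database="test",
    collection="coll",
    username="username",
    password="pwd",
    partitioner="MongoSamplePartitioner",
    partitioner_options={"partitionKey": "_id", "partitionSizeMB": 10},
)
print(mongo_options)
```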

"connectionType": "mongodb" as Sink

Use the following connection options with "connectionType": "mongodb" as a sink:

  • "uri": (Required) The MongoDB host to write to, formatted as mongodb://<host>:<port>.

  • "database": (Required) The MongoDB database to write to.

  • "collection": (Required) The MongoDB collection to write to.

  • "username": (Required) The MongoDB user name.

  • "password": (Required) The MongoDB password.

  • "ssl": (Optional) If true, initiates an SSL connection. The default is false.

  • "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.

  • "extendedBsonTypes": (Optional) If true, allows extended BSON types when writing data to MongoDB. The default is true.

  • "replaceDocument": (Optional) If true, replaces the whole document when saving datasets that contain an _id field. If false, only fields in the document that match the fields in the dataset are updated. The default is true.

  • "maxBatchSize": (Optional) The maximum batch size for bulk operations when saving data. The default is 512.

For sample code, see Examples: Setting Connection Types and Options.
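
To make the effect of "replaceDocument" concrete, the following Python sketch models the two behaviors with plain dictionaries. It is a simplified model of the description above, not the connector's implementation; the function name and the sample documents are hypothetical.

```python
def simulate_write(existing, incoming, replace_document=True):
    """Model how a stored document changes when a row with the same _id is written."""
    if replace_document:
        # The whole stored document is replaced by the incoming row.
        return dict(incoming)
    # Only the fields present in the incoming row are updated; others are kept.
    updated = dict(existing)
    updated.update(incoming)
    return updated


stored = {"_id": 1, "name": "old", "city": "Seattle"}
row = {"_id": 1, "name": "new"}

print(simulate_write(stored, row, replace_document=True))
# {'_id': 1, 'name': 'new'}
print(simulate_write(stored, row, replace_document=False))
# {'_id': 1, 'name': 'new', 'city': 'Seattle'}
```

With the default of true, fields that are absent from the dataset (city in this sketch) are lost on write, so set the option to false when the dataset carries only a subset of each document's fields.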

"connectionType": "orc"

Designates a connection to files stored in Amazon S3 in the Apache Hive Optimized Row Columnar (ORC) file format.

Use the following connection options with "connectionType": "orc":

  • paths: (Required) A list of the Amazon S3 paths to read from.

  • (Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL DataSource. For more information, see Redshift data source for Spark.

"connectionType": "parquet"

Designates a connection to files stored in Amazon S3 in the Apache Parquet file format.

Use the following connection options with "connectionType": "parquet":

  • paths: (Required) A list of the Amazon S3 paths to read from.

  • (Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL DataSource. For more information, see Amazon Redshift data source for Spark on the GitHub website.

"connectionType": "s3"

Designates a connection to Amazon S3.

Use the following connection options with "connectionType": "s3":

  • "paths": (Required) A list of the Amazon S3 paths to read from.

  • "exclusions": (Optional) A string containing a JSON list of Unix-style glob patterns to exclude. For example, "[\"**.pdf\"]" excludes all PDF files. For more information about the glob syntax that AWS Glue supports, see Include and Exclude Patterns.

  • "compressionType": or "compression": (Optional) Specifies how the data is compressed. Use "compressionType" for Amazon S3 sources and "compression" for Amazon S3 targets. This is generally not necessary if the data has a standard file extension. Possible values are "gzip" and "bzip".

  • "groupFiles": (Optional) Grouping files is turned on by default when the input contains more than 50,000 files. To turn on grouping with fewer than 50,000 files, set this parameter to "inPartition". To disable grouping when there are more than 50,000 files, set this parameter to "none".

  • "groupSize": (Optional) The target group size in bytes. The default is computed based on the input data size and the size of your cluster. When there are fewer than 50,000 input files, "groupFiles" must be set to "inPartition" for this to take effect.

  • "recurse": (Optional) If set to true, recursively reads files in all subdirectories under the specified paths.

  • "maxBand": (Optional, advanced) This option controls the duration in milliseconds after which the Amazon S3 listing is likely to be consistent. Files with modification timestamps falling within the last maxBand milliseconds are tracked specially when using job bookmarks to account for Amazon S3 eventual consistency. Most users don't need to set this option. The default is 900000 milliseconds, or 15 minutes.

  • "maxFilesInBand": (Optional, advanced) This option specifies the maximum number of files to save from the last maxBand milliseconds. If this number is exceeded, extra files are skipped and only processed in the next job run. Most users don't need to set this option.

  • "isFailFast": (Optional) This option determines if an AWS Glue ETL job throws reader parsing exceptions. If set to true, jobs fail fast if four retries of the Spark task fail to parse the data correctly.
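
The following Python sketch shows how some of the Amazon S3 options above might be assembled. It uses json.dumps to produce the JSON-list string that "exclusions" expects, and a small function that restates the documented "groupFiles" rule. The helper names and the bucket path are hypothetical, and passing "groupSize" as a string is an assumption.

```python
import json

# File count named in the groupFiles description above.
GROUPING_THRESHOLD = 50_000


def build_s3_source_options(paths, exclude_patterns=None, recurse=False,
                            group_files=None, group_size_bytes=None):
    """Assemble Amazon S3 source options (hypothetical helper, not a Glue API)."""
    options = {"paths": list(paths)}
    if exclude_patterns:
        # "exclusions" is a string that contains a JSON list of glob patterns.
        options["exclusions"] = json.dumps(list(exclude_patterns))
    if recurse:
        options["recurse"] = True
    if group_files is not None:
        if group_files not in ("inPartition", "none"):
            raise ValueError(f"unsupported groupFiles value: {group_files}")
        options["groupFiles"] = group_files
    if group_size_bytes is not None:
        # Assumption: the size is passed as a string of bytes.
        options["groupSize"] = str(group_size_bytes)
    return options


def grouping_is_active(file_count, group_files=None):
    """Restate the documented rule for when file grouping applies."""
    if group_files == "none":
        return False
    if group_files == "inPartition":
        return True
    return file_count > GROUPING_THRESHOLD


s3_options = build_s3_source_options(
    paths=["s3://example-bucket/input/"],  # placeholder bucket
    exclude_patterns=["**.pdf"],
    recurse=True,
    group_files="inPartition",
    group_size_bytes=1048576,
)
print(s3_options["exclusions"])
```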

JDBC connectionType Values

These include the following:

  • "connectionType": "sqlserver": Designates a connection to a Microsoft SQL Server database.

  • "connectionType": "mysql": Designates a connection to a MySQL database.

  • "connectionType": "oracle": Designates a connection to an Oracle database.

  • "connectionType": "postgresql": Designates a connection to a PostgreSQL database.

  • "connectionType": "redshift": Designates a connection to an Amazon Redshift database.

The following table lists the JDBC driver versions that AWS Glue supports.

Product                 JDBC driver versions for Glue 0.9, 1.0, 2.0    JDBC driver versions for Glue 3.0
Microsoft SQL Server    6.x                                            7.x
MySQL                   5.1                                            8.0.23
Oracle Database         11.2                                           21.1
PostgreSQL              42.1.x                                         42.2.18
MongoDB                 2.0.0                                          4.0.0
Amazon Redshift         4.1                                            4.1

Use these connection options with JDBC connections:

  • "url": (Required) The JDBC URL for the database.

  • "dbtable": The database table to read from. For JDBC data stores that support schemas within a database, specify schema.table-name. If a schema is not provided, then the default "public" schema is used.

  • "redshiftTmpDir": (Required for Amazon Redshift, optional for other JDBC types) The Amazon S3 path where temporary data can be staged when copying out of the database.

  • "user": (Required) The user name to use when connecting.

  • "password": (Required) The password to use when connecting.

  • (Optional) The following options allow you to supply a custom JDBC driver. Use these options if you must use a driver that AWS Glue does not natively support. ETL jobs can use different JDBC driver versions for the data source and target, even if the source and target are the same database product. This allows you to migrate data between source and target databases with different versions. To use these options, you must first upload the jar file of the JDBC driver to Amazon S3.

    • "customJdbcDriverS3Path": Amazon S3 path of the custom JDBC driver.

    • "customJdbcDriverClassName": Class name of JDBC driver.

  • "bulksize": (Optional) Used to configure parallel inserts for speeding up bulk loads into JDBC targets. Specify an integer value for the degree of parallelism to use when writing or inserting data. This option is helpful for improving the performance of writes into databases such as Amazon Aurora.

  • "sampleQuery": (Optional) The custom SQL query statement for sampling. By default, the sample query is executed by a single executor. If you're reading a large dataset, you may need to enable JDBC partitioning to query a table in parallel. For more information, see Reading from JDBC Tables in Parallel. To use sampleQuery with JDBC partitioning, also set enablePartitioningForSampleQuery to true.

  • "enablePartitioningForSampleQuery": (Optional) By default this option is false. Required if you want to use sampleQuery with a partitioned JDBC table. If set to true, sampleQuery must end with "where" or "and" for AWS Glue to append partitioning conditions. See the example below.

  • "sampleSize": (Optional) Limit the number of rows returned by the sample query. Works only when enablePartitioningForSampleQuery is true. If partitioning is not enabled, you should instead directly add "limit x" in the sampleQuery to limit the size.

    Example Use sampleQuery without partitioning

    The following code example shows how to use sampleQuery without partitioning.

    // A full SQL query statement.
    val query = s"select name from $tableName where age > 0 limit 1"
    val connectionOptions = JsonOptions(Map(
      "url" -> url,
      "dbtable" -> table,
      "user" -> user,
      "password" -> password,
      "basePath" -> basePath,
      "sampleQuery" -> query
    ))
    val dyf = glueContext.getSource("mysql", connectionOptions)
      .getDynamicFrame()

    Example Use sampleQuery with JDBC partitioning

    The following code example shows how to use sampleQuery with JDBC partitioning.

    // Note that the query should end with "where" or "and" when used with JDBC partitioning.
    val query = s"select name from $tableName where age > 0 and"

    // Enable JDBC partitioning by setting hashfield.
    // To use sampleQuery with partitioning, set enablePartitioningForSampleQuery.
    // Use sampleSize to limit the size of the returned data.
    val connectionOptions = JsonOptions(Map(
      "url" -> url,
      "dbtable" -> table,
      "user" -> user,
      "password" -> password,
      "basePath" -> basePath,
      "hashfield" -> primaryKey,
      "sampleQuery" -> query,
      "enablePartitioningForSampleQuery" -> true,
      "sampleSize" -> "1"
    ))
    val dyf = glueContext.getSource("mysql", connectionOptions)
      .getDynamicFrame()
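
For Python job scripts, the same rules can be checked before the options reach AWS Glue. The following sketch is illustrative only: the helper name, the JDBC URL, and the credentials are hypothetical, and the checks simply restate the option descriptions above (the query must end with "where" or "and" when partitioning is enabled, and sampleSize applies only with partitioning).

```python
def build_sample_query_options(base_options, sample_query,
                               hashfield=None, sample_size=None):
    """Add sampleQuery settings to JDBC options (hypothetical helper)."""
    options = dict(base_options)
    options["sampleQuery"] = sample_query

    if hashfield is None:
        # Without partitioning, sampleSize has no effect; use "limit x" instead.
        if sample_size is not None:
            raise ValueError('add "limit x" to sampleQuery when partitioning is off')
        return options

    # With partitioning, AWS Glue appends conditions to the query, so the
    # query must end with "where" or "and".
    tokens = sample_query.split()
    if not tokens or tokens[-1].lower() not in ("where", "and"):
        raise ValueError('sampleQuery must end with "where" or "and"')

    options["hashfield"] = hashfield
    options["enablePartitioningForSampleQuery"] = True
    if sample_size is not None:
        options["sampleSize"] = str(sample_size)
    return options


base = {
    "url": "jdbc:mysql://<jdbc-host-name>:3306/db",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd",
}

unpartitioned = build_sample_query_options(
    base, "select name from test where age > 0 limit 1")
partitioned = build_sample_query_options(
    base, "select name from test where age > 0 and",
    hashfield="id", sample_size=1)
print(partitioned)

# Inside a Glue job, either dictionary would be passed along these lines:
# dyf = glueContext.create_dynamic_frame.from_options(
#     connection_type="mysql", connection_options=partitioned)
```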

For the Redshift connection type, all other option name/value pairs that are included in connection options for a JDBC connection, including formatting options, are passed directly to the underlying SparkSQL DataSource. For more information, see Redshift data source for Spark.

The following code examples show how to read from and write to JDBC databases with custom JDBC drivers. They demonstrate reading from one version of a database product and writing to a later version of the same product.

Python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Construct JDBC connection options
connection_mysql5_options = {
    "url": "jdbc:mysql://<jdbc-host-name>:3306/db",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd"}

connection_mysql8_options = {
    "url": "jdbc:mysql://<jdbc-host-name>:3306/db",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd",
    "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar",
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}

connection_oracle11_options = {
    "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd"}

connection_oracle18_options = {
    "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL",
    "dbtable": "test",
    "user": "admin",
    "password": "pwd",
    "customJdbcDriverS3Path": "s3://path/ojdbc10.jar",
    "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}

# Read from JDBC databases with custom driver
df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql",
                                                          connection_options=connection_mysql8_options)

# Read DynamicFrame from MySQL 5 and write to MySQL 8
df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql",
                                                          connection_options=connection_mysql5_options)
glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql",
                               connection_options=connection_mysql8_options)

# Read DynamicFrame from Oracle 11 and write to Oracle 18
df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle",
                                                            connection_options=connection_oracle11_options)
glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle",
                               connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db"
  val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db"
  val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL"
  val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL"

  // Construct JDBC connection options
  lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI)
  lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver")
  lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI)
  lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver")

  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read from JDBC database with custom driver
    val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame()

    // Read DynamicFrame from MySQL 5 and write to MySQL 8
    val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame()
    glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5)

    // Read DynamicFrame from Oracle 11 and write to Oracle 18
    val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame()
    glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11)

    Job.commit()
  }

  // The uri parameter is interpolated into the "url" connection option.
  private def jsonOptions(uri: String): JsonOptions = {
    new JsonOptions(
      s"""{"url": "${uri}",
         |"dbtable":"test",
         |"user": "admin",
         |"password": "pwd"}""".stripMargin)
  }

  private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = {
    new JsonOptions(
      s"""{"url": "${uri}",
         |"dbtable":"test",
         |"user": "admin",
         |"password": "pwd",
         |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}",
         |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin)
  }
}

Custom and AWS Marketplace connectionType values

These include the following:

  • "connectionType": "marketplace.athena": Designates a connection to an Amazon Athena data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "marketplace.spark": Designates a connection to an Apache Spark data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "marketplace.jdbc": Designates a connection to a JDBC data store. The connection uses a connector from AWS Marketplace.

  • "connectionType": "custom.athena": Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to AWS Glue Studio.

  • "connectionType": "custom.spark": Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to AWS Glue Studio.

  • "connectionType": "custom.jdbc": Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to AWS Glue Studio.

Connection options for type custom.jdbc or marketplace.jdbc

  • className – String, required, driver class name.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • url – String, required, JDBC URL with placeholders (${}) which are used to build the connection to the data source. The placeholder ${secretKey} is replaced with the secret of the same name in AWS Secrets Manager. Refer to the data store documentation for more information about constructing the URL.

  • secretId or user/password – String, required, used to retrieve credentials for the URL.

  • dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.

  • partitionColumn – String, optional, the name of an integer column that is used for partitioning. This option works only when it's included with lowerBound, upperBound, and numPartitions. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databases in the Apache Spark SQL, DataFrames and Datasets Guide.

    The lowerBound and upperBound values are used to decide the partition stride, not to filter the rows in the table. All rows in the table are partitioned and returned.

    Note

    When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the partition column.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the partition column.

  • lowerBound – Integer, optional, the minimum value of partitionColumn that is used to decide partition stride.

  • upperBound – Integer, optional, the maximum value of partitionColumn that is used to decide partition stride.

  • numPartitions – Integer, optional, the number of partitions. This value, along with lowerBound (inclusive) and upperBound (exclusive), form partition strides for generated WHERE clause expressions that are used to split the partitionColumn.

    Important

    Be careful with the number of partitions because too many partitions might cause problems on your external database systems.

  • filterPredicate – String, optional, extra condition clause to filter data from source. For example:

    BillingCity='Mountain View'

    When using a query instead of a table name, you should validate that the query works with the specified filterPredicate. For example:

    • If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the filter predicate.

    • If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the filter predicate.

  • dataTypeMapping – Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to an AWS Glue data type. For example, the option "dataTypeMapping":{"FLOAT":"STRING"} maps data fields of JDBC type FLOAT into the Java String type by calling the ResultSet.getString() method of the driver, and uses the result to build the AWS Glue record. The ResultSet object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions.

    The AWS Glue data types currently supported are:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    The JDBC data types supported are those defined in Java 8 java.sql.Types.

    The default data type mappings (from JDBC to AWS Glue) are:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    If you use a custom data type mapping with the option dataTypeMapping, then you can override a default data type mapping. Only the JDBC data types listed in the dataTypeMapping option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the AWS Glue STRING data type by default.
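
The resolution order described above (custom mapping first, then the default mapping, then STRING) can be sketched in Python as follows. This models the documented rules only and is not the connector's code; the function name resolve_glue_type is hypothetical.

```python
# Default JDBC-to-AWS Glue type mappings, copied from the list above.
DEFAULT_TYPE_MAPPING = {
    "DATE": "DATE",
    "VARCHAR": "STRING",
    "CHAR": "STRING",
    "LONGNVARCHAR": "STRING",
    "TIMESTAMP": "TIMESTAMP",
    "INTEGER": "INT",
    "FLOAT": "FLOAT",
    "REAL": "FLOAT",
    "BIT": "BOOLEAN",
    "BOOLEAN": "BOOLEAN",
    "BIGINT": "LONG",
    "DECIMAL": "BIGDECIMAL",
    "NUMERIC": "BIGDECIMAL",
    "TINYINT": "SHORT",
    "SMALLINT": "SHORT",
    "DOUBLE": "DOUBLE",
}


def resolve_glue_type(jdbc_type, data_type_mapping=None):
    """Return the AWS Glue type for a JDBC type under the documented rules."""
    custom = data_type_mapping or {}
    # 1. A custom mapping overrides the default for the listed types only.
    if jdbc_type in custom:
        return custom[jdbc_type]
    # 2. Otherwise the default mapping applies.
    # 3. Types found in neither mapping convert to STRING.
    return DEFAULT_TYPE_MAPPING.get(jdbc_type, "STRING")


custom_mapping = {"INTEGER": "STRING"}
for jdbc_type in ("INTEGER", "FLOAT", "BLOB"):
    print(jdbc_type, "->", resolve_glue_type(jdbc_type, custom_mapping))
```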

The following Python code example shows how to read from JDBC databases with AWS Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {
        "dataTypeMapping": {"INTEGER": "STRING"},
        "upperBound": "200",
        "query": "select id, name, department from department where id < 200",
        "numPartitions": "4",
        "partitionColumn": "id",
        "lowerBound": "0",
        "connectionName": "test-connection-jdbc"},
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
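
The example above sets lowerBound to 0, upperBound to 200, and numPartitions to 4 on partitionColumn id. The following Python sketch shows how such bounds could translate into per-partition WHERE conditions. It is a simplified model based on the behavior of the Spark SQL JDBC reader, which the partitionColumn option is described as following; the connector's actual clause generation may differ, and the function name is hypothetical. The sketch assumes non-negative integer bounds.

```python
def partition_where_clauses(column, lower_bound, upper_bound, num_partitions):
    """Sketch the per-partition conditions derived from the bounds and stride."""
    if num_partitions < 2:
        # A single partition reads the whole table with no extra condition.
        return []
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    clauses = []
    current = lower_bound
    for index in range(num_partitions):
        lower = f"{column} >= {current}" if index != 0 else None
        current += stride
        upper = f"{column} < {current}" if index != num_partitions - 1 else None
        if lower is None:
            # The first partition is open-ended below and also takes NULLs.
            clauses.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # The last partition is open-ended above.
            clauses.append(lower)
        else:
            clauses.append(f"{lower} AND {upper}")
    return clauses


for clause in partition_where_clauses("id", 0, 200, 4):
    print(clause)
# id < 50 OR id IS NULL
# id >= 50 AND id < 100
# id >= 100 AND id < 150
# id >= 150
```

The first and last conditions are open-ended, which is why the bounds decide only the stride: rows with id below lowerBound or at or above upperBound still fall into the first or last partition.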

Connection options for type custom.athena or marketplace.athena

  • className – String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class name (for example, "com.amazonaws.athena.connectors"). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix.

  • tableName – String, required, the name of the CloudWatch log stream to read. This code snippet uses the special view name all_log_streams, which means that the dynamic data frame returned will contain data from all log streams in the log group.

  • schemaName – String, required, the name of the CloudWatch log group to read from. For example, /aws-glue/jobs/output.

  • connectionName – String, required, name of the connection that is associated with the connector.

For additional options for this connector, see the Amazon Athena CloudWatch Connector README file on GitHub.

The following Python code example shows how to read from an Athena data store using an AWS Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.athena",
    connection_options = {
        "tableName": "all_log_streams",
        "schemaName": "/aws-glue/jobs/output",
        "connectionName": "test-connection-athena"},
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [
        ("department", "string", "department", "string"),
        ("name", "string", "name", "string"),
        ("id", "int", "id", "int")],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()

Connection options for type custom.spark or marketplace.spark

  • className – String, required, connector class name.

  • secretId – String, optional, used to retrieve credentials for the connector connection.

  • connectionName – String, required, name of the connection that is associated with the connector.

  • Other options depend on the data store. For example, OpenSearch configuration options start with the prefix es, as described in the Elasticsearch for Apache Hadoop documentation. Spark connections to Snowflake use options such as sfUser and sfPassword, as described in Using the Spark Connector in the Connecting to Snowflake guide.

The following Python code example shows how to read from an OpenSearch data store using a marketplace.spark connection.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {
        "path": "test",
        "es.nodes.wan.only": "true",
        "es.nodes": "https://<AWS endpoint>",
        "connectionName": "test-spark-es",
        "es.port": "443"},
    transformation_ctx = "DataSource0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DataSource0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()