Connection types and options for ETL in AWS Glue
In AWS Glue, various PySpark and Scala methods and transforms specify the connection type using a connectionType parameter. They specify connection options using a connectionOptions or options parameter.

The connectionType parameter can take the values shown in the following table. The associated connectionOptions (or options) parameter values for each type are documented in the following sections. Except where otherwise noted, the parameters apply when the connection is used as a source or sink.

For sample code that demonstrates setting and using connection options, see Examples: Setting connection types and options.
connectionType | Connects to |
---|---|
custom.* | Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values) |
documentdb | Amazon DocumentDB (with MongoDB compatibility) database |
dynamodb | Amazon DynamoDB database |
kafka | Kafka |
kinesis | Amazon Kinesis Data Streams |
marketplace.* | Spark, Athena, or JDBC data stores (see Custom and AWS Marketplace connectionType values) |
mongodb | MongoDB |
mysql | MySQL |
oracle | Oracle |
orc | Files stored in Amazon Simple Storage Service (Amazon S3) in the Apache Hive Optimized Row Columnar (ORC) file format |
parquet | Files stored in Amazon S3 in the Apache Parquet file format |
postgresql | PostgreSQL |
redshift | Amazon Redshift |
s3 | Amazon S3 |
sqlserver | Microsoft SQL Server database (see JDBC connectionType values) |
"connectionType": "Documentdb"
Designates a connection to Amazon DocumentDB (with MongoDB compatibility).
Connection options differ for a source connection and a sink connection.
"connectionType": "Documentdb" as source
Use the following connection options with "connectionType": "documentdb"
as
a source:
- "uri": (Required) The Amazon DocumentDB host to read from, formatted as mongodb://<host>:<port>.
- "database": (Required) The Amazon DocumentDB database to read from.
- "collection": (Required) The Amazon DocumentDB collection to read from.
- "username": (Required) The Amazon DocumentDB user name.
- "password": (Required) The Amazon DocumentDB password.
- "ssl": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "true".
- "ssl.domain_match": (Required if using SSL) If your connection uses SSL, then you must include this option with the value "false".
- "batchSize": (Optional) The number of documents to return per batch, used within the cursor of internal batches.
- "partitioner": (Optional) The class name of the partitioner for reading input data from Amazon DocumentDB. The connector provides the following partitioners:
  - MongoDefaultPartitioner (default)
  - MongoSamplePartitioner
  - MongoShardedPartitioner
  - MongoSplitVectorPartitioner
  - MongoPaginateByCountPartitioner
  - MongoPaginateBySizePartitioner
- "partitionerOptions": (Optional) Options for the designated partitioner. The following options are supported for each partitioner:
  - MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition
  - MongoShardedPartitioner: shardkey
  - MongoSplitVectorPartitioner: partitionKey, partitionSizeMB
  - MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions
  - MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

  For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting connection types and options.
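The following PySpark snippet is a minimal sketch of a DocumentDB read using these options; the host, database, collection, credentials, and partitioner settings are placeholder values rather than the documented sample.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# All connection values below are placeholders; replace them with your own.
read_docdb_options = {
    "uri": "mongodb://<host>:<port>",
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "password",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id",
}

# Read the collection into a DynamicFrame.
dynamic_frame = glue_context.create_dynamic_frame.from_options(
    connection_type="documentdb",
    connection_options=read_docdb_options,
)
```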
"connectionType": "Documentdb" as sink
Use the following connection options with "connectionType": "documentdb"
as
a sink:
-
"uri"
: (Required) The Amazon DocumentDB host to write to, formatted asmongodb://<host>:<port>
. -
"database"
: (Required) The Amazon DocumentDB database to write to. -
"collection"
: (Required) The Amazon DocumentDB collection to write to. -
"username"
: (Required) The Amazon DocumentDB user name. -
"password"
: (Required) The Amazon DocumentDB password. -
"extendedBsonTypes"
: (Optional) Iftrue
, allows extended BSON types when writing data to Amazon DocumentDB. The default istrue
. -
"replaceDocument"
: (Optional) Iftrue
, replaces the whole document when saving datasets that contain an_id
field. Iffalse
, only fields in the document that match the fields in the dataset are updated. The default istrue
. -
"maxBatchSize"
: (Optional): The maximum batch size for bulk operations when saving data. The default is 512.
For sample code, see Examples: Setting connection types and options.
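As a rough sketch of the write side, reusing the glue_context and dynamic_frame from the source sketch above (connection values are placeholders):

```python
# Write the DynamicFrame to Amazon DocumentDB; values are placeholders.
write_docdb_options = {
    "uri": "mongodb://<host>:<port>",
    "database": "test",
    "collection": "coll",
    "username": "username",
    "password": "password",
}

glue_context.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="documentdb",
    connection_options=write_docdb_options,
)
```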
"connectionType": "dynamodb"
Designates a connection to Amazon DynamoDB.
Connection options differ for a source connection and a sink connection.
"connectionType": "dynamodb" with the ETL connector as source
Use the following connection options with "connectionType": "dynamodb"
as a
source, when using the AWS Glue DynamoDB ETL connector:
-
"dynamodb.input.tableName"
: (Required) The DynamoDB table to read from. -
"dynamodb.throughput.read.percent"
: (Optional) The percentage of read capacity units (RCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.-
0.5
represents the default read rate, meaning that AWS Glue will attempt to consume half of the read capacity of the table. If you increase the value above0.5
, AWS Glue increases the request rate; decreasing the value below0.5
decreases the read request rate. (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.) -
When the DynamoDB table is in on-demand mode, AWS Glue handles the read capacity of the table as 40000. For exporting a large table, we recommend switching your DynamoDB table to on-demand mode.
-
-
"dynamodb.splits"
: (Optional) Defines how many splits we partition this DynamoDB table into while reading. The default is set to "1". Acceptable values are from "1" to "1,000,000", inclusive.-
1
represents there is no parallelism. We highly recommend that you specify a larger value for better performance by using the below formula. -
We recommend you to calculate
numSlots
using the following formula, and use it asdynamodb.splits
. If you need more performance, we recommend you to scale out your job by increasing the number of DPUs.The Number of workers (
NumberOfWorkers
) is set in job configuration. For more information, see Adding jobs in AWS Glue. The number of workers available to your job may adjust due to workload when you enable autoscaling. For context, one executor is reserved for the Spark driver; other executors are used to process data.-
numExecutors =
-
NumberOfWorkers - 1
ifWorkerType
isG.1X
orG.2X
-
MaximumCapacity * 2 - 1
ifWorkerType
isStandard
and AWS Glue Version is 2.0+.(MaximumCapacity - 1) * 2 - 1
ifWorkerType
isStandard
and AWS Glue Version is 1.0 or earlier.
-
-
numSlotsPerExecutor =
-
numSlots = numSlotsPerExecutor * numExecutors
-
-
-
"dynamodb.sts.roleArn"
: (Optional) The IAM role ARN to be assumed for cross-account access. This parameter is available in AWS Glue 1.0 or later. -
"dynamodb.sts.roleSessionName"
: (Optional) STS session name. The default is set to "glue-dynamodb-read-sts-session". This parameter is available in AWS Glue 1.0 or later.
The following code examples show how to read from (via the ETL connector) and write to DynamoDB tables. They demonstrate reading from one table and writing to another table.
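A minimal PySpark sketch along these lines, with placeholder table names (test_source, test_sink) and illustrative option values:

```python
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read from one table with the DynamoDB ETL connector (placeholder table name).
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "test_source",
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100",
    },
)

print(dyf.getNumPartitions())

# Write to a different table (placeholder table name).
glue_context.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.throughput.write.percent": "1.0",
    },
)

job.commit()
```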
AWS Glue supports reading data from another AWS account's DynamoDB table. For more information, see Cross-account cross-Region access to DynamoDB tables.
The DynamoDB ETL reader does not support filters or pushdown predicates.
"connectionType": "dynamodb" with the AWS Glue DynamoDB export connector as source
In addition to the AWS Glue DynamoDB ETL connector, AWS Glue offers a DynamoDB export connector that invokes a DynamoDB ExportTableToPointInTime request and stores the export in an Amazon S3 location you supply, in the DynamoDB JSON format. AWS Glue then creates a DynamicFrame object by reading the data from the Amazon S3 export location.

The export connector performs better than the ETL connector when the DynamoDB table size is larger than 80 GB. In addition, because the export request is conducted outside of the Spark processes in an AWS Glue job, you can enable auto scaling of AWS Glue jobs to save DPU usage during the export request. With the export connector, you also do not need to configure the number of splits for Spark executor parallelism or the DynamoDB throughput read percentage.
Use the following connection options with "connectionType": "dynamodb" as a source, when using the AWS Glue DynamoDB export connector, which is available only for AWS Glue version 2.0 onwards:
- "dynamodb.export": (Required) A string value:
  - If set to ddb, enables the AWS Glue DynamoDB export connector, and a new ExportTableToPointInTimeRequest is invoked during the AWS Glue job. A new export is generated with the location passed from dynamodb.s3.bucket and dynamodb.s3.prefix.
  - If set to s3, enables the AWS Glue DynamoDB export connector but skips the creation of a new DynamoDB export and instead uses dynamodb.s3.bucket and dynamodb.s3.prefix as the Amazon S3 location of a past export of that table.
- "dynamodb.tableArn": (Required) The DynamoDB table to read from.
- "dynamodb.unnestDDBJson": (Optional) Takes a Boolean value. If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is false.
- "dynamodb.s3.bucket": (Optional) Indicates the Amazon S3 bucket location in which the DynamoDB ExportTableToPointInTime process is to be conducted. The file format for the export is DynamoDB JSON.
  - "dynamodb.s3.prefix": (Optional) Indicates the Amazon S3 prefix location inside the Amazon S3 bucket in which the DynamoDB ExportTableToPointInTime loads are to be stored. If neither dynamodb.s3.prefix nor dynamodb.s3.bucket is specified, these values default to the Temporary Directory location specified in the AWS Glue job configuration. For more information, see Special Parameters Used by AWS Glue.
  - "dynamodb.s3.bucketOwner": Indicates the bucket owner needed for cross-account Amazon S3 access.
- "dynamodb.sts.roleArn": (Optional) The IAM role ARN to be assumed for cross-account access and/or cross-Region access for the DynamoDB table. Note: The same IAM role ARN is used to access the Amazon S3 location specified for the ExportTableToPointInTime request.
- "dynamodb.sts.roleSessionName": (Optional) The STS session name. The default is set to "glue-dynamodb-read-sts-session".
DynamoDB has specific requirements for invoking ExportTableToPointInTime requests. For more information, see Requesting a table export in DynamoDB. For example, point-in-time recovery (PITR) needs to be enabled on the table to use this connector. The DynamoDB connector also supports AWS KMS encryption for DynamoDB exports to Amazon S3. Supplying your security configuration in the AWS Glue job configuration enables AWS KMS encryption for a DynamoDB export. The KMS key must be in the same Region as the Amazon S3 bucket.

Note that additional charges for the DynamoDB export and Amazon S3 storage apply. Exported data in Amazon S3 persists after a job run finishes, so you can reuse it without additional DynamoDB exports. A requirement for using this connector is that point-in-time recovery (PITR) is enabled for the table.

Neither the DynamoDB ETL connector nor the export connector supports filters or pushdown predicates applied at the DynamoDB source.
The following code examples show how to read from (via the export connector) and print the number of partitions.
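As a rough sketch; the table ARN and Amazon S3 locations below are placeholders.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read via the export connector: a new export is written to the given S3 location.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/test_source",
        "dynamodb.s3.bucket": "amzn-s3-demo-bucket",
        "dynamodb.s3.prefix": "dynamodb-export/",
    },
)
print(dyf.getNumPartitions())
```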
These examples show how to read (via the export connector) and print the number of partitions from an AWS Glue Data Catalog table that has a dynamodb classification:
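A sketch along those lines, continuing with the glue_context from the previous snippet; the Data Catalog database and table names, bucket, and prefix are placeholders.

```python
# Read via the export connector using a Data Catalog table with a dynamodb classification.
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="my_catalog_database",
    table_name="my_catalog_table",
    additional_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": "amzn-s3-demo-bucket",
        "dynamodb.s3.prefix": "dynamodb-export/",
    },
)
print(dyf.getNumPartitions())
```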
Traversing the DynamoDB JSON structure
DynamoDB exports created with the AWS Glue DynamoDB export connector result in JSON files with specific nested structures. For more information, see Data objects. AWS Glue supplies a DynamicFrame transformation, which can unnest such structures into an easier-to-use form for downstream applications.
The transform can be invoked in one of two ways. The first way is a Boolean flag that is passed with the AWS Glue DynamoDB export connector. The second way is calling the transform function itself.
The following code examples show how to use the AWS Glue DynamoDB export connector, invoke an unnest, and print the number of partitions:
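The first style might look like the following sketch, which differs from the earlier export read only in the dynamodb.unnestDDBJson flag (the ARN and S3 values remain placeholders):

```python
# Same export read as before, with the unnest transformation enabled via the flag.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/test_source",
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "amzn-s3-demo-bucket",
        "dynamodb.s3.prefix": "dynamodb-export/",
    },
)
print(dyf.getNumPartitions())
```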
The other invocation of the transform is through a separate DynamicFrame function call. For more information, see DynamicFrame Class for Python and AWS Glue Scala DynamicFrame Class for Scala.
"connectionType": "dynamodb" with the ETL connector as sink
Use the following connection options with "connectionType": "dynamodb"
as a
sink:
-
"dynamodb.output.tableName"
: (Required) The DynamoDB table to write to. -
"dynamodb.throughput.write.percent"
: (Optional) The percentage of write capacity units (WCU) to use. The default is set to "0.5". Acceptable values are from "0.1" to "1.5", inclusive.-
0.5
represents the default write rate, meaning that AWS Glue will attempt to consume half of the write capacity of the table. If you increase the value above 0.5, AWS Glue increases the request rate; decreasing the value below 0.5 decreases the write request rate. (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table). -
When the DynamoDB table is in on-demand mode, AWS Glue handles the write capacity of the table as
40000
. For importing a large table, we recommend switching your DynamoDB table to on-demand mode.
-
-
"dynamodb.output.numParallelTasks"
: (Optional) Defines how many parallel tasks write into DynamoDB at the same time. Used to calculate permissive WCU per Spark task. If you do not want to control these details, you do not need to specify this parameter.-
permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks
-
If you do not specify this parameter, the permissive WCU per Spark task will be automatically calculated by the following formula:
-
numPartitions = dynamicframe.getNumPartitions()
-
numExecutors =
-
(DPU - 1) * 2 - 1
ifWorkerType
isStandard
-
(NumberOfWorkers - 1)
ifWorkerType
isG.1X
orG.2X
-
-
numSlotsPerExecutor =
-
4
ifWorkerType
isStandard
-
8
ifWorkerType
isG.1X
-
16
ifWorkerType
isG.2X
-
-
numSlots = numSlotsPerExecutor * numExecutors
-
numParallelTasks = min(numPartitions, numSlots)
-
-
Example 1. DPU=10, WorkerType=Standard. Input DynamicFrame has 100 RDD partitions.
-
numPartitions = 100
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(100, 68) = 68
-
-
Example 2. DPU=10, WorkerType=Standard. Input DynamicFrame has 20 RDD partitions.
-
numPartitions = 20
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(20, 68) = 20
-
-
-
"dynamodb.output.retry"
: (Optional) Defines how many retries we perform when there is aProvisionedThroughputExceededException
from DynamoDB. The default is set to "10". -
"dynamodb.sts.roleArn"
: (Optional) The IAM role ARN to be assumed for cross-account access. -
"dynamodb.sts.roleSessionName"
: (Optional) STS session name. The default is set to "glue-dynamodb-write-sts-session".
The DynamoDB writer is supported in AWS Glue version 1.0 or later.
AWS Glue supports writing data into another AWS account's DynamoDB table. For more information, see Cross-account cross-Region access to DynamoDB tables.
The following code examples show how to read from and write to DynamoDB tables. They demonstrate reading from one table and writing to another table.
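A short sketch similar to the earlier ETL connector example, with the sink-side options spelled out; the table names and option values are placeholders.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read from one table, then write to another with explicit sink options (placeholder names).
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={"dynamodb.input.tableName": "test_source"},
)

glue_context.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "test_sink",
        "dynamodb.throughput.write.percent": "1.0",
        "dynamodb.output.retry": "15",
    },
)
```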
"connectionType": "Kafka"
Designates a connection to a Kafka cluster or an Amazon Managed Streaming for Apache Kafka cluster.
You can use the following methods under the GlueContext object to consume records from a Kafka streaming source:

- getCatalogSource
- getSource
- getSourceWithFormat
- createDataFrameFromOptions

If you use getCatalogSource, then the job has the Data Catalog database and table name information, and can use that to obtain some basic parameters for reading from the Apache Kafka stream. If you use getSource, getSourceWithFormat, or createDataFrameFromOptions, you must explicitly specify these parameters.

You can specify these options using connectionOptions with getSource or createDataFrameFromOptions, options with getSourceWithFormat, or additionalOptions with getCatalogSource.

Use the following connection options with "connectionType": "kafka":
- bootstrap.servers: (Required) A list of bootstrap server URLs, for example, b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. This option must be specified in the API call or defined in the table metadata in the Data Catalog.
- security.protocol: (Required) The protocol used to communicate with brokers. The possible values are "SSL" or "PLAINTEXT".
- topicName: (Required) A comma-separated list of topics to subscribe to. You must specify one and only one of "topicName", "assign", or "subscribePattern".
- "assign": (Required) A JSON string specifying the specific TopicPartitions to consume. You must specify one and only one of "topicName", "assign", or "subscribePattern".

  Example: '{"topicA":[0,1],"topicB":[2,4]}'
- "subscribePattern": (Required) A Java regex string that identifies the topic list to subscribe to. You must specify one and only one of "topicName", "assign", or "subscribePattern".

  Example: 'topic.*'
- classification: (Optional)
- delimiter: (Optional)
- "startingOffsets": (Optional) The starting position in the Kafka topic to read data from. The possible values are "earliest" or "latest". The default value is "latest".
- "endingOffsets": (Optional) The end point when a batch query is ended. The possible values are either "latest" or a JSON string that specifies an ending offset for each TopicPartition.

  For the JSON string, the format is {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. The value -1 as an offset represents "latest".
- "pollTimeoutMs": (Optional) The timeout in milliseconds to poll data from Kafka in Spark job executors. The default value is 512.
- "numRetries": (Optional) The number of times to retry before failing to fetch Kafka offsets. The default value is 3.
- "retryIntervalMs": (Optional) The time in milliseconds to wait before retrying to fetch Kafka offsets. The default value is 10.
- "maxOffsetsPerTrigger": (Optional) The rate limit on the maximum number of offsets that are processed per trigger interval. The specified total number of offsets is proportionally split across topicPartitions of different volumes. The default value is null, which means that the consumer reads all offsets until the known latest offset.
- "minPartitions": (Optional) The desired minimum number of partitions to read from Kafka. The default value is null, which means that the number of Spark partitions is equal to the number of Kafka partitions.
- "includeHeaders": (Optional) Whether to include the Kafka headers. When the option is set to "true", the data output contains an additional column named "glue_streaming_kafka_headers" with type Array[Struct(key: String, value: String)]. The default value is "false". This option is available in AWS Glue version 3.0 or later.
- "schema": (Required when inferSchema is set to false) The schema to use to process the payload. If the classification is avro, the provided schema must be in the Avro schema format. If the classification is not avro, the provided schema must be in the DDL schema format.
- "inferSchema": (Optional) The default value is 'false'. If set to 'true', the schema is detected at runtime from the payload within foreachbatch.
- "avroSchema": (Deprecated) Parameter used to specify a schema of Avro data when the Avro format is used. This parameter is now deprecated. Use the schema parameter.
- "addRecordTimestamp": (Optional) When this option is set to 'true', the data output contains an additional column named "__src_timestamp" that indicates the time when the corresponding record was received by the topic. The default value is 'false'. This option is supported in AWS Glue version 4.0 or later.
- "emitConsumerLagMetrics": (Optional) When the option is set to 'true', for each batch, the job emits to CloudWatch the metrics for the duration between the oldest record received by the topic and the time it arrives in AWS Glue. The metric's name is "glue.driver.streaming.maxConsumerLagInMs". The default value is 'false'. This option is supported in AWS Glue version 4.0 or later.
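As a rough illustration from a Python job, a streaming read might look like the following sketch. The broker address, topic name, and schema string are placeholders, and the options are supplied directly in the call rather than through the Data Catalog.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Placeholder broker, topic, and schema values.
kafka_options = {
    "bootstrap.servers": "b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094",
    "security.protocol": "SSL",
    "topicName": "example-topic",
    "startingOffsets": "earliest",
    "classification": "json",
    "inferSchema": "false",
    # DDL schema format, because the classification is not avro.
    "schema": "sensor_id string, reading double, event_time timestamp",
}

# Returns a Spark streaming DataFrame for use in a foreachbatch-style job.
data_frame = glue_context.create_data_frame.from_options(
    connection_type="kafka",
    connection_options=kafka_options,
    transformation_ctx="kafka_source",
)
```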
"connectionType": "kinesis"
Designates connection options for Amazon Kinesis Data Streams.
You can read from an Amazon Kinesis data stream using information stored in a Data Catalog table, or by providing information to directly access the data stream. If you directly access the data stream, use these options to provide the information about how to access the data stream.
If you use getCatalogSource or create_data_frame_from_catalog to consume records from a Kinesis streaming source, the job has the Data Catalog database and table name information, and can use that to obtain some basic parameters for reading from the Kinesis streaming source. If you use getSource, getSourceWithFormat, createDataFrameFromOptions, or create_data_frame_from_options, you must specify these basic parameters using the connection options described here.

You can specify the connection options for Kinesis using the following arguments for the specified methods in the GlueContext class.
- Scala
  - connectionOptions: Use with getSource, createDataFrameFromOptions
  - additionalOptions: Use with getCatalogSource
  - options: Use with getSourceWithFormat
- Python
  - connection_options: Use with create_data_frame_from_options
  - additional_options: Use with create_data_frame_from_catalog
  - options: Use with getSource
Use the following connection options for Kinesis streaming data sources:
- streamARN: (Required) The ARN of the Kinesis data stream.
- classification: (Optional)
- delimiter: (Optional)
- "startingPosition": (Optional) The starting position in the Kinesis data stream to read data from. The possible values are "latest", "trim_horizon", or "earliest". The default value is "latest".
- "awsSTSRoleARN": (Optional) The Amazon Resource Name (ARN) of the role to assume using AWS Security Token Service (AWS STS). This role must have permissions for describe or read record operations for the Kinesis data stream. You must use this parameter when accessing a data stream in a different account. Used in conjunction with "awsSTSSessionName".
- "awsSTSSessionName": (Optional) An identifier for the session assuming the role using AWS STS. You must use this parameter when accessing a data stream in a different account. Used in conjunction with "awsSTSRoleARN".
- "maxFetchTimeInMs": (Optional) The maximum time spent in the job executor to fetch a record from the Kinesis data stream per shard, specified in milliseconds (ms). The default value is 1000.
- "maxFetchRecordsPerShard": (Optional) The maximum number of records to fetch per shard in the Kinesis data stream. The default value is 100000.
- "maxRecordPerRead": (Optional) The maximum number of records to fetch from the Kinesis data stream in each getRecords operation. The default value is 10000.
- "addIdleTimeBetweenReads": (Optional) Adds a time delay between two consecutive getRecords operations. The default value is "False". This option is only configurable for AWS Glue version 2.0 and later.
- "idleTimeBetweenReadsInMs": (Optional) The minimum time delay between two consecutive getRecords operations, specified in ms. The default value is 1000. This option is only configurable for AWS Glue version 2.0 and later.
- "describeShardInterval": (Optional) The minimum time interval between two ListShards API calls for your script to consider resharding. For more information, see Strategies for Resharding in the Amazon Kinesis Data Streams Developer Guide. The default value is 1s.
- "numRetries": (Optional) The maximum number of retries for Kinesis Data Streams API requests. The default value is 3.
- "retryIntervalMs": (Optional) The cool-off time period (specified in ms) before retrying the Kinesis Data Streams API call. The default value is 1000.
- "maxRetryIntervalMs": (Optional) The maximum cool-off time period (specified in ms) between two retries of a Kinesis Data Streams API call. The default value is 10000.
- "avoidEmptyBatches": (Optional) Avoids creating an empty microbatch job by checking for unread data in the Kinesis data stream before the batch is started. The default value is "False".
- "schema": (Required when inferSchema is set to false) The schema to use to process the payload. If the classification is avro, the provided schema must be in the Avro schema format. If the classification is not avro, the provided schema must be in the DDL schema format.
- "inferSchema": (Optional) The default value is 'false'. If set to 'true', the schema is detected at runtime from the payload within foreachbatch.
- "avroSchema": (Deprecated) Parameter used to specify a schema of Avro data when the Avro format is used. This parameter is now deprecated. Use the schema parameter.
- "addRecordTimestamp": (Optional) When this option is set to 'true', the data output contains an additional column named "__src_timestamp" that indicates the time when the corresponding record was received by the stream. The default value is 'false'. This option is supported in AWS Glue version 4.0 or later.
- "emitConsumerLagMetrics": (Optional) When the option is set to 'true', for each batch, the job emits to CloudWatch the metrics for the duration between the oldest record received by the stream and the time it arrives in AWS Glue. The metric's name is "glue.driver.streaming.maxConsumerLagInMs". The default value is 'false'. This option is supported in AWS Glue version 4.0 or later.
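A minimal streaming-read sketch from a Python job; the stream ARN is a placeholder and the schema is inferred from the JSON payload rather than supplied.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Placeholder stream ARN; schema is detected at runtime because inferSchema is true.
kinesis_options = {
    "streamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream",
    "startingPosition": "trim_horizon",
    "classification": "json",
    "inferSchema": "true",
}

data_frame = glue_context.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options=kinesis_options,
    transformation_ctx="kinesis_source",
)
```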
"connectionType": "mongodb"
Designates a connection to MongoDB. Connection options differ for a source connection and a sink connection.
"connectionType": "mongodb" as source
Use the following connection options with "connectionType": "mongodb"
as a
source:
- "uri": (Required) The MongoDB host to read from, formatted as mongodb://<host>:<port>.
- "database": (Required) The MongoDB database to read from. This option can also be passed in additional_options when calling glue_context.create_dynamic_frame_from_catalog in your job script.
- "collection": (Required) The MongoDB collection to read from. This option can also be passed in additional_options when calling glue_context.create_dynamic_frame_from_catalog in your job script.
- "username": (Required) The MongoDB user name.
- "password": (Required) The MongoDB password.
- "ssl": (Optional) If true, initiates an SSL connection. The default is false.
- "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.
- "batchSize": (Optional) The number of documents to return per batch, used within the cursor of internal batches.
- "partitioner": (Optional) The class name of the partitioner for reading input data from MongoDB. The connector provides the following partitioners:
  - MongoDefaultPartitioner (default)
  - MongoSamplePartitioner (requires MongoDB 3.2 or later)
  - MongoShardedPartitioner
  - MongoSplitVectorPartitioner
  - MongoPaginateByCountPartitioner
  - MongoPaginateBySizePartitioner
- "partitionerOptions": (Optional) Options for the designated partitioner. The following options are supported for each partitioner:
  - MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition
  - MongoShardedPartitioner: shardkey
  - MongoSplitVectorPartitioner: partitionKey, partitionSizeMB
  - MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions
  - MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

  For more information about these options, see Partitioner Configuration in the MongoDB documentation. For sample code, see Examples: Setting connection types and options.
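A rough sketch of reading through the Data Catalog, passing the database and collection in additional_options; the catalog database and table names are placeholders.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read a MongoDB collection using a Data Catalog table definition (placeholder names).
dyf = glue_context.create_dynamic_frame_from_catalog(
    database="my_catalog_database",
    table_name="my_catalog_mongodb_table",
    additional_options={
        "database": "test",     # MongoDB database to read from
        "collection": "coll",   # MongoDB collection to read from
    },
)
```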
"connectionType": "mongodb" as sink
Use the following connection options with "connectionType": "mongodb"
as a
sink:
- "uri": (Required) The MongoDB host to write to, formatted as mongodb://<host>:<port>.
- "database": (Required) The MongoDB database to write to.
- "collection": (Required) The MongoDB collection to write to.
- "username": (Required) The MongoDB user name.
- "password": (Required) The MongoDB password.
- "ssl": (Optional) If true, initiates an SSL connection. The default is false.
- "ssl.domain_match": (Optional) If true and ssl is true, domain match check is performed. The default is true.
- "extendedBsonTypes": (Optional) If true, allows extended BSON types when writing data to MongoDB. The default is true.
- "replaceDocument": (Optional) If true, replaces the whole document when saving datasets that contain an _id field. If false, only fields in the document that match the fields in the dataset are updated. The default is true.
- "maxBatchSize": (Optional) The maximum batch size for bulk operations when saving data. The default is 512.
For sample code, see Examples: Setting connection types and options.
"connectionType": "Orc"
Designates a connection to files stored in Amazon S3 in the Apache Hive
Optimized Row Columnar (ORC)
Use the following connection options with "connectionType": "orc"
:
-
paths
: (Required) A list of the Amazon S3 paths to read from. -
(Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL
DataSource
. For more information, see Amazon Redshift data source for Spark.
"connectionType": "parquet"
Designates a connection to files stored in Amazon S3 in the Apache Parquet
Use the following connection options with "connectionType": "parquet"
:
-
paths
: (Required) A list of the Amazon S3 paths to read from. -
(Other option name/value pairs): Any additional options, including formatting options, are passed directly to the SparkSQL
DataSource
. For more information, see Amazon Redshift data source for Sparkon the GitHub website.
"connectionType": "s3"
Designates a connection to Amazon S3.
Use the following connection options with "connectionType": "s3"
:
- "paths": (Required) A list of the Amazon S3 paths to read from.
- "exclusions": (Optional) A string containing a JSON list of Unix-style glob patterns to exclude. For example, "[\"**.pdf\"]" excludes all PDF files. For more information about the glob syntax that AWS Glue supports, see Include and Exclude Patterns.
- "compressionType" or "compression": (Optional) Specifies how the data is compressed. Use "compressionType" for Amazon S3 sources and "compression" for Amazon S3 targets. This is generally not necessary if the data has a standard file extension. Possible values are "gzip" and "bzip2".
- "groupFiles": (Optional) Grouping files is turned on by default when the input contains more than 50,000 files. To turn on grouping with fewer than 50,000 files, set this parameter to "inPartition". To disable grouping when there are more than 50,000 files, set this parameter to "none".
- "groupSize": (Optional) The target group size in bytes. The default is computed based on the input data size and the size of your cluster. When there are fewer than 50,000 input files, "groupFiles" must be set to "inPartition" for this to take effect.
- "recurse": (Optional) If set to true, recursively reads files in all subdirectories under the specified paths.
- "maxBand": (Optional, advanced) This option controls the duration in milliseconds after which the Amazon S3 listing is likely to be consistent. Files with modification timestamps falling within the last maxBand milliseconds are tracked specially when using job bookmarks to account for Amazon S3 eventual consistency. Most users don't need to set this option. The default is 900000 milliseconds, or 15 minutes.
- "maxFilesInBand": (Optional, advanced) This option specifies the maximum number of files to save from the last maxBand seconds. If this number is exceeded, extra files are skipped and only processed in the next job run. Most users don't need to set this option.
- "isFailFast": (Optional) This option determines if an AWS Glue ETL job throws reader parsing exceptions. If set to true, jobs fail fast if four retries of the Spark task fail to parse the data correctly.
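A minimal sketch of an Amazon S3 read using several of these options; the bucket path is a placeholder.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read JSON files recursively, skipping PDFs and grouping small files (placeholder path).
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://amzn-s3-demo-bucket/input/"],
        "recurse": True,
        "exclusions": "[\"**.pdf\"]",
        "groupFiles": "inPartition",
        "groupSize": "1048576",
    },
    format="json",
)
```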
JDBC connectionType values
The JDBC connectionType values include the following:
- "connectionType": "sqlserver": Designates a connection to a Microsoft SQL Server database.
- "connectionType": "mysql": Designates a connection to a MySQL database.
- "connectionType": "oracle": Designates a connection to an Oracle database.
- "connectionType": "postgresql": Designates a connection to a PostgreSQL database.
- "connectionType": "redshift": Designates a connection to an Amazon Redshift database. For more information, see Moving data to and from Amazon Redshift.
The following table lists the JDBC driver versions that AWS Glue supports.
Product | JDBC driver versions for Glue 4.0 | JDBC driver versions for Glue 3.0 | JDBC driver versions for Glue 0.9, 1.0, 2.0 |
---|---|---|---|
Microsoft SQL Server | 9.4.0 | 7.x | 6.x |
MySQL | 8.0.23 | 8.0.23 | 5.1 |
Oracle Database | 21.7 | 21.1 | 11.2 |
PostgreSQL | 42.3.6 | 42.2.18 | 42.1.x |
MongoDB | 4.7.2 | 4.0.0 | 2.0.0 |
Amazon Redshift | redshift-jdbc42-2.1.0.9 | redshift-jdbc41-1.2.12.1017 | redshift-jdbc41-1.2.12.1017 |
If you already have a JDBC connection defined, you can reuse the configuration properties defined in it, such as url, user, and password, so you don't have to specify them in the code as connection options. To do so, use the following connection properties:

- "useConnectionProperties": Set it to "true" to indicate that you want to use the configuration from a connection.
- "connectionName": Enter the connection name to retrieve the configuration from. The connection must be defined in the same Region as the job.
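A rough sketch of this pattern, assuming an existing Glue connection; the connection name, connection type, and table name are placeholders.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Reuse url, user, and password from an existing Glue connection (placeholder names).
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",
        "connectionName": "my-postgresql-connection",
        "dbtable": "public.example_table",
    },
)
```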
Use these connection options with JDBC connections:
- "url": (Required) The JDBC URL for the database.
- "dbtable": The database table to read from. For JDBC data stores that support schemas within a database, specify schema.table-name. If a schema is not provided, then the default "public" schema is used.
- "redshiftTmpDir": (Required for Amazon Redshift, optional for other JDBC types) The Amazon S3 path where temporary data can be staged when copying out of the database.
- "user": (Required) The user name to use when connecting.
- "password": (Required) The password to use when connecting.
- (Optional) The following options allow you to supply a custom JDBC driver. Use these options if you must use a driver that AWS Glue does not natively support.

  ETL jobs can use different JDBC driver versions for the data source and target, even if the source and target are the same database product. This allows you to migrate data between source and target databases with different versions. To use these options, you must first upload the JAR file of the JDBC driver to Amazon S3.
  - "customJdbcDriverS3Path": The Amazon S3 path of the custom JDBC driver.
  - "customJdbcDriverClassName": The class name of the JDBC driver.
- "bulksize": (Optional) Used to configure parallel inserts for speeding up bulk loads into JDBC targets. Specify an integer value for the degree of parallelism to use when writing or inserting data. This option is helpful for improving the performance of writes into databases such as Amazon Aurora.
- "sampleQuery": (Optional) The custom SQL query statement for sampling. By default, the sample query is run by a single executor. If you're reading a large dataset, you might need to enable JDBC partitioning to query a table in parallel. For more information, see Reading from JDBC Tables in Parallel. To use sampleQuery with JDBC partitioning, also set enablePartitioningForSampleQuery to true.
- "enablePartitioningForSampleQuery": (Optional) By default, this option is false. Required if you want to use sampleQuery with a partitioned JDBC table. If set to true, sampleQuery must end with "where" or "and" for AWS Glue to append partitioning conditions. See the example that follows.
- "sampleSize": (Optional) Limits the number of rows returned by the sample query. Works only when enablePartitioningForSampleQuery is true. If partitioning is not enabled, you should instead directly add "limit x" in the sampleQuery to limit the size.

Example: Use sampleQuery without partitioning

The following code example shows how to use sampleQuery without partitioning.

```scala
// A full SQL query statement.
val query = "select name from $tableName where age > 0 limit 1"
val connectionOptions = JsonOptions(Map(
  "url" -> url,
  "dbtable" -> table,
  "user" -> user,
  "password" -> password,
  "basePath" -> basePath,
  "sampleQuery" -> query
))
val dyf = glueContext.getSource("mysql", connectionOptions)
  .getDynamicFrame()
```

Example: Use sampleQuery with JDBC partitioning

The following code example shows how to use sampleQuery with JDBC partitioning.

```scala
// Note that the query should end with "where" or "and" if used with JDBC partitioning.
val query = "select name from $tableName where age > 0 and"

// Enable JDBC partitioning by setting hashfield.
// To use sampleQuery with partitioning, set enablePartitioningForSampleQuery.
// Use sampleSize to limit the size of the returned data.
val connectionOptions = JsonOptions(Map(
  "url" -> url,
  "dbtable" -> table,
  "user" -> user,
  "password" -> password,
  "basePath" -> basePath,
  "hashfield" -> primaryKey,
  "sampleQuery" -> query,
  "enablePartitioningForSampleQuery" -> true,
  "sampleSize" -> "1"
))
val dyf = glueContext.getSource("mysql", connectionOptions)
  .getDynamicFrame()
```
For the Amazon Redshift connection type, all other option name/value pairs that are included in connection options for a JDBC connection, including formatting options, are passed directly to the underlying SparkSQL DataSource. For more information, see Amazon Redshift data source for Spark.
The following code examples show how to read from and write to JDBC databases with custom JDBC drivers. They demonstrate reading from one version of a database product, and writing to a later version of the same product.
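A sketch along those lines; the endpoints, credentials, driver JAR paths, and table names are placeholders.

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# All endpoints, credentials, driver paths, and class names below are placeholders.
source_mysql5_options = {
    "url": "jdbc:mysql://<source-host>:3306/exampledb",
    "dbtable": "example_table",
    "user": "admin",
    "password": "password",
    "customJdbcDriverS3Path": "s3://amzn-s3-demo-bucket/mysql-connector-java-5.1.45-bin.jar",
    "customJdbcDriverClassName": "com.mysql.jdbc.Driver",
}

target_mysql8_options = {
    "url": "jdbc:mysql://<target-host>:3306/exampledb",
    "dbtable": "example_table",
    "user": "admin",
    "password": "password",
    "customJdbcDriverS3Path": "s3://amzn-s3-demo-bucket/mysql-connector-java-8.0.17.jar",
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver",
}

# Read with the MySQL 5 driver and write with the MySQL 8 driver.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options=source_mysql5_options,
)

glue_context.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="mysql",
    connection_options=target_mysql8_options,
)
```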
AWS Glue jobs are only associated with one subnet during a run. This may impact your ability to connect to multiple data sources through the same job. This behavior is not limited to JDBC sources.
Custom and AWS Marketplace connectionType values
These include the following:
- "connectionType": "marketplace.athena": Designates a connection to an Amazon Athena data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "marketplace.spark": Designates a connection to an Apache Spark data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "marketplace.jdbc": Designates a connection to a JDBC data store. The connection uses a connector from AWS Marketplace.
- "connectionType": "custom.athena": Designates a connection to an Amazon Athena data store. The connection uses a custom connector that you upload to AWS Glue Studio.
- "connectionType": "custom.spark": Designates a connection to an Apache Spark data store. The connection uses a custom connector that you upload to AWS Glue Studio.
- "connectionType": "custom.jdbc": Designates a connection to a JDBC data store. The connection uses a custom connector that you upload to AWS Glue Studio.
Connection options for type custom.jdbc or marketplace.jdbc
- className – String, required, driver class name.
- connectionName – String, required, name of the connection that is associated with the connector.
- url – String, required, JDBC URL with placeholders (${}) which are used to build the connection to the data source. The placeholder ${secretKey} is replaced with the secret of the same name in AWS Secrets Manager. Refer to the data store documentation for more information about constructing the URL.
- secretId or user/password – String, required, used to retrieve credentials for the URL.
- dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
- partitionColumn – String, optional, the name of an integer column that is used for partitioning. This option works only when it's included with lowerBound, upperBound, and numPartitions. This option works the same way as in the Spark SQL JDBC reader. For more information, see JDBC To Other Databases in the Apache Spark SQL, DataFrames and Datasets Guide.

  The lowerBound and upperBound values are used to decide the partition stride, not for filtering the rows in the table. All rows in the table are partitioned and returned.

  Note: When using a query instead of a table name, you should validate that the query works with the specified partitioning condition. For example:
  - If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the partition column.
  - If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the partition column.
- lowerBound – Integer, optional, the minimum value of partitionColumn that is used to decide partition stride.
- upperBound – Integer, optional, the maximum value of partitionColumn that is used to decide partition stride.
- numPartitions – Integer, optional, the number of partitions. This value, along with lowerBound (inclusive) and upperBound (exclusive), forms partition strides for generated WHERE clause expressions that are used to split the partitionColumn.

  Important: Be careful with the number of partitions because too many partitions might cause problems on your external database systems.
- filterPredicate – String, optional, extra condition clause to filter data from the source. For example: BillingCity='Mountain View'

  When using a query instead of a table name, you should validate that the query works with the specified filterPredicate. For example:
  - If your query format is "SELECT col1 FROM table1", then test the query by appending a WHERE clause at the end of the query that uses the filter predicate.
  - If your query format is "SELECT col1 FROM table1 WHERE col2=val", then test the query by extending the WHERE clause with AND and an expression that uses the filter predicate.
- dataTypeMapping – Dictionary, optional, custom data type mapping that builds a mapping from a JDBC data type to an AWS Glue data type. For example, the option "dataTypeMapping":{"FLOAT":"STRING"} maps data fields of JDBC type FLOAT into the Java String type by calling the ResultSet.getString() method of the driver, and uses it to build AWS Glue records. The ResultSet object is implemented by each driver, so the behavior is specific to the driver you use. Refer to the documentation for your JDBC driver to understand how the driver performs the conversions.

  The AWS Glue data types supported currently are:
  - DATE
  - STRING
  - TIMESTAMP
  - INT
  - FLOAT
  - LONG
  - BIGDECIMAL
  - BYTE
  - SHORT
  - DOUBLE

  The JDBC data types supported are Java8 java.sql.types. The default data type mappings (from JDBC to AWS Glue) are:
  - DATE -> DATE
  - VARCHAR -> STRING
  - CHAR -> STRING
  - LONGNVARCHAR -> STRING
  - TIMESTAMP -> TIMESTAMP
  - INTEGER -> INT
  - FLOAT -> FLOAT
  - REAL -> FLOAT
  - BIT -> BOOLEAN
  - BOOLEAN -> BOOLEAN
  - BIGINT -> LONG
  - DECIMAL -> BIGDECIMAL
  - NUMERIC -> BIGDECIMAL
  - TINYINT -> SHORT
  - SMALLINT -> SHORT
  - DOUBLE -> DOUBLE

  If you use a custom data type mapping with the option dataTypeMapping, then you can override a default data type mapping. Only the JDBC data types listed in the dataTypeMapping option are affected; the default mapping is used for all other JDBC data types. You can add mappings for additional JDBC data types if needed. If a JDBC data type is not included in either the default mapping or a custom mapping, then the data type converts to the AWS Glue STRING data type by default.
The following Python code example shows how to read from JDBC databases with AWS Marketplace JDBC drivers. It demonstrates reading from a database and writing to an S3 location.
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.jdbc",
    connection_options = {"dataTypeMapping": {"INTEGER": "STRING"},
        "upperBound": "200",
        "query": "select id, name, department from department where id < 200",
        "numPartitions": "4", "partitionColumn": "id", "lowerBound": "0",
        "connectionName": "test-connection-jdbc"},
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [("department", "string", "department", "string"),
        ("name", "string", "name", "string"), ("id", "int", "id", "int")],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
```
Connection options for type custom.athena or marketplace.athena
- className – String, required, driver class name. When you're using the Athena-CloudWatch connector, this parameter value is the prefix of the class name (for example, "com.amazonaws.athena.connectors"). The Athena-CloudWatch connector is composed of two classes: a metadata handler and a record handler. If you supply the common prefix here, then the API loads the correct classes based on that prefix.
- tableName – String, required, the name of the CloudWatch log stream to read. This code snippet uses the special view name all_log_streams, which means that the dynamic data frame returned will contain data from all log streams in the log group.
- schemaName – String, required, the name of the CloudWatch log group to read from. For example, /aws-glue/jobs/output.
- connectionName – String, required, name of the connection that is associated with the connector.

For additional options for this connector, see the Amazon Athena CloudWatch Connector README.
The following Python code example shows how to read from an Athena data store using an AWS Marketplace connector. It demonstrates reading from Athena and writing to an S3 location.
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output","connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.athena",
    connection_options = {"tableName": "all_log_streams",
        "schemaName": "/aws-glue/jobs/output",
        "connectionName": "test-connection-athena"},
    transformation_ctx = "DataSource0")

## @type: ApplyMapping
## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(
    frame = DataSource0,
    mappings = [("department", "string", "department", "string"),
        ("name", "string", "name", "string"), ("id", "int", "id", "int")],
    transformation_ctx = "Transform0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = Transform0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
```
Connection options for type custom.spark or marketplace.spark
- className – String, required, connector class name.
- secretId – String, optional, used to retrieve credentials for the connector connection.
- connectionName – String, required, name of the connection that is associated with the connector.
- Other options depend on the data store. For example, OpenSearch configuration options start with the prefix es, as described in the Elasticsearch for Apache Hadoop documentation. Spark connections to Snowflake use options such as sfUser and sfPassword, as described in Using the Spark Connector in the Connecting to Snowflake guide.
The following Python code example shows how to read from an OpenSearch data store using a marketplace.spark connection.
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>","connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "marketplace.spark",
    connection_options = {"path": "test",
        "es.nodes.wan.only": "true",
        "es.nodes": "https://<AWS endpoint>",
        "connectionName": "test-spark-es",
        "es.port": "443"},
    transformation_ctx = "DataSource0")

## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DataSource0,
    connection_type = "s3",
    format = "json",
    connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []},
    transformation_ctx = "DataSink0")

job.commit()
```