AWS Glue for Spark 中适用于 ETL 的连接类型和选项 - AWS Glue

AWS Glue for Spark 中适用于 ETL 的连接类型和选项

在 AWS Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用 connectionType 参数指定连接类型。它们使用 connectionOptionsoptions 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项

connectionType 连接到
dynamodb Amazon DynamoDB 数据库
kinesis Amazon Kinesis Data Streams
S3 Amazon S3
documentdb Amazon DocumentDB (with MongoDB compatibility) 数据库
opensearch Amazon OpenSearch Service
redshift Amazon Redshift 数据库
kafka KafkaAmazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL。
azuresql Azure SQL。
bigquery Google BigQuery。
mongodb MongoDB 数据库,包括 MongoDB Atlas。
sqlserver Microsoft SQL Server 数据库(请参阅JDBC 连接
mysql MySQL 数据库(请参阅JDBC 连接
oracle Oracle 数据库(请参阅JDBC 连接
postgresql PostgreSQL 数据库(请参阅JDBC 连接
saphana SAP HANA。
snowflake Snowflake 数据湖
teradata Teradata Vantage。
vertica Vertica。
custom.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值
marketplace.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值

"connectionType": "dynamodb"

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "dynamodb" with the ETL connector as Source

在使用 AWS Glue DynamoDB ETL 连接器时,请使用以下连接选项并将 "connectionType": "dynamodb" 作为源:

  • "dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认读取速率,这意味着 AWS Glue 将尝试占用表的一半的读取容量。如果增加值超过 0.5,AWS Glue 将增加请求速率;将值降低到 0.5 以下将降低读取请求速率。(实际读取速率取决于 DynamoDB 表中是否存在统一键分配的等因素。)

    • 当 DynamoDB 表处于按需模式时,AWS Glue 处理表的读取容量为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

    • 1 表示没有并行度。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。

    • 我们建议您使用以下公式计算 numSlots,并将其用作 dynamodb.splits。如果您需要更高的性能,我们建议您增加 DPU 数量以扩展任务。

      工件数量NumberOfWorkers)在作业配置中设置。有关更多信息,请参阅在 AWS Glue 中添加作业。启用自动扩展时,作业的可用工件数量可能会因工作负载而调整。就上下文而言,为 Spark 驱动程序保留了一个执行程序;其他执行程序用于处理数据。

      • numExecutors =

        • NumberOfWorkers - 1,如果 WorkerTypeG.1XG.2X

        • MaximumCapacity * 2 - 1 如果 WorkerTypeStandard 且 AWS Glue 版本是 2.0 以上。

          (MaximumCapacity - 1) * 2 - 1 如果 WorkerTypeStandard 且 AWS Glue 版本是 1.0 及以下。

      • numSlotsPerExecutor =

        Glue 3.0+
        • 4,如果 WorkerTypeStandardG.1X

        • 8,如果 WorkerTypeG.2X

        Glue 2.0 and legacy versions
        • 4,如果 WorkerTypeStandard

        • 8,如果 WorkerTypeG.1X

        • 16,如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。此参数适用于 AWS Glue 1.0 或更高版本。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。此参数适用于 AWS Glue 1.0 或更高版本。

以下代码示例演示了如何从 DynamoDB 表中读取(通过 ETL 连接器)以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }
注意

AWS Glue 支持从其他 AWS 账户的 DynamoDB 表读取数据。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表

注意

DynamoDB ETL 读取器不支持筛选条件或下推谓词。

"connectionType": "dynamodb" with the AWS Glue DynamoDB export connector as Source

除 AWS Glue DynamoDB ETL 连接器外,AWS Glue 提供 DynamoDB 导出连接器,该连接器调用 DynamoDB ExportTableToPointInTime 请求并将其以 DynamoDB JSON 格式存储在您提供的 Simple Storage Service (Amazon S3) 位置。然后,AWS Glue 通过从 Simple Storage Service (Amazon S3) 导出位置读取数据来创建 DynamicFrame 对象。

在 DynamoDB 表大小超过 80 GB 时,导出连接器的性能优于 ETL 连接器。此外,鉴于导出请求在 AWS Glue 任务中的 Spark 进程之外执行,您可以启用 AWS Glue 任务的弹性伸缩以节省导出请求期间的 DPU 使用量。借助导出连接器,您也无需为 Spark 执行程序并行度或 DynamoDB 吞吐量读取百分比配置拆分数。

在使用 AWS Glue DynamoDB 导出连接器(仅适用于 AWS Glue 版本 2.0 以上)时,使用以下连接选项并将 "connectionType": "dynamodb"用作源:

  • "dynamodb.export":(必需)字符串值:

    • 如果设置为 ddb,将启用 AWS Glue DynamoDB 导出连接器,其中在 AWS Glue 任务期间将调用新的 ExportTableToPointInTimeRequest。新的导出将通过从 dynamodb.s3.bucketdynamodb.s3.prefix 传递的位置生成。

    • 如果设置为 s3,将启用 AWS Glue DynamoDB 导出连接器但会跳过创建新的 DynamoDB 导出,而使用 dynamodb.s3.bucketdynamodb.s3.prefix 作为该表以前导出的 Simple Storage Service (Amazon S3) 位置。

  • "dynamodb.tableArn":(必需)要从中读取数据的 DynamoDB 表格。

  • "dynamodb.unnestDDBJson":(可选)采用布尔值。如果设置为 true(真),则对导出中存在的 DynamoDB JSON 结构执行解除嵌套转换。默认值设置为 false。

  • "dynamodb.s3.bucket":(可选)指示将会执行 DynamoDB ExportTableToPointInTime 进程的 Amazon S3 存储桶位置。导出的文件格式为 DynamoDB JSON。

    • "dynamodb.s3.prefix":(可选)指示将用于存储 DynamoDB ExportTableToPointInTime 负载的 Amazon S3 存储桶内的 Amazon S3 前缀位置。如果既未指定 dynamodb.s3.prefix,也未指定 dynamodb.s3.bucket,则这些值将默认为 AWS Glue 任务配置中指定的临时目录位置。有关更多信息,请参阅 AWS Glue 使用的特殊参数

    • "dynamodb.s3.bucketOwner":指示跨账户 Amazon S3 访问所需的存储桶拥有者。

  • "dynamodb.sts.roleArn":(可选)跨账户访问和/或跨区域访问 DynamoDB 表时将会代入的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 将用于访问为 ExportTableToPointInTime 请求指定的 Amazon S3 位置。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-read-sts-session”。

注意

DynamoDB 对调用 ExportTableToPointInTime 请求有特定的要求。有关更多信息,请参阅在 DynamoDB 中请求表导出。例如,表需要启用时间点恢复 (PITR) 才能使用此连接器。DynamoDB 连接器还支持在将 DynamoDB 导出到 Amazon S3 时进行 AWS KMS 加密。在 AWS Glue 任务配置中指定安全性配置,将为 DynamoDB 导出启用 AWS KMS 加密。KMS 密钥必须与 Simple Storage Service (Amazon S3) 存储桶位于同一区域。

请注意,您需要支付 DynamoDB 导出的额外费用和 Simple Storage Service (Amazon S3) 存储成本。任务运行完成后,Simple Storage Service (Amazon S3) 中的导出数据仍然存在,因此您无需其他 DynamoDB 导出即可重复使用这些数据。使用此连接器的一个要求是该表启用了时间点恢复 (PITR)。

DynamoDB ETL 连接器或导出连接器不支持在 DynamoDB 源应用筛选条件或下推谓词。

以下代码示例演示如何进行读取(通过导出连接器)以及打印分区数量。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

以下示例演示如何从具有 dynamodb 分类的 AWS Glue 数据目录表进行读取(通过导出连接器)以及打印分区数量:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database="<catalog_database>", table_name="<catalog_table_name", additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": "<s3_bucket>", "dynamodb.s3.prefix": "<s3_bucket_prefix>" } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = "<catalog_database>", tableName = "<catalog_table_name", additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> "<s3_bucket>", "dynamodb.s3.prefix" -> "<s3_bucket_prefix>" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

遍历 DynamoDB JSON 结构

使用 AWS Glue DynamoDB 导出连接器进行 DynamoDB 导出时可以生成具有特定嵌套结构的 JSON 文件。有关更多信息,请参阅数据对象。AWS Glue 提供 DynamicFrame 转换,可以将这些结构解除嵌套,成为易于下游应用程序使用的形式。

您可以通过以下两种方式之一调用该转换。第一种方法是通过 AWS Glue DynamoDB 导出连接器传递的布尔标志。第二种方式是通过调用转换函数本身。

以下代码示例演示如何使用 AWS Glue DynamoDB 导出连接器、调用解除嵌套命令,以及打印分区数量:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.unnestDDBJson": True, "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.unnestDDBJson" -> true "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

转换的另一种调用是通过单独的 DynamicFrame 函数调用。有关更多信息,请参阅适用于 Python 的 DynamicFrame 类和适用于 Scala 的 AWS Glue Scala DynamicFrame 类

"connectionType": "dynamodb" with the ETL connector as Sink

"connectionType": "dynamodb" 用作连接器时可使用以下连接选项:

  • "dynamodb.output.tableName":(必需)要写入的 DynamoDB 表。

  • "dynamodb.throughput.write.percent":(可选)要使用的写入容量单位(WCU)的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

    • 0.5 表示默认写入速率,这意味着 AWS Glue 将尝试占用表的一半的写入容量。如果增加值超过 0.5,AWS Glue 将增加请求速率;将值降低到 0.5 以下将降低写入请求速率。(实际写入速率取决于 DynamoDB 表中是否存在统一键分配等因素)。

    • 当 DynamoDB 表处于按需模式时,AWS Glue 处理表的写入容量为 40000。要导入大型表,我们建议您将 DynamoDB 表切换为按需模式。

  • "dynamodb.output.numParallelTasks":(可选)定义同时向 DynamoDB 写入数据的并行任务数。用于计算每个 Spark 任务的允许 WCU。如果您不想控制这些详细信息,则无需指定此参数。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • 如果您未指定此参数,则按照以下公式自动计算每个 Spark 任务的允许 WCU:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1,如果 WorkerTypeStandard

        • (NumberOfWorkers - 1),如果 WorkerTypeG.1XG.2X

      • numSlotsPerExecutor =

        • 4,如果 WorkerTypeStandard

        • 8,如果 WorkerTypeG.1X

        • 16,如果 WorkerTypeG.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 示例 1。DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 100 个 RDD 分区。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 示例 2。DPU=10,WorkerType=Standard。输入 DynamicFrame 具有 20 个 RDD 分区。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry":(可选)定义存在 DynamoDB 中的 ProvisionedThroughputExceededException 时我们执行的重试次数。默认设置为“10”。

  • "dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。

  • "dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为“glue-dynamodb-write-sts-session”。

注意

DynamoDB 写入器在 AWS Glue 版本 1.0 或更高版本中受支持。

注意

AWS Glue 支持将数据写入其他 AWS 账户的 DynamoDB 表。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表

以下代码示例说明如何从 DynamoDB 表中读取以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

"connectionType": "documentdb"

指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。

源连接和接收器连接的连接选项不同。

注意

使用 AWS Glue 连接器时,目前不支持 Amazon DocumentDB 弹性集群。有关弹性集群的更多信息,请参阅 Using Amazon DocumentDB elastic clusters

"connectionType": "documentdb" as Source

"connectionType": "documentdb" 用作源时可使用以下连接选项:

  • "uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要从中读取数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "ssl":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "true"

  • "ssl.domain_match":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "false"

  • "batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

  • "partitioner":(可选)从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

    • MongoDefaultPartitioner(默认)(AWS Glue 4.0 不支持)

    • MongoSamplePartitioner(AWS Glue 4.0 不支持)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner(AWS Glue 4.0 不支持)

  • "partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitionerpartitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitionerpartitionKey、partitionSizeMB

    有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" 用作连接器时可使用以下连接选项:

  • "uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为 mongodb://<host>:<port>

  • "database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

  • "collection":(必需)要在其中写入数据的 Amazon DocumentDB 连接。

  • "username":(必需)Amazon DocumentDB 用户名。

  • "password":(必需)Amazon DocumentDB 密码。

  • "extendedBsonTypes":(可选)如果为 true,则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认为 true

  • "replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true

  • "maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项

自定义和 AWS Marketplace connectionType 值

这些功能包括:

  • "connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

  • "connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

  • "connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项

  • className – 字符串,必需,驱动程序类名称。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • url – 字符串,必需,用于建立与数据源的连接且带占位符(${})的 JDBC URL。占位符 ${secretKey} 替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

  • secretIduser/password – 字符串,必需,用于检索 URL 的凭证。

  • dbTablequery – 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定 dbTablequery,但不能同时指定两者。

  • partitionColumn – 字符串,可选,用于分区的整数列的名称。此选项仅在包含 lowerBoundupperBoundnumPartitions 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库

    lowerBoundupperBound 值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。

    注意

    使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用分区列的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用分区列的表达式扩展 WHERE 子句,以测试查询。

  • lowerBound – 整数,可选,用于确定分区步长的最小 partitionColumn 值。

  • upperBound – 整数,可选,用于确定分区步长的最大 partitionColumn 值。

  • numPartitions – 整数,可选,分区数。此值以及 lowerBound(包含)和 upperBound(排除)为用于拆分 partitionColumn 而生成的 WHERE 子句表达式构成分区步长。

    重要

    请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

  • filterPredicate – 字符串,可选,用于筛选源数据的额外条件子句。例如:

    BillingCity='Mountain View'

    使用查询(而不是名称)时,您应验证查询是否适用于指定的 filterPredicate。例如:

    • 如果您的查询格式为 "SELECT col1 FROM table1",则在使用筛选条件谓词的查询结尾附加 WHERE 子句,以测试查询。

    • 如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用筛选条件谓词的表达式扩展 WHERE 子句,以测试查询。

  • dataTypeMapping – 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项 "dataTypeMapping":{"FLOAT":"STRING"} 会通过调用驱动程序的 ResultSet.getString() 方法,将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,并将其用于构建 AWS Glue 记录。ResultSet 对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。

  • 目前受支持的 AWS Glue 数据类型包括:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    支持的 JDBC 数据类型为 Java8 java.sql.types

    默认数据类型映射(从 JDBC 到 AWS Glue)如下:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    如果将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 AWS Glue STRING 数据类型。

以下 Python 代码示例演示了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.athena 或 marketplace.athena 的连接选项

  • className – 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如 "com.amazonaws.athena.connectors")的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

  • tableName – 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称 all_log_streams,这意味着返回的动态数据框将包含日志组中所有日志流的数据。

  • schemaName – 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述文件。

以下 Python 代码示例演示了如何从使用 AWS Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.spark 或 marketplace.spark 的连接选项

  • className – 字符串,必需,连接器类名称。

  • secretId – 字符串,可选,用于检索连接器连接的凭证。

  • connectionName – 字符串,必需,与连接器关联的连接的名称。

  • 其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀 es 开头,正如适用于 Apache Hadoop 的 Elasticsearch 文档中所述。Spark 与 Snowflake 的连接使用 sfUsersfPassword 等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。

以下 Python 代码示例演示了如何从使用 marketplace.spark 连接的 OpenSearch 数据存储读取数据。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

常规选项

本节中的选项以 connection_options 形式提供,但不是专门适用于一个连接器。

配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅使用作业书签

  • jobBookmarkKeys - 列名称的数组。

  • jobBookmarkKeysSortOrder - 定义如何根据排序顺序比较值的字符串。有效值:"asc""desc"

  • useS3ListImplementation - 用于在列出 Amazon S3 存储桶内容时管理内存性能。有关更多信息,请参阅 Optimize memory management in AWS Glue