Tipos de conexión y opciones para ETL en AWS Glue para Spark - AWS Glue

Tipos de conexión y opciones para ETL en AWS Glue para Spark

En AWS Glue para Spark, varios métodos y transformaciones de PySpark y Scala especifican el tipo de conexión mediante un parámetro connectionType. Especifican las opciones de conexión mediante un parámetro connectionOptions o options.

El parámetro connectionType puede adoptar los valores que se muestran en la tabla siguiente. Los valores del parámetro connectionOptions (u options) asociados para cada tipo se documentan en las secciones siguientes. Salvo que se indique lo contrario, los parámetros se aplican cuando la conexión se utiliza como origen o receptor.

Para obtener un código de muestra que ilustra la configuración y que utiliza las opciones de conexión, consulte Ejemplos: configuración de tipos y opciones de conexión.

connectionType Se conecta a
custom.* Almacenes de datos Spark, Athena o JDBC (consulte Valores de conexiones de tipo personalizada y AWS Marketplace
documentdb Base de datos de Amazon DocumentDB (compatible con MongoDB)
dynamodb Base de datos Amazon DynamoDB
kafka Kafka o Amazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
marketplace.* Almacenes de datos Spark, Athena o JDBC (consulte Valores de conexiones de tipo personalizada y AWS Marketplace)
mongodb Base de datos de MongoDB
mysql Base de datos de MySQL (consulte Conexiones de JDBC)
oracle Base de datos de Oracle (consulte Conexiones de JDBC)
postgresql Base de datos de PostgreSQL (consulte Conexiones de JDBC)
redshift Base de datos de Amazon Redshift
s3 Amazon S3
sqlserver Base de datos de Microsoft SQL Server (consulte Conexiones de JDBC)
Snowflake Lago de datos Snowflake

"connectionType": "Documentdb"

Designa una conexión a Amazon DocumentDB (compatible con MongoDB).

Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.

nota

Los clústeres elásticos de Amazon DocumentDB no se admiten actualmente cuando se utiliza el conector Glue AWS. Para obtener más información sobre los clústeres elásticos, consulte Uso de clústeres elásticos de Amazon DocumentDB.

"connectionType": "Documentdb" como origen

Utilice las siguientes opciones de conexión con "connectionType": "documentdb" como origen:

  • "uri": (obligatorio) el host de Amazon DocumentDB del que se va a leer, con formato mongodb://<host>:<port>.

  • "database": (obligatorio) la base de datos de Amazon DocumentDB de la que se va a leer.

  • "collection": (obligatorio) la recopilación de Amazon DocumentDB de la que se va a leer.

  • "username": (obligatorio) nombre de usuario de Amazon DocumentDB.

  • "password": (obligatorio) la contraseña de Amazon DocumentDB.

  • "ssl": (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor "true".

  • "ssl.domain_match": (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor "false".

  • "batchSize": (opcional): el número de documentos que se deben devolver por lote, que se utilizan dentro del cursor de lotes internos.

  • "partitioner": (opcional): el nombre de la clase del particionador para leer los datos de entrada de Amazon DocumentDB. El conector proporciona los siguientes particionadores:

    • MongoDefaultPartitioner (predeterminado)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" ( opcional): opciones para el particionador designado. Se admiten las siguientes opciones para cada particionador:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    Para obtener más información acerca de estas opciones, consulte Partitioner Configuration en la documentación de MongoDB. Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.

"connectionType": "Documentdb" como receptor

Utilice las siguientes opciones de conexión con "connectionType": "documentdb" como receptor:

  • "uri": (obligatorio) el host de Amazon DocumentDB al que se va a escribir, con formato mongodb://<host>:<port>.

  • "database": (obligatorio) la base de datos de Amazon DocumentDB a la que se va a escribir.

  • "collection": (obligatorio) la recopilación de Amazon DocumentDB a la que se va a escribir.

  • "username": (obligatorio) nombre de usuario de Amazon DocumentDB.

  • "password": (obligatorio) la contraseña de Amazon DocumentDB.

  • "extendedBsonTypes": (opcional) si se establece en true, permite los tipos de BSON extendidos al escribir datos en Amazon DocumentDB. El valor predeterminado es true.

  • "replaceDocument": (opcional) si es true, reemplaza todo el documento al guardar conjuntos de datos que contienen un campo _id. Si es false, solo se actualizan los campos del documento que coinciden con los campos del conjunto de datos. El valor predeterminado es true.

  • "maxBatchSize": (opcional): el tamaño máximo del lote para operaciones en bloque al guardar datos. El valor predeterminado es 512.

Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.

"connectionType": "dynamodb"

Designa una conexión a Amazon DynamoDB.

Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.

“connectionType”: “dynamodb” con el conector de ETL como origen

Utilice las siguientes opciones de conexión con "connectionType": "dynamodb" como origen cuando use el conector de ETL de DynamoDB de AWS Glue:

  • "dynamodb.input.tableName": (obligatorio) la tabla de DynamoDB de la que se va a leer.

  • "dynamodb.throughput.read.percent": (opcional) porcentaje de unidades de capacidad de lectura (RCU) que se usará. El valor predeterminado se establece en "0,5". Los valores aceptables abarcan de "0,1" a "1,5", inclusive.

    • 0.5 representa la tasa de lectura predeterminada, es decir que AWS Glue intentará consumir la mitad de la capacidad de lectura de la tabla. Si usted aumenta el valor por encima de 0.5, AWS Glue incrementará la tasa de solicitudes; si reduce el valor por debajo de 0.5, disminuirá la tasa de solicitudes de lectura. (La tasa de lectura real varía, según diversos factores tales como el hecho de que exista o no una distribución uniforme de claves en la tabla de DynamoDB).

    • Cuando la tabla de DynamoDB está en modo bajo demanda, AWS Glue maneja la capacidad de lectura de la tabla como 40 000. Para exportar una tabla de gran extensión, recomendamos cambiar la tabla de DynamoDB al modo bajo demanda.

  • "dynamodb.splits": (opcional) define la cantidad de particiones en las que dividimos esta tabla de DynamoDB al leerla. El valor predeterminado se establece en "1". Los valores aceptables abarcan de "1" a "1,000,000", inclusive.

    • 1 representa que no hay paralelismo. Se recomienda especialmente que especifique un valor mayor para un mejor rendimiento mediante la fórmula siguiente.

    • Le recomendamos que calcule numSlots con la siguiente fórmula, y lo use como dynamodb.splits. Si necesita más rendimiento, le recomendamos que amplíe su trabajo al aumentar el número de DPU.

      La Cantidad de trabajos (NumberOfWorkers) se establece en la configuración del trabajo. Para obtener más información, consulte Agregar trabajos en AWS Glue. Si habilita el escalado automático, es posible que la cantidad de trabajos disponibles se ajuste debido a la carga de trabajo. Para contextualizar, un ejecutor está reservado para el controlador de Spark mientras que otros ejecutores procesan datos.

      • numExecutors =

        • NumberOfWorkers - 1 si WorkerType es G.1X o G.2X

        • MaximumCapacity * 2 - 1 si WorkerType es Standard y la versión de AWS Glue es 2.0 o superior.

          (MaximumCapacity - 1) * 2 - 1 si WorkerType es Standard y la versión de AWS Glue es 1.0 o anterior.

      • numSlotsPerExecutor =

        Glue 3.0+
        • 4 si WorkerType es Standard o G.1X

        • 8 si WorkerType es G.2X

        Glue 2.0 and legacy versions
        • 4 si WorkerType es Standard

        • 8 si WorkerType es G.1X

        • 16 si WorkerType es G.2X

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn": (opcional) el ARN de rol de IAM que se asumirá para el acceso entre cuentas. Este parámetro se encuentra disponible en AWS Glue 1.0 o posterior.

  • "dynamodb.sts.roleSessionName": (opcional) nombre de sesión STS. El valor predeterminado se establece en “glue-dynamodb-read-sts-session”. Este parámetro se encuentra disponible en AWS Glue 1.0 o posterior.

En los siguientes ejemplos de código, se muestra cómo leer (con el conector de ETL) de tablas de DynamoDB y escribir en ellas. Demuestran la lectura de una tabla y la escritura en otra tabla.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }
nota

AWS Glue admite la lectura de datos de tablas de DynamoDB de otra cuenta de AWS. Para obtener más información, consulte Acceso entre cuentas y entre regiones a tablas de DynamoDB.

nota

El lector de ETL de DynamoDB no es compatible con filtros ni predicados de inserción.

“connectionType”: “dynamodb” con el conector de exportación de DynamoDB de AWS Glue como origen

Además del conector de ETL de DynamoDB de AWS Glue, AWS Glue ofrece un conector de exportación de DynamoDB, que invoca una solicitud ExportTableToPointInTime de DynamoDB y la almacena en una ubicación de Simple Storage Service (Amazon S3) que se proporcione, con el formato de JSON de DynamoDB. Después, AWS Glue crea un objeto DynamicFrame leyendo los datos desde la ubicación de exportación de Simple Storage Service (Amazon S3).

El conector de exportación funciona mejor que el conector de ETL cuando el tamaño de la tabla de DynamoDB es superior a 80 GB. Además, dado que la solicitud de exportación se lleva a cabo fuera de los procesos de Spark en un trabajo de AWS Glue, se puede habilitar el escalado automático de los trabajos de AWS Glue para guardar el uso de DPU durante la solicitud de exportación. Con el conector de exportación, tampoco es necesario configurar el número de divisiones del paralelismo del ejecutor de Spark o el porcentaje de lectura de rendimiento de DynamoDB.

Utilice las siguientes opciones de conexión con “connectionType”: “dynamodb” como origen cuando use el conector de exportación de DynamoDB de AWS Glue, que solo está disponible a partir de AWS Glue versión 2.0:

  • "dynamodb.export": (obligatorio) valor de cadena:

    • Si se configura como ddb, habilita el conector de exportación de DynamoDB de AWS Glue cuando se invoque una nueva ExportTableToPointInTimeRequest durante el trabajo de AWS Glue. Se generará una nueva exportación con la ubicación pasada desde dynamodb.s3.bucket y dynamodb.s3.prefix.

    • Si se configura como s3, habilita el conector de exportación de DynamoDB de AWS Glue, pero omite la creación de una nueva exportación de DynamoDB y, en su lugar, utiliza dynamodb.s3.bucket y dynamodb.s3.prefix como ubicación de Simple Storage Service (Amazon S3) de una exportación anterior de esa tabla.

  • "dynamodb.tableArn": (obligatorio) tabla de DynamoDB desde la que se debe leer.

  • "dynamodb.unnestDDBJson": (opcional) toma un valor booleano. Si se configura como true, realiza una transformación no anidada de la estructura JSON de DynamoDB que está presente en las exportaciones. El valor predeterminado está configurado como false.

  • "dynamodb.s3.bucket": (opcional) indica la ubicación del bucket de Simple Storage Service (Amazon S3) en el que debe llevarse a cabo el proceso ExportTableToPointInTime de DynamoDB. El formato de archivo para la exportación es DynamoDB JSON.

    • "dynamodb.s3.prefix": (opcional) indica la ubicación del prefijo de Simple Storage Service (Amazon S3) dentro del bucket de Simple Storage Service (Amazon S3) en el que se almacenarán las cargas ExportTableToPointInTime de DynamoDB. Si no se especifica dynamodb.s3.prefix ni dynamodb.s3.bucket, estos valores adoptarán de manera predeterminada la ubicación del directorio temporal especificada en la configuración del trabajo de AWS Glue. Para obtener más información, consulte Parámetros especiales utilizados por AWS Glue.

    • "dynamodb.s3.bucketOwner": indica el propietario del bucket que se necesita para el acceso entre cuentas de Simple Storage Service (Amazon S3).

  • "dynamodb.sts.roleArn": (opcional) ARN del rol de IAM que se asumirá para el acceso entre cuentas o el acceso entre regiones para la tabla de DynamoDB. Nota: El mismo ARN del rol de IAM se utilizará para acceder a la ubicación de Simple Storage Service (Amazon S3) especificada para la solicitud ExportTableToPointInTime.

  • "dynamodb.sts.roleSessionName": (opcional) nombre de sesión STS. El valor predeterminado se establece en “glue-dynamodb-read-sts-session”.

nota

DynamoDB tiene requisitos específicos para invocar las solicitudes ExportTableToPointInTime. Para obtener más información, consulte Solicitud de una exportación de tabla en DynamoDB. Por ejemplo, debe estar habilitada la restauración a un momento dado (PITR) en la tabla para utilizar este conector. El conector de DynamoDB también es compatible con el cifrado de AWS KMS para las exportaciones de DynamoDB a Amazon S3. Proporcionar la configuración de seguridad en la configuración del trabajo de AWS Glue habilita el cifrado de AWS KMS para una exportación de DynamoDB. La clave de KMS debe estar en la misma región que el bucket de Simple Storage Service (Amazon S3).

Tenga en cuenta que se aplican cargos adicionales por la exportación de DynamoDB y los costos de almacenamiento de Simple Storage Service (Amazon S3). Los datos exportados en Simple Storage Service (Amazon S3) se mantienen cuando finaliza la ejecución de un trabajo para que pueda reutilizarlos sin exportaciones adicionales de DynamoDB. Un requisito para utilizar este conector es que la recuperación en un momento dado (PITR) esté habilitada para la tabla.

El conector de ETL y el conector de exportación de DynamoDB no son compatibles con la aplicación de filtros o predicados de inserción en el origen de DynamoDB.

Los siguientes ejemplos de código muestran cómo leer desde particiones (a través del conector de exportación) e imprimir el número de estas.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

Estos ejemplos muestran cómo realizar la lectura desde particiones (a través del conector de exportación) e imprimir el número de estas desde una tabla de Data Catalog de AWS Glue que tenga una clasificación dynamodb:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database="<catalog_database>", table_name="<catalog_table_name", additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": "<s3_bucket>", "dynamodb.s3.prefix": "<s3_bucket_prefix>" } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = "<catalog_database>", tableName = "<catalog_table_name", additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> "<s3_bucket>", "dynamodb.s3.prefix" -> "<s3_bucket_prefix>" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

Examen de la estructura JSON de DynamoDB

Las exportaciones de DynamoDB con el conector de exportación de DynamoDB de AWS Glue pueden generar archivos JSON con estructuras anidadas específicas. Para obtener más información, consulte Objetos de datos. AWS Glue proporciona una transformación de DynamicFrame, que puede desanidar estas estructuras y dejarlas con una forma más fácil de usar para aplicaciones posteriores.

La transformación se puede invocar de dos formas. La primera forma consiste en usar una marca booleana que se pasa con el conector de exportación de DynamoDB para AWS Glue. La segunda forma consiste en llamar a la propia función de transformación.

Los siguientes ejemplos de código muestran cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento e imprimir el número de particiones:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.unnestDDBJson": True, "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.unnestDDBJson" -> true "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

La otra invocación de la transformación se realiza mediante una llamada a la función DynamicFrame independiente. Para obtener más información, consulte Clase DynamicFrame para Python y Clase DynamicFrame de Scala de AWS Glue para Scala.

“connectionType”: “dynamodb” con el conector de ETL como receptor

Utilice las siguientes opciones de conexión con "connectionType": "dynamodb" como receptor:

  • "dynamodb.output.tableName": (obligatorio) tabla de DynamoDB en la que se va a escribir.

  • "dynamodb.throughput.write.percent": (opcional) porcentaje de unidades de capacidad de escritura (WCU) que se usará. El valor predeterminado se establece en "0,5". Los valores aceptables abarcan de "0,1" a "1,5", inclusive.

    • 0.5 representa la tasa de escritura predeterminada, es decir que AWS Glue intentará consumir la mitad de la capacidad de escritura de la tabla. Si usted aumenta el valor por encima de 0,5, AWS Glue incrementará la tasa de solicitudes; si reduce el valor por debajo de 0,5, disminuirá la tasa de solicitudes de escritura. (La tasa de escritura real varía en función de diversos factores, tales como el hecho de que exista o no una distribución uniforme de claves en la tabla de DynamoDB).

    • Cuando la tabla de DynamoDB está en modo bajo demanda, AWS Glue maneja la capacidad de escritura de la tabla como 40000. Para importar una tabla grande, recomendamos cambiar la tabla de DynamoDB al modo bajo demanda.

  • "dynamodb.output.numParallelTasks": (opcional) define el número de tareas paralelas que escriben en DynamoDB al mismo tiempo. Se utiliza para calcular WCU permisiva por tarea de Spark. Si no desea controlar estos detalles, no es necesario que especifique este parámetro.

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • Si no especifica este parámetro, la WCU permisiva por tarea de Spark se calculará automáticamente mediante la siguiente fórmula:

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • (DPU - 1) * 2 - 1 si WorkerType es Standard

        • (NumberOfWorkers - 1) si WorkerType es G.1X o G.2X

      • numSlotsPerExecutor =

        • 4 si WorkerType es Standard

        • 8 si WorkerType es G.1X

        • 16 si WorkerType es G.2X

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • Ejemplo 1. DPU=10, tipo de empleado=estándar. La entrada DynamicFrame tiene 100 particiones RDD.

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • Ejemplo 2. DPU=10, tipo de empleado=estándar. La entrada DynamicFrame tiene 20 particiones RDD.

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry": (opcional) define el número de reintentos que realizamos cuando hay una ProvisionedThroughputExceededException de DynamoDB. El valor predeterminado se establece en “10”.

  • "dynamodb.sts.roleArn": (opcional) el ARN de rol de IAM que se asumirá para el acceso entre cuentas.

  • "dynamodb.sts.roleSessionName": (opcional) nombre de sesión STS. El valor predeterminado se establece en “glue-dynamodb-write-sts-session”.

nota

La versión de AWS Glue 1.0 o posterior admite la escritura de DynamoDB.

nota

AWS Glue admite la escritura de datos de tablas de DynamoDB en otra cuenta de AWS. Para obtener más información, consulte Acceso entre cuentas y entre regiones a tablas de DynamoDB.

En los siguientes ejemplos de código, se muestra cómo leer de tablas de DynamoDB y escribir en ellas. Demuestran la lectura de una tabla y la escritura en otra tabla.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

"connectionType": "mongodb"

Designa una conexión a MongoDB. Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.

"connectionType": "mongodb" como origen

Utilice las siguientes opciones de conexión con "connectionType": "mongodb" como origen:

  • "uri": (obligatorio) el host de MongoDB del que se va a leer, con formato mongodb://<host>:<port>.

  • "database": (obligatorio) la base de datos de MongoDB de la que se va a leer. Esta opción también se puede transferir a additional_options al llamar a glue_context.create_dynamic_frame_from_catalog en su script de trabajo.

  • "collection": (obligatorio) la colección de MongoDB de la que se va a leer. Esta opción también se puede transferir a additional_options al llamar a glue_context.create_dynamic_frame_from_catalog en su script de trabajo.

  • "username": (obligatorio) el nombre de usuario de MongoDB.

  • "password": (obligatorio) la contraseña de MongoDB.

  • "ssl": (opcional) si es true, inicia una conexión SSL. El valor predeterminado es false.

  • "ssl.domain_match": (opcional) si es true y ssl es true, se realiza la comprobación de coincidencia de dominio. El valor predeterminado es true.

  • "batchSize": (opcional): el número de documentos que se deben devolver por lote, que se utilizan dentro del cursor de lotes internos.

  • "partitioner": (opcional): el nombre de la clase del particionador para leer los datos de entrada de MongoDB. El conector proporciona los siguientes particionadores:

    • MongoDefaultPartitioner (predeterminado)

    • MongoSamplePartitioner (Requiere MongoDB 3.2 o posterior)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" ( opcional): opciones para el particionador designado. Se admiten las siguientes opciones para cada particionador:

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    Para obtener más información acerca de estas opciones, consulte Partitioner Configuration en la documentación de MongoDB. Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.

"connectionType": "mongodb" como receptor

Utilice las siguientes opciones de conexión con "connectionType": "mongodb" como receptor:

  • "uri": (obligatorio) el host de MongoDB en el que se va a escribir, con formato mongodb://<host>:<port>.

  • "database": (obligatorio) la base de datos de MongoDB en la que se va a escribir.

  • "collection": (obligatorio) la colección de MongoDB en la que se va a escribir.

  • "username": (obligatorio) el nombre de usuario de MongoDB.

  • "password": (obligatorio) la contraseña de MongoDB.

  • "ssl": (opcional) si es true, inicia una conexión SSL. El valor predeterminado es false.

  • "ssl.domain_match": (opcional) si es true y ssl es true, se realiza la comprobación de coincidencia de dominio. El valor predeterminado es true.

  • "extendedBsonTypes": (opcional) si es true, habilitan los tipos de BSON ampliados al escribir datos en MongoDB. El valor predeterminado es true.

  • "replaceDocument": (opcional) si es true, reemplaza todo el documento al guardar conjuntos de datos que contienen un campo _id. Si es false, solo se actualizan los campos del documento que coinciden con los campos del conjunto de datos. El valor predeterminado es true.

  • "maxBatchSize": (opcional): el tamaño máximo del lote para operaciones en bloque al guardar datos. El valor predeterminado es 512.

Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.

Valores de conexiones de tipo personalizada y AWS Marketplace

Esto incluye lo siguiente:

  • "connectionType": "marketplace.athena": designa una conexión a un almacén de datos de Amazon Athena. La conexión utiliza un conector de AWS Marketplace.

  • "connectionType": "marketplace.spark": designa una conexión a un almacén de datos de Apache Spark. La conexión utiliza un conector de AWS Marketplace.

  • "connectionType": "marketplace.jdbc": designa una conexión a un almacén de datos de JDBC. La conexión utiliza un conector de AWS Marketplace.

  • "connectionType": "custom.athena": designa una conexión a un almacén de datos de Amazon Athena. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio.

  • "connectionType": "custom.spark": designa una conexión a un almacén de datos de Apache Spark. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio.

  • "connectionType": "custom.jdbc": designa una conexión a un almacén de datos de JDBC. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio.

Opciones de conexión para el tipo custom.jdbc o marketplace.jdbc

  • className: cadena, obligatoria, nombre de clase de controlador.

  • connectionName: cadena, obligatoria, nombre de la conexión asociada al conector.

  • url: cadena, obligatoria, URL JDBC con marcadores de posición (${}) que se utilizan para construir la conexión al origen de datos. El marcador de posición ${secretKey} se reemplaza con el secreto del mismo nombre en AWS Secrets Manager. Consulte la documentación del almacén de datos para obtener más información sobre la construcción de la URL.

  • secretId o user/password: cadena, obligatoria, utilizada para recuperar credenciales de la URL.

  • dbTable o query: cadena, obligatoria, la tabla o consulta SQL de la que se obtienen los datos. Puede especificar dbTable o query, pero no ambos.

  • partitionColumn: cadena, opcional, el nombre de una columna entera que se utiliza para particionar. Esta opción solo funciona cuando está incluida con lowerBound, upperBound y numPartitions. Esta opción funciona de la misma manera que en el lector JDBC de Spark SQL. Para obtener más información, consulte JDBC a otras bases de datos en la Guía de Apache Spark SQL, DataFrames y conjuntos de datos.

    Los valores lowerBound y upperBound se utilizan para decidir el intervalo de partición, no para filtrar las filas de la tabla. Todas las filas de la tabla se particionan y se devuelven.

    nota

    Cuando se utiliza una consulta en lugar de un nombre de tabla, debe validar que la consulta funciona con la condición de partición especificada. Por ejemplo:

    • Si el formato de consulta es "SELECT col1 FROM table1", pruebe la consulta al agregar una cláusula WHERE al final de la consulta que utiliza la columna de partición.

    • Si su formato de consulta es “SELECT col1 FROM table1 WHERE col2=val", pruebe la consulta al ampliar la cláusula WHERE con AND y una expresión que utiliza la columna de partición.

  • lowerBound: entero, opcional, el valor mínimo de partitionColumn que se utiliza para decidir el intervalo de partición.

  • upperBound: entero, opcional, el valor máximo de partitionColumn que se utiliza para decidir el intervalo de partición.

  • numPartitions: entero, opcional, el número de particiones. Este valor, junto con lowerBound (inclusive) y upperBound (exclusivo), forma intervalos de partición para expresiones de la cláusula WHERE generadas, que se utilizan para dividir la partitionColumn.

    importante

    Preste atención al número de particiones, ya que demasiadas particiones pueden causar problemas en los sistemas de base de datos externos.

  • filterPredicate: cadena, opcional, condición adicional para filtrar datos desde el origen. Por ejemplo:

    BillingCity='Mountain View'

    Cuando se utiliza una consulta en lugar de una tabla, debe validar que la consulta funciona con el filterPredicate especificado. Por ejemplo:

    • Si el formato de consulta es "SELECT col1 FROM table1", pruebe la consulta al agregar una cláusula WHERE al final de la consulta que utiliza el predicado de filtrado.

    • Si su formato de consulta es "SELECT col1 FROM table1 WHERE col2=val", pruebe la consulta al ampliar la cláusula WHERE con AND y una expresión que utiliza el predicado de filtrado.

  • dataTypeMapping: diccionario, opcional, mapeo de tipos de datos personalizado, que crea un mapeo a partir de un tipo de datos JDBC a un tipo de datos de Glue. Por ejemplo, la opción "dataTypeMapping":{"FLOAT":"STRING"} asigna campos de datos de tipo FLOAT de JDBC al tipo String de Java al invocar el método ResultSet.getString() del controlador y lo utiliza para crear registros de AWS Glue. Cada controlador implementa el objeto ResultSet, por lo que el comportamiento es específico del controlador que se utiliza. Consulte la documentación del controlador JDBC para comprender cómo el controlador realiza las conversiones.

  • Los tipos de datos de AWS Glue que se admiten actualmente son:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    Los tipos de datos JDBC soportados son Java8 java.sql.types.

    Las asignaciones de tipos de datos predeterminados (de JDBC a AWS Glue) son:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    Si utiliza un mapeo de tipos de datos personalizada con la opción dataTypeMapping, puede anular el mapeo de tipos de datos predeterminado. Sólo los tipos de datos JDBC enumerados en la opción dataTypeMapping se ven afectados; el mapeo predeterminado se utiliza para todos los demás tipos de datos JDBC. Puede agregar mapeos para tipos de datos JDBC adicionales si es necesario. Si un tipo de datos JDBC no está incluido en la asignación predeterminada o en una asignación personalizada, el tipo de datos se convierte al tipo de datos STRING de AWS Glue de forma predeterminada.

En los ejemplos de código Python siguientes, se muestra cómo leer desde bases de datos JDBC con controladores JDBC AWS Marketplace. Demuestra la lectura desde una base de datos y la escritura en una ubicación S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opciones de conexión para el tipo custom.athena o marketplace.athena

  • className: cadena, obligatoria, nombre de clase de controlador. Cuando se utiliza el conector Athena-CloudWatch, este valor de parámetro es el prefijo del nombre de clase (por ejemplo, "com.amazonaws.athena.connectors"). El conector Athena-CloudWatch se compone de dos clases: un controlador de metadatos y un controlador de registros. Si proporciona el prefijo común aquí, la API carga las clases correctas basadas en ese prefijo.

  • tableName: cadena, obligatoria, el nombre del flujo de registro de CloudWatch que se va a leer. Este fragmento de código usa el nombre de vista especial all_log_streams, lo que significa que el marco de datos dinámico devuelto contendrá datos de todos los flujos de registro incluidos en el grupo de registros.

  • schemaName: cadena, obligatoria, el nombre del grupo de registro de CloudWatch que se va a leer. Por ejemplo, /aws-glue/jobs/output.

  • connectionName: cadena, obligatoria, nombre de la conexión asociada al conector.

Para obtener opciones adicionales para este conector, consulte el archivo README (LÉAME) del conector de Amazon Athena CloudWatch en GitHub.

En el siguiente ejemplo de código de Python, se muestra cómo se lee desde un almacén de datos de Athena mediante un conector AWS Marketplace. Demuestra la lectura de Athena y la escritura en una ubicación S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opciones de conexión para el tipo custom.spark o marketplace.spark

  • className: cadena, obligatoria, nombre de clase de conector.

  • secretId: cadena, opcional, se utiliza para recuperar las credenciales de la conexión del conector.

  • connectionName: cadena, obligatoria, nombre de la conexión asociada al conector.

  • Otras opciones dependen del almacén de datos. Por ejemplo, las opciones de configuración de OpenSearch comienzan con el prefijo es, tal y como se describe en la documentación Elasticsearch para Apache Hadoop. Las conexiones de Spark con Snowflake utilizan opciones tales como sfUser y sfPassword, como se describe en Uso del conector de Spark en la guía Conexión a Snowflake.

En el siguiente ejemplo de código de Python, se muestra cómo se lee desde un almacén de datos de OpenSearch mediante una conexión marketplace.spark.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opciones generales

Las opciones de esta sección se proporcionan como connection_options, pero no se aplican específicamente a un conector.

Los siguientes parámetros se utilizan generalmente al configurar los marcadores. Se pueden aplicar a los flujos de trabajo de Amazon S3 o JDBC. Para obtener más información, consulte Uso de marcadores de trabajo.

  • jobBookmarkKeys — Una matriz de nombres de columna.

  • jobBookmarkKeysSortOrder — Cadena que define cómo comparar valores en función del orden de clasificación. Valores válidos: "asc", "desc".

  • useS3ListImplementation — Se utiliza para administrar el rendimiento de la memoria al publicar los contenidos de los buckets de Amazon S3. Para obtener más información, consulte Cómo optimizar la gestión de la memoria en Glue AWS.