Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Tipos de conexión y opciones para ETL en AWS Glue
EnAWS Glue, varios métodos PySpark y transformaciones de Scala especifican el tipo de conexión mediante unconnectionType
parámetro. Especifican las opciones de conexión mediante un parámetro connectionOptions
o options
.
El parámetro connectionType
puede adoptar los valores que se muestran en la tabla siguiente. Los valores del parámetro connectionOptions
(u options
) asociados para cada tipo se documentan en las secciones siguientes. Salvo que se indique lo contrario, los parámetros se aplican cuando la conexión se utiliza como origen o receptor.
Para obtener un código de muestra que ilustra la configuración y que utiliza las opciones de conexión, consulte Ejemplos: configuración de tipos y opciones de conexión.
"connectionType": "Documentdb"
Designa una conexión a Amazon DocumentDB (compatible con MongoDB).
Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.
"connectionType": "Documentdb" como origen
Utilice las siguientes opciones de conexión con "connectionType": "documentdb"
como origen:
-
"uri"
: (obligatorio) el host de Amazon DocumentDB del que se va a leer, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de Amazon DocumentDB de la que se va a leer. -
"collection"
: (obligatorio) la recopilación de Amazon DocumentDB de la que se va a leer. -
"username"
: (obligatorio) nombre de usuario de Amazon DocumentDB. -
"password"
: (obligatorio) la contraseña de Amazon DocumentDB. -
"ssl"
: (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor"true"
. -
"ssl.domain_match"
: (obligatorio si usa SSL) si su conexión usa SSL, debe incluir esta opción con el valor"false"
. -
"batchSize"
: (opcional): el número de documentos que se deben devolver por lote, que se utilizan dentro del cursor de lotes internos. -
"partitioner"
: (opcional): el nombre de la clase del particionador para leer los datos de entrada de Amazon DocumentDB. El conector proporciona los siguientes particionadores:-
MongoDefaultPartitioner
(predeterminado) -
MongoSamplePartitioner
-
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
-
-
"partitionerOptions"
( opcional): opciones para el particionador designado. Se admiten las siguientes opciones para cada particionador:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
, partitionSizeMB -
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
, partitionSizeMB
Para obtener más información acerca de estas opciones, consulte Partitioner Configuration
en la documentación de MongoDB. Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión. -
"connectionType": "Documentdb" como receptor
Utilice las siguientes opciones de conexión con "connectionType": "documentdb"
como receptor:
-
"uri"
: (obligatorio) el host de Amazon DocumentDB al que se va a escribir, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de Amazon DocumentDB a la que se va a escribir. -
"collection"
: (obligatorio) la recopilación de Amazon DocumentDB a la que se va a escribir. -
"username"
: (obligatorio) nombre de usuario de Amazon DocumentDB. -
"password"
: (obligatorio) la contraseña de Amazon DocumentDB. -
"extendedBsonTypes"
: (opcional) si se establece entrue
, permite los tipos de BSON extendidos al escribir datos en Amazon DocumentDB. El valor predeterminado estrue
. -
"replaceDocument"
: (opcional) si estrue
, reemplaza todo el documento al guardar conjuntos de datos que contienen un campo_id
. Si esfalse
, solo se actualizan los campos del documento que coinciden con los campos del conjunto de datos. El valor predeterminado estrue
. -
"maxBatchSize"
: (opcional): el tamaño máximo del lote para operaciones en bloque al guardar datos. El valor predeterminado es 512.
Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.
"connectionType": "dynamodb"
Designa una conexión a Amazon DynamoDB.
Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.
“connectionType”: “dynamodb” con el conector de ETL como origen
Utilice las siguientes opciones de conexión con "connectionType": "dynamodb"
como origen cuando use el conector de ETL de DynamoDB de AWS Glue:
-
"dynamodb.input.tableName"
: (obligatorio) la tabla de DynamoDB de la que se va a leer. -
"dynamodb.throughput.read.percent"
: (opcional) porcentaje de unidades de capacidad de lectura (RCU) que se usará. El valor predeterminado se establece en "0,5". Los valores aceptables abarcan de "0,1" a "1,5", inclusive.-
0.5
representa la tasa de lectura predeterminada, es decir que AWS Glue intentará consumir la mitad de la capacidad de lectura de la tabla. Si usted aumenta el valor por encima de0.5
, AWS Glue incrementará la tasa de solicitudes; si reduce el valor por debajo de0.5
, disminuirá la tasa de solicitudes de lectura. (La tasa de lectura real varía, según diversos factores tales como el hecho de que exista o no una distribución uniforme de claves en la tabla de DynamoDB). -
Cuando la tabla de DynamoDB está en modo bajo demanda, AWS Glue maneja la capacidad de lectura de la tabla como 40 000. Para exportar una tabla de gran extensión, recomendamos cambiar la tabla de DynamoDB al modo bajo demanda.
-
-
"dynamodb.splits"
: (opcional) define la cantidad de particiones en las que dividimos esta tabla de DynamoDB al leerla. El valor predeterminado se establece en "1". Los valores aceptables abarcan de "1" a "1,000,000", inclusive.-
1
representa que no hay paralelismo. Se recomienda especialmente que especifique un valor mayor para un mejor rendimiento mediante la fórmula siguiente. -
Le recomendamos que calcule
numSlots
con la siguiente fórmula, y lo use comodynamodb.splits
. Si necesita más rendimiento, le recomendamos que amplíe su trabajo al aumentar el número de DPU.La Cantidad de trabajos (
NumberOfWorkers
) se establece en la configuración del trabajo. Para obtener más información, consulte Agregar trabajos en AWS Glue. Si habilita el escalado automático, es posible que la cantidad de trabajos disponibles se ajuste debido a la carga de trabajo. Para contextualizar, un ejecutor está reservado para el controlador de Spark mientras que otros ejecutores procesan datos.-
numExecutors =
-
NumberOfWorkers - 1
siWorkerType
esG.1X
oG.2X
-
MaximumCapacity * 2 - 1
siWorkerType
esStandard
y la versión de AWS Glue es 2.0 o superior.(MaximumCapacity - 1) * 2 - 1
siWorkerType
esStandard
y la versión de AWS Glue es 1.0 o anterior.
-
-
numSlotsPerExecutor =
-
numSlots = numSlotsPerExecutor * numExecutors
-
-
-
"dynamodb.sts.roleArn"
: (opcional) el ARN de rol de IAM que se asumirá para el acceso entre cuentas. Este parámetro se encuentra disponible en AWS Glue 1.0 o posterior. -
"dynamodb.sts.roleSessionName"
: (opcional) nombre de sesión STS. El valor predeterminado se establece en «glue-dynamodb-read-sts-sts-sts-sts-sts-sts-sts-st Este parámetro se encuentra disponible en AWS Glue 1.0 o posterior.
En los siguientes ejemplos de código, se muestra cómo leer (con el conector de ETL) de tablas de DynamoDB y escribir en ellas. Demuestran la lectura de una tabla y la escritura en otra tabla.
nota
AWS Glue admite la lectura de datos de tablas de DynamoDB de otra cuenta de AWS. Para obtener más información, consulte Acceso entre cuentas y entre regiones a tablas de DynamoDB.
nota
El lector de ETL de DynamoDB no es compatible con filtros ni predicados de inserción.
“connectionType”: “dynamodb” con el conector de exportación de DynamoDB de AWS Glue como origen
Además del conector ETL deAWS Glue DynamoDB,AWS Glue ofrece un conector de exportación de DynamoDB que invoca unaExportTableToPointInTime
solicitud de DynamoDB y la almacena en una ubicación de Amazon S3 que usted proporcione, en el formato JSON de DynamoDB. AWS Glueluego crea un DynamicFrame objeto leyendo los datos de la ubicación de exportación de Amazon S3.
El conector de exportación funciona mejor que el conector de ETL cuando el tamaño de la tabla de DynamoDB es superior a 80 GB. Además, dado que la solicitud de exportación se lleva a cabo fuera de los procesos de Spark en un trabajo de AWS Glue, se puede habilitar el escalado automático de los trabajos de AWS Glue para guardar el uso de DPU durante la solicitud de exportación. Con el conector de exportación, tampoco es necesario configurar el número de divisiones del paralelismo del ejecutor de Spark o el porcentaje de lectura de rendimiento de DynamoDB.
Utilice las siguientes opciones de conexión con “connectionType”: “dynamodb” como origen cuando use el conector de exportación de DynamoDB de AWS Glue, que solo está disponible a partir de AWS Glue versión 2.0:
-
"dynamodb.export"
: (obligatorio) valor de cadena:Si se configura como
ddb
, habilita el conector de exportación de DynamoDB de AWS Glue cuando se invoque una nuevaExportTableToPointInTimeRequest
durante el trabajo de AWS Glue. Se generará una nueva exportación con la ubicación pasada desdedynamodb.s3.bucket
ydynamodb.s3.prefix
.Si se configura como
s3
, habilita el conector de exportación de DynamoDB de AWS Glue, pero omite la creación de una nueva exportación de DynamoDB y, en su lugar, utilizadynamodb.s3.bucket
ydynamodb.s3.prefix
como ubicación de Simple Storage Service (Amazon S3) de una exportación anterior de esa tabla.
-
"dynamodb.tableArn"
: (obligatorio) tabla de DynamoDB desde la que se debe leer. -
"dynamodb.unnestDDBJson"
: (opcional) toma un valor booleano. Si se configura como true, realiza una transformación no anidada de la estructura JSON de DynamoDB que está presente en las exportaciones. El valor predeterminado está configurado como false. -
"dynamodb.s3.bucket"
: (opcional) indica la ubicación del bucket de Simple Storage Service (Amazon S3) en el que debe llevarse a cabo el procesoExportTableToPointInTime
de DynamoDB. El formato de archivo para la exportación es DynamoDB JSON.-
"dynamodb.s3.prefix"
: (opcional) indica la ubicación del prefijo de Simple Storage Service (Amazon S3) dentro del bucket de Simple Storage Service (Amazon S3) en el que se almacenarán las cargasExportTableToPointInTime
de DynamoDB. Si no se especificadynamodb.s3.prefix
nidynamodb.s3.bucket
, estos valores adoptarán de manera predeterminada la ubicación del directorio temporal especificada en la configuración del trabajo de AWS Glue. Para obtener más información, consulte Parámetros especiales utilizados por AWS Glue. -
"dynamodb.s3.bucketOwner"
: indica el propietario del bucket que se necesita para el acceso entre cuentas de Simple Storage Service (Amazon S3).
-
-
"dynamodb.sts.roleArn"
: (opcional) ARN del rol de IAM que se asumirá para el acceso entre cuentas o el acceso entre regiones para la tabla de DynamoDB. Nota: El mismo ARN del rol de IAM se utilizará para acceder a la ubicación de Simple Storage Service (Amazon S3) especificada para la solicitudExportTableToPointInTime
. -
"dynamodb.sts.roleSessionName"
: (opcional) nombre de sesión STS. El valor predeterminado se establece en «glue-dynamodb-read-sts-sts-sts-sts-sts-sts-sts-st
nota
DynamoDB tiene requisitos específicos para invocar las solicitudes ExportTableToPointInTime
. Para obtener más información, consulte Solicitud de una exportación de tabla en DynamoDB. Por ejemplo, debe estar habilitada la restauración a un momento dado (PITR) en la tabla para utilizar este conector. El conector de DynamoDB también es compatible con el cifrado de AWS KMS para las exportaciones de DynamoDB a Amazon S3. Proporcionar la configuración de seguridad en la configuración del trabajo de AWS Glue habilita el cifrado de AWS KMS para una exportación de DynamoDB. La clave de KMS debe estar en la misma región que el bucket de Simple Storage Service (Amazon S3).
Tenga en cuenta que se aplican cargos adicionales por la exportación de DynamoDB y los costos de almacenamiento de Simple Storage Service (Amazon S3). Los datos exportados en Simple Storage Service (Amazon S3) se mantienen cuando finaliza la ejecución de un trabajo para que pueda reutilizarlos sin exportaciones adicionales de DynamoDB. Un requisito para utilizar este conector es que point-in-time la recuperación (PITR) esté habilitada para la tabla.
El conector de ETL y el conector de exportación de DynamoDB no son compatibles con la aplicación de filtros o predicados de inserción en el origen de DynamoDB.
Los siguientes ejemplos de código muestran cómo leer desde particiones (a través del conector de exportación) e imprimir el número de estas.
Estos ejemplos muestran cómo realizar la lectura desde particiones (a través del conector de exportación) e imprimir el número de estas desde una tabla de Data Catalog de AWS Glue que tenga una clasificación dynamodb
:
Examen de la estructura JSON de DynamoDB
Las exportaciones de DynamoDB con el conector de exportación de DynamoDB de AWS Glue pueden generar archivos JSON con estructuras anidadas específicas. Para obtener más información, consulte objetos de datos. AWS Glueproporciona una DynamicFrame transformación que puede convertir dichas estructuras en una easier-to-use forma para aplicaciones posteriores.
La transformación se puede invocar de dos formas. La primera forma consiste en usar una marca booleana que se pasa con el conector de exportación de DynamoDB para AWS Glue. La segunda forma consiste en llamar a la propia función de transformación.
Los siguientes ejemplos de código muestran cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento e imprimir el número de particiones:
La otra invocación de la transformación se realiza mediante una llamada a DynamicFrame función independiente. Para obtener más información, consulte DynamicFrame Clase para Python y DynamicFrame ClaseAWS Glue Scala para Scala.
“connectionType”: “dynamodb” con el conector de ETL como receptor
Utilice las siguientes opciones de conexión con "connectionType": "dynamodb"
como receptor:
-
"dynamodb.output.tableName"
: (obligatorio) tabla de DynamoDB en la que se va a escribir. -
"dynamodb.throughput.write.percent"
: (opcional) porcentaje de unidades de capacidad de escritura (WCU) que se usará. El valor predeterminado se establece en "0,5". Los valores aceptables abarcan de "0,1" a "1,5", inclusive.-
0.5
representa la tasa de escritura predeterminada, es decir que AWS Glue intentará consumir la mitad de la capacidad de escritura de la tabla. Si usted aumenta el valor por encima de 0,5, AWS Glue incrementará la tasa de solicitudes; si reduce el valor por debajo de 0,5, disminuirá la tasa de solicitudes de escritura. (La tasa de escritura real varía en función de diversos factores, tales como el hecho de que exista o no una distribución uniforme de claves en la tabla de DynamoDB). -
Cuando la tabla de DynamoDB está en modo bajo demanda, AWS Glue maneja la capacidad de escritura de la tabla como
40000
. Para importar una tabla grande, recomendamos cambiar la tabla de DynamoDB al modo bajo demanda.
-
-
"dynamodb.output.numParallelTasks"
: (opcional) define el número de tareas paralelas que escriben en DynamoDB al mismo tiempo. Se utiliza para calcular WCU permisiva por tarea de Spark. Si no desea controlar estos detalles, no es necesario que especifique este parámetro.-
permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks
-
Si no especifica este parámetro, la WCU permisiva por tarea de Spark se calculará automáticamente mediante la siguiente fórmula:
-
numPartitions = dynamicframe.getNumPartitions()
-
numExecutors =
-
(DPU - 1) * 2 - 1
siWorkerType
esStandard
-
(NumberOfWorkers - 1)
siWorkerType
esG.1X
oG.2X
-
-
numSlotsPerExecutor =
-
4
siWorkerType
esStandard
-
8
siWorkerType
esG.1X
-
16
siWorkerType
esG.2X
-
-
numSlots = numSlotsPerExecutor * numExecutors
-
numParallelTasks = min(numPartitions, numSlots)
-
-
Ejemplo 1. DPU=10, WorkerType =Estándar. DynamicFrame La entrada tiene 100 particiones RDD.
-
numPartitions = 100
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(100, 68) = 68
-
-
Ejemplo 2. DPU=10, WorkerType =Estándar. DynamicFrame La entrada tiene 20 particiones RDD.
-
numPartitions = 20
-
numExecutors = (10 - 1) * 2 - 1 = 17
-
numSlots = 4 * 17 = 68
-
numParallelTasks = min(20, 68) = 20
-
-
-
"dynamodb.output.retry"
: (opcional) define el número de reintentos que realizamos cuando hay unaProvisionedThroughputExceededException
de DynamoDB. El valor predeterminado se establece en “10”. -
"dynamodb.sts.roleArn"
: (opcional) el ARN de rol de IAM que se asumirá para el acceso entre cuentas. -
"dynamodb.sts.roleSessionName"
: (opcional) nombre de sesión STS. El valor predeterminado se establece en «glue-dynamodb-write-sts-sts-sts-sts-sts-sts-sts-st
nota
La versión de AWS Glue 1.0 o posterior admite la escritura de DynamoDB.
nota
AWS Glue admite la escritura de datos de tablas de DynamoDB en otra cuenta de AWS. Para obtener más información, consulte Acceso entre cuentas y entre regiones a tablas de DynamoDB.
En los siguientes ejemplos de código, se muestra cómo leer de tablas de DynamoDB y escribir en ellas. Demuestran la lectura de una tabla y la escritura en otra tabla.
“connectionType”: “kafka”
Designa una conexión a un clúster de Kafka o a un clúster de Amazon Managed Streaming for Apache Kafka.
Puede utilizar los métodos siguientes en el objeto GlueContext
para consumir registros de un origen de streaming de Kafka:
-
getCatalogSource
-
getSource
-
getSourceWithFormat
-
createDataFrameFromOptions
Si utiliza getCatalogSource
, el trabajo tiene la base de datos del Data Catalog y la información del nombre de la tabla, y puede usarla para obtener algunos parámetros básicos para la lectura desde el flujo de Apache Kafka. Si utiliza getSource
, getSourceWithFormat
o createDataFrameFromOptions
debe especificar estos parámetros de manera explícita:
Puede especificar estas opciones mediante connectionOptions
con getSource
o createDataFrameFromOptions
, options
con getSourceWithFormat
, o additionalOptions
con getCatalogSource
.
Utilice las siguientes opciones de conexión con "connectionType": "kafka"
:
-
bootstrap.servers
(obligatorio): una lista de direcciones URL Bootstrap, por ejemplo, comob-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
. Esta opción debe especificarse en la llamada a la API o definirse en los metadatos de la tabla en el Data Catalog. -
security.protocol
(obligatorio): el protocolo que se utiliza para la comunicación con los agentes. Los valores posibles son"SSL"
o"PLAINTEXT"
. -
topicName
(Obligatorio) Lista separada por comas de temas a los que suscribirse. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
. -
"assign"
: (Obligatorio) Una cadena JSON que especifica el valor deTopicPartitions
para consumir. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
.Ejemplo: “{"temaA":[0,1],"temaB":[2,4]}”
-
"subscribePattern"
: (obligatorio) cadena de expresiones regulares de Java que identifica la lista de temas a la que desea suscribirse. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
.Ejemplo: “tema.*”
-
classification
(opcional) -
delimiter
(opcional) -
"startingOffsets"
: (opcional) posición inicial en el tema de Kafka para leer los datos. Los valores posibles son"earliest"
o"latest"
. El valor predeterminado es"latest"
. -
"endingOffsets"
: (opcional) el punto final cuando finaliza una consulta por lotes. Los valores posibles son"latest"
o una cadena JSON que especifica una compensación final para cadaTopicPartition
.Para la cadena JSON, el formato es
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
. El valor-1
como compensación representa"latest"
. -
"pollTimeoutMs"
: (opcional) tiempo de espera en milisegundos para sondear datos de Kafka en ejecutores de trabajos de Spark. El valor predeterminado es512
. -
"numRetries"
: (opcional) el número de veces que se reintenta antes de no obtener las compensaciones de Kafka. El valor predeterminado es3
. -
"retryIntervalMs"
: (opcional) tiempo en milisegundos para esperar antes de volver a intentar obtener compensaciones Kafka. El valor predeterminado es10
. -
"maxOffsetsPerTrigger"
: (opcional) el límite de velocidad en el número máximo de compensaciones que se procesan por intervalo de desencadenador. El número total de compensaciones especificado se divide de forma proporcional entretopicPartitions
de diferentes volúmenes. El valor predeterminado es nulo, lo que significa que el consumidor lee todos las compensaciones hasta la última compensación conocida. -
"minPartitions"
: (opcional) el número mínimo deseado de particiones para leer desde Kafka. El valor predeterminado es nulo, lo que significa que el número de particiones de Spark es igual al número de particiones de Kafka. -
"includeHeaders"
: (opcional) si se deben incluir los encabezados de Kafka. Cuando la opción se establece en “verdadero”, la salida de datos contendrá una columna adicional denominada “glue_streaming_kafka_headers” con el tipoArray[Struct(key: String, value: String)]
. El valor predeterminado es “falso”. Esta opción se encuentra disponible en la versión 3.0 o posterior de AWS Glue. -
"schema"
: (Obligatorio cuando inferSchema se establece en false) Esquema que se va a utilizar para procesar la carga. Si la clasificación esavro
, el esquema proporcionado debe estar en el formato de esquema Avro. Si la clasificación no esavro
, el esquema proporcionado debe estar en el formato de esquema DDL.A continuación, se muestran algunos ejemplos de esquemas.
-
"inferSchema"
: (opcional) El valor predeterminado es “false”. Si se establece en “true”, el esquema se detectará durante el tiempo de ejecución desde la carga dentro deforeachbatch
. -
"avroSchema"
: (obsoleto) Parámetro utilizado para especificar un esquema de datos Avro cuando se utiliza el formato Avro. Este parámetro se ha quedado obsoleto. Utilice el parámetroschema
. -
"addRecordTimestamp"
: (opcional) cuando esta opción se establece en “true”, la salida de datos contendrá una columna adicional denominada “__src_timestamp” que indica la hora en la que el tema recibió el registro correspondiente. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue. -
"emitConsumerLagMetrics"
: (opcional) cuando esta opción se establece en «verdadera», para cada lote, emitirá las métricas correspondientes al período comprendido entre el registro más antiguo recibido por el tema y el momento en que llegue a en cada lote, emitirá las métricas correspondientes al período comprendido entre el registro más antiguo recibido por el tema y el momento en que llegueAWS Glue a en cada lote CloudWatch. El nombre de la métrica es «glue.driver.driver.st.st.st.st.st.st. maxConsumerLagInMs». El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.
“connectionType”: “kinesis”
Designa opciones de conexión para Amazon Kinesis Data Streams.
Puede leer desde una secuencia de datos de Amazon Kinesis mediante la información almacenada en una tabla de Data Catalog o al proporcionar información para acceder directamente a la secuencia de datos. Si accede directamente a la secuencia de datos, utilice estas opciones para proporcionar información sobre cómo acceder a la secuencia de datos.
Si utiliza getCatalogSource
o create_data_frame_from_catalog
para consumir registros de una fuente de streaming de Kinesis, el trabajo tiene la base de datos de Data Catalog y la información del nombre de la tabla, y puede utilizarla para obtener algunos parámetros básicos para la lectura de la fuente de streaming de Kinesis. Si utiliza getSource
, getSourceWithFormat
, createDataFrameFromOptions
o create_data_frame_from_options
debe especificar estos parámetros básicos mediante las opciones de conexión descritas aquí.
Puede especificar las opciones de conexión para Kinesis al utilizar los siguientes argumentos para los métodos especificados en la clase GlueContext
.
-
Scala
-
connectionOptions
: se debe utilizar congetSource
,createDataFrameFromOptions
-
additionalOptions
: se debe utilizar congetCatalogSource
-
options
: se debe utilizar congetSourceWithFormat
-
-
Python
-
connection_options
: se debe utilizar concreate_data_frame_from_options
-
additional_options
: se debe utilizar concreate_data_frame_from_catalog
-
options
: se debe utilizar congetSource
-
Utilice las siguientes opciones de conexión para los orígenes de datos de streaming de Kinesis:
-
streamARN
(obligatorio) el ARN de flujo de datos de Kinesis. -
classification
(opcional) -
delimiter
(opcional) -
"startingPosition"
: (opcional) posición inicial en el flujo de datos de Kinesis para leer los datos. Los valores posibles son"latest"
,"trim_horizon"
, o"earliest"
. El valor predeterminado es"latest"
. -
"awsSTSRoleARN"
: (opcional) el nombre de recurso de Amazon (ARN) del rol de que se asumirá mediante AWS Security Token Service (AWS STS). Este rol debe tener permisos para describir o leer operaciones de registros de la secuencia de datos de Kinesis. Debe utilizar este parámetro para acceder a un flujo de datos de otra cuenta. Se utiliza junto con"awsSTSSessionName"
. -
"awsSTSSessionName"
: (opcional) identificador de la sesión que asume el rol mediante AWS STS. Debe utilizar este parámetro para acceder a una secuencia de datos de otra cuenta. Se utiliza junto con"awsSTSRoleARN"
. -
"maxFetchTimeInMs"
: (opcional) el tiempo máximo empleado en el ejecutor de trabajos para obtener un registro del flujo de datos de Kinesis por fragmento, especificado en milisegundos (ms). El valor predeterminado es1000
. -
"maxFetchRecordsPerShard"
: (opcional) número máximo de registros que se recuperará por fragmento en el flujo de datos de Kinesis. El valor predeterminado es100000
. -
"maxRecordPerRead"
: (opcional) número máximo de registros que se recuperará del flujo de datos de Kinesis en cada operacióngetRecords
. El valor predeterminado es10000
. -
"addIdleTimeBetweenReads"
: (opcional) Agrega un retardo de tiempo entre dos operacionesgetRecords
consecutivas. El valor predeterminado es"False"
. Esta opción sólo se puede configurar para Glue versión 2.0 y superior. -
"idleTimeBetweenReadsInMs"
: (opcional) el tiempo mínimo de retraso entre dos operacionesgetRecords
consecutivas, especificado en ms. El valor predeterminado es1000
. Esta opción sólo se puede configurar para Glue versión 2.0 y superior. -
"describeShardInterval"
: (opcional) el intervalo mínimo de tiempo entre dos llamadas a la APIListShards
para que su script considere cambios en los fragmentos. Para obtener más información, consulte Estrategias para cambios en los fragmentos en la Guía del desarrollador de Amazon Kinesis Data Streams. El valor predeterminado es1s
. -
"numRetries"
: (opcional) el número máximo de reintentos para las solicitudes de la API de Kinesis Data Streams. El valor predeterminado es3
. -
"retryIntervalMs"
: (opcional) el período de tiempo de enfriamiento (especificado en ms) antes de volver a intentar la llamada a la API de Kinesis Data Streams. El valor predeterminado es1000
. -
"maxRetryIntervalMs"
: (opcional) el período de tiempo de enfriamiento máximo (especificado en ms) entre dos intentos de llamada a la API de Kinesis Data Streams. El valor predeterminado es10000
. -
"avoidEmptyBatches"
: (opcional) evita crear un trabajo de microlotes vacío al comprobar si hay datos no leídos en el flujo de datos de Kinesis antes de que se inicie el lote. El valor predeterminado es"False"
. -
"schema"
: (Obligatorio cuando inferSchema se establece en false) Esquema que se va a utilizar para procesar la carga. Si la clasificación esavro
, el esquema proporcionado debe estar en el formato de esquema Avro. Si la clasificación no esavro
, el esquema proporcionado debe estar en el formato de esquema DDL.A continuación, se muestran algunos ejemplos de esquemas.
-
"inferSchema"
: (opcional) El valor predeterminado es “false”. Si se establece en “true”, el esquema se detectará durante el tiempo de ejecución desde la carga dentro deforeachbatch
. -
"avroSchema"
: (obsoleto) Parámetro utilizado para especificar un esquema de datos Avro cuando se utiliza el formato Avro. Este parámetro se ha quedado obsoleto. Utilice el parámetroschema
. -
"addRecordTimestamp"
: (opcional) cuando esta opción se establece en “true”, la salida de datos contendrá una columna adicional denominada “__src_timestamp” que indica la hora en la que el flujo recibió el registro correspondiente. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue. -
"emitConsumerLagMetrics"
: (opcional) cuando esta opción se establece en «verdadera», para cada lote, emitirá las métricas correspondientes al período comprendido entre el registro más antiguo recibido por el flujo y el momento en que llegue a en cada lote, emitirá las métricas correspondientes al período comprendido entre el registro más antiguo recibido por el flujo y el momento en que llegueAWS Glue a en cada lote CloudWatch. El nombre de la métrica es «glue.driver.driver.st.st.st.st.st.st. maxConsumerLagInMs». El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.
"connectionType": "mongodb"
Designa una conexión a MongoDB. Las opciones de conexión son distintas entre una conexión de origen y una conexión de receptor.
"connectionType": "mongodb" como origen
Utilice las siguientes opciones de conexión con "connectionType": "mongodb"
como origen:
-
"uri"
: (obligatorio) el host de MongoDB del que se va a leer, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de MongoDB de la que se va a leer. Esta opción también se puede transferir aadditional_options
al llamar aglue_context.create_dynamic_frame_from_catalog
en su script de trabajo. -
"collection"
: (obligatorio) la colección de MongoDB de la que se va a leer. Esta opción también se puede transferir aadditional_options
al llamar aglue_context.create_dynamic_frame_from_catalog
en su script de trabajo. -
"username"
: (obligatorio) el nombre de usuario de MongoDB. -
"password"
: (obligatorio) la contraseña de MongoDB. -
"ssl"
: (opcional) si estrue
, inicia una conexión SSL. El valor predeterminado esfalse
. -
"ssl.domain_match"
: (opcional) si estrue
yssl
estrue
, se realiza la comprobación de coincidencia de dominio. El valor predeterminado estrue
. -
"batchSize"
: (opcional): el número de documentos que se deben devolver por lote, que se utilizan dentro del cursor de lotes internos. -
"partitioner"
: (opcional): el nombre de la clase del particionador para leer los datos de entrada de MongoDB. El conector proporciona los siguientes particionadores:-
MongoDefaultPartitioner
(predeterminado) -
MongoSamplePartitioner
(Requiere MongoDB 3.2 o posterior) -
MongoShardedPartitioner
-
MongoSplitVectorPartitioner
-
MongoPaginateByCountPartitioner
-
MongoPaginateBySizePartitioner
-
-
"partitionerOptions"
( opcional): opciones para el particionador designado. Se admiten las siguientes opciones para cada particionador:-
MongoSamplePartitioner
:partitionKey
,partitionSizeMB
,samplesPerPartition
-
MongoShardedPartitioner
:shardkey
-
MongoSplitVectorPartitioner
:partitionKey
,partitionSizeMB
-
MongoPaginateByCountPartitioner
:partitionKey
,numberOfPartitions
-
MongoPaginateBySizePartitioner
:partitionKey
,partitionSizeMB
Para obtener más información acerca de estas opciones, consulte Partitioner Configuration
en la documentación de MongoDB. Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión. -
"connectionType": "mongodb" como receptor
Utilice las siguientes opciones de conexión con "connectionType": "mongodb"
como receptor:
-
"uri"
: (obligatorio) el host de MongoDB en el que se va a escribir, con formatomongodb://<host>:<port>
. -
"database"
: (obligatorio) la base de datos de MongoDB en la que se va a escribir. -
"collection"
: (obligatorio) la colección de MongoDB en la que se va a escribir. -
"username"
: (obligatorio) el nombre de usuario de MongoDB. -
"password"
: (obligatorio) la contraseña de MongoDB. -
"ssl"
: (opcional) si estrue
, inicia una conexión SSL. El valor predeterminado esfalse
. -
"ssl.domain_match"
: (opcional) si estrue
yssl
estrue
, se realiza la comprobación de coincidencia de dominio. El valor predeterminado estrue
. -
"extendedBsonTypes"
: (opcional) si estrue
, habilitan los tipos de BSON ampliados al escribir datos en MongoDB. El valor predeterminado estrue
. -
"replaceDocument"
: (opcional) si estrue
, reemplaza todo el documento al guardar conjuntos de datos que contienen un campo_id
. Si esfalse
, solo se actualizan los campos del documento que coinciden con los campos del conjunto de datos. El valor predeterminado estrue
. -
"maxBatchSize"
: (opcional): el tamaño máximo del lote para operaciones en bloque al guardar datos. El valor predeterminado es 512.
Para ver el código de muestra, consulte Ejemplos: configuración de tipos y opciones de conexión.
"connectionType": "Orc"
Designa una conexión a archivos almacenados en Amazon S3 en formato de archivo Apache Hive Optimized Row Columnar (ORC)
Utilice las siguientes opciones de conexión con "connectionType": "orc"
:
-
paths
: (obligatorio) una lista de las rutas de Amazon S3 desde las que se va a leer. -
(Otros pares de nombre/valor de la opción): cualquier opción adicional, incluidas las opciones de formato, se pasa directamente a SparkSQL
DataSource
. Para obtener más información, consulte Amazon Redshift data source for Spark(Origen de datos de Amazon Redshift para Spark).
"connectionType": "parquet"
Designa una conexión a archivos almacenados en Amazon S3 con el formato de archivo Apache Parquet
Utilice las siguientes opciones de conexión con "connectionType": "parquet"
:
-
paths
: (obligatorio) una lista de las rutas de Amazon S3 desde las que se va a leer. -
(Otros pares de nombre/valor de la opción): cualquier opción adicional, incluidas las opciones de formato, se pasa directamente a SparkSQL
DataSource
. Para obtener más información, consulte la fuente de datos de Amazon Redshift para Sparken el GitHub sitio web.
"connectionType": "s3"
Designa una conexión a Amazon S3.
Utilice las siguientes opciones de conexión con "connectionType": "s3"
:
-
"paths"
: (obligatorio) una lista de las rutas de Amazon S3 desde las que se va a leer. -
"exclusions"
: (opcional) cadena que contiene una lista JSON de patrones glob de estilo Unix para excluir. Por ejemplo,"[\"**.pdf\"]"
excluye todos los archivos PDF. Para obtener más información acerca de la sintaxis glob que admite AWS Glue, consulte Incluir y excluir patrones. -
"compressionType"
: o "compression
": (opcional) especifica la forma en que los datos se comprimen. Use"compressionType"
para orígenes de Amazon S3 y"compression"
para destinos de Amazon S3. Po lo general no es necesario si los datos tienen una extensión de archivo estándar. Los posibles valores son"gzip"
y"bzip2"
). -
"groupFiles"
: (opcional) la agrupación de archivos se habilita de forma predeterminada cuando la entrada contiene más de 50 000 archivos. Para habilitar las agrupaciones con menos de 50 000 archivos, establezca este parámetro en"inPartition"
. Para deshabilitar las agrupaciones con más de 50 000 archivos, establezca este parámetro en"none"
. -
"groupSize"
: (opcional) tamaño del grupo de destino en bytes. El valor predeterminado se calcula en función del tamaño de los datos de entrada y el tamaño de su clúster. Cuando hay menos de 50 000 archivos de entrada,"groupFiles"
debe establecerse en"inPartition"
para que este valor surta efecto. -
"recurse"
: (opcional) si se establece en verdadero, lee recursivamente archivos en todos los subdirectorios de las rutas especificadas. -
"maxBand"
: (opcional, avanzada) esta opción controla la duración en milisegundos después de la que es probable que el listado des3
sea coherente. Se realiza un seguimiento de los archivos cuyas marcas de tiempo de modificación estén comprendidas en los últimos milisegundos demaxBand
, en especial cuando se utilizanJobBookmarks
para obtener coherencia eventual de Amazon S3. La mayoría de los usuarios no tienen que establecer esta opción. El valor predeterminado es 900 000 milisegundos, o 15 minutos. -
"maxFilesInBand"
: (opcional, avanzada) esta opción especifica el número máximo de archivos que deben guardarse desde los últimos segundos demaxBand
. Si se supera este número, los archivos adicionales se omiten y solo se procesarán en la siguiente ejecución del flujo de trabajo. La mayoría de los usuarios no tienen que establecer esta opción. -
"isFailFast"
: (opcional) esta opción determina si un trabajo de ETL de AWS Glue arroja excepciones de análisis del lector. Si se establece entrue
, los trabajos fallan rápidamente si cuatro reintentos de la tarea Spark no pueden analizar los datos en forma correcta.
Valores connectionType de JDBC
Entre los valores de ConnectionType de JDBC, se incluyen los siguientes:
-
"connectionType": "sqlserver"
: designa una conexión a una base de datos Microsoft SQL Server. -
"connectionType": "mysql"
: designa una conexión a una base de datos MySQL. -
"connectionType": "oracle"
: designa una conexión a una base de datos Oracle. -
"connectionType": "postgresql"
: designa una conexión a una base de datos PostgreSQL. -
"connectionType": "redshift"
: designa una conexión a una base de datos de Amazon Redshift. Para obtener más información, consulte Conexiones Redshift.
En la siguiente tabla, se muestran las versiones del controlador JDBC que AWS Glue admite.
Producto | Versiones del controlador JDBC para Glue 4.0 | Versiones del controlador JDBC para Glue 3.0 | Versiones del controlador JDBC para Glue 0.9, 1.0, 2.0 |
---|---|---|---|
Microsoft SQL Server | 9.4.0 | 7.x | 6.x |
MySQL | 8.0.23 | 8.0.23 | 5.1 |
Oracle Database | 21,7 | 21.1 | 11.2 |
PostgreSQL | 42.3.6 | 42.2.18 | 42.1.x |
MongoDB | 4.7.2 | 4.0.0 | 2.0.0 |
Amazon Redshift | redshift-jdbc42-2.1.0.9 | redshift-jdbc41-1.2.12.1017 | redshift-jdbc41-1.2.12.1017 |
Si ya tiene definida una conexión JDBC, puede reutilizar las propiedades de configuración definidas en ella, como: url, usuario y contraseña; de este modo, no tendrá que especificarlas en el código como opciones de conexión. Para ello, utilice las siguientes propiedades de conexión:
-
"useConnectionProperties"
: configúrelo en “true” para indicar que desea utilizar la configuración desde una conexión. -
"connectionName"
: ingrese el nombre de la conexión desde la que recuperará la configuración. La conexión debe definirse en la misma región que el trabajo.
Utilice estas opciones con las conexiones JDBC:
-
"url"
: (obligatorio) la URL de JDBC de la base de datos. -
"dbtable"
: la tabla de base de datos donde se efectúa la lectura. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifiqueschema.table-name
. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado. -
"user"
: (obligatorio) nombre de usuario que se va a utilizar para establecer la conexión. -
"password"
: (obligatorio) contraseña que se utilizará para establecer la conexión. -
(Opcional) las siguientes opciones le permiten proporcionar un controlador JDBC personalizado. Utilice estas opciones si debe utilizar un controlador que AWS Glue no admita de forma nativa.
Los trabajos de ETL pueden utilizar diferentes versiones de controladores JDBC para el origen y el destino de los datos, incluso aunque el origen y el destino sean el mismo producto de base de datos. Esto le permite migrar datos entre bases de datos de origen y destino con diferentes versiones. Para usar estas opciones, primero debe subir el archivo JAR del controlador JDBC a Amazon S3.
-
"customJdbcDriverS3Path"
: ruta de Amazon S3 del controlador JDBC personalizado. -
"customJdbcDriverClassName"
: nombre de clase del controlador JDBC.
-
-
"bulksize"
: (opcional) se utiliza para configurar inserciones paralelas para acelerar las cargas masivas en los objetivos de JDBC. Especifique un valor entero para el grado de paralelismo que se utilizará al escribir o insertar datos. Esta opción resulta útil para mejorar el rendimiento de las escrituras en bases de datos como el repositorio de usuarios de Arch (AUR). -
"sampleQuery"
: la instrucción de consulta SQL personalizada para muestreo (opcional). De forma predeterminada, la consulta de ejemplo la ejecuta un ejecutor único. Si está leyendo un conjunto de datos grande, es posible que tenga que habilitar las particiones de JDBC para consultar una tabla en paralelo. Para obtener más información, consulte Lectura desde tablas de JDBC en paralelo. Para utilizarsampleQuery
con la partición JDBC, también configureenablePartitioningForSampleQuery
como true. -
"enablePartitioningForSampleQuery"
: (opcional) de forma predeterminada, esta opción es false (falso). Obligatorio si quiere usarsampleQuery
con una tabla JDBC particionada. Si se establece en true,sampleQuery
debe terminar con “where” (dónde) o “and” (y) para que AWS Glue agregue condiciones de partición. Consulte el ejemplo que se indica a continuación. -
"sampleSize"
: (opcional) limita el número de filas que devuelve la consulta de ejemplo. Funciona solo cuandoenablePartitioningForSampleQuery
es true. Si la partición no está habilitada, debe añadir directamente “limit x” (límite x) en elsampleQuery
para limitar el tamaño.ejemplo Uso de sampleQuery sin particionar
En el siguiente ejemplo de código, se muestra cómo utilizar
sampleQuery
sin particionar.//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()
ejemplo Uso de sampleQuery con particiones JDBC
En el siguiente ejemplo de código, se muestra cómo utilizar
sampleQuery
con partición JDBC.//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()
En el caso del tipo de conexión de Amazon Redshift, todos los demás pares nombre/valor que estén presentes en las opciones de una conexión JDBC, incluidas las opciones de formato, se pasan directamente al SparkSQL subyacente DataSource. Para obtener más información, consulte Amazon Redshift data source for Spark
En los ejemplos de código siguientes, se muestra cómo se utilizan controladores JDBC personalizados para leer bases de datos JDBC y escribir en ellas. En ellos, se muestra cómo se lee una versión de un producto de base de datos y se escribe en una versión posterior del mismo producto.
nota
Los trabajos de AWS Glue solo se asocian a una subred durante una ejecución. Esto puede afectar a su capacidad de conectarse a diversos orígenes de datos a través del mismo trabajo. Este comportamiento no se limita a los orígenes de JDBC.
Valores de conexiones de tipo personalizada y AWS Marketplace
Esto incluye lo siguiente:
-
"connectionType": "marketplace.athena"
: designa una conexión a un almacén de datos de Amazon Athena. La conexión utiliza un conector de AWS Marketplace. -
"connectionType": "marketplace.spark"
: designa una conexión a un almacén de datos de Apache Spark. La conexión utiliza un conector de AWS Marketplace. -
"connectionType": "marketplace.jdbc"
: designa una conexión a un almacén de datos de JDBC. La conexión utiliza un conector de AWS Marketplace. -
"connectionType": "custom.athena"
: designa una conexión a un almacén de datos de Amazon Athena. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio. -
"connectionType": "custom.spark"
: designa una conexión a un almacén de datos de Apache Spark. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio. -
"connectionType": "custom.jdbc"
: designa una conexión a un almacén de datos de JDBC. La conexión utiliza un conector personalizado que se carga a AWS Glue Studio.
Opciones de conexión para el tipo custom.jdbc o marketplace.jdbc
-
className
: cadena, obligatoria, nombre de clase de controlador. -
connectionName
: cadena, obligatoria, nombre de la conexión asociada al conector. -
url
: cadena, obligatoria, URL JDBC con marcadores de posición (${}
) que se utilizan para construir la conexión al origen de datos. El marcador de posición${secretKey}
se reemplaza con el secreto del mismo nombre en AWS Secrets Manager. Consulte la documentación del almacén de datos para obtener más información sobre la construcción de la URL. -
secretId
ouser/password
: cadena, obligatoria, utilizada para recuperar credenciales de la URL. -
dbTable
oquery
: cadena, obligatoria, la tabla o consulta SQL de la que se obtienen los datos. Puede especificardbTable
oquery
, pero no ambos. -
partitionColumn
: cadena, opcional, el nombre de una columna entera que se utiliza para particionar. Esta opción solo funciona cuando está incluida conlowerBound
,upperBound
ynumPartitions
. Esta opción funciona de la misma manera que en el lector JDBC de Spark SQL. Para obtener más información, consulte JDBC a otras bases de datosen la Guía de Apache Spark SQL DataFrames y conjuntos de datos. Los valores
lowerBound
yupperBound
se utilizan para decidir el intervalo de partición, no para filtrar las filas de la tabla. Todas las filas de la tabla se particionan y se devuelven.nota
Cuando se utiliza una consulta en lugar de un nombre de tabla, debe validar que la consulta funciona con la condición de partición especificada. Por ejemplo:
-
Si el formato de consulta es
"SELECT col1 FROM table1"
, pruebe la consulta al agregar una cláusulaWHERE
al final de la consulta que utiliza la columna de partición. -
Si su formato de consulta es “
SELECT col1 FROM table1 WHERE col2=val"
, pruebe la consulta al ampliar la cláusulaWHERE
conAND
y una expresión que utiliza la columna de partición.
-
-
lowerBound
: entero, opcional, el valor mínimo departitionColumn
que se utiliza para decidir el intervalo de partición. -
upperBound
: entero, opcional, el valor máximo departitionColumn
que se utiliza para decidir el intervalo de partición. -
numPartitions
: entero, opcional, el número de particiones. Este valor, junto conlowerBound
(inclusive) yupperBound
(exclusivo), forma intervalos de partición para expresiones de la cláusulaWHERE
generadas, que se utilizan para dividir lapartitionColumn
.importante
Preste atención al número de particiones, ya que demasiadas particiones pueden causar problemas en los sistemas de base de datos externos.
-
filterPredicate
: cadena, opcional, condición adicional para filtrar datos desde el origen. Por ejemplo:BillingCity='Mountain View'
Cuando se utiliza una consulta en lugar de una tabla, debe validar que la consulta funciona con el
filterPredicate
especificado. Por ejemplo:-
Si el formato de consulta es
"SELECT col1 FROM table1"
, pruebe la consulta al agregar una cláusulaWHERE
al final de la consulta que utiliza el predicado de filtrado. -
Si su formato de consulta es
"SELECT col1 FROM table1 WHERE col2=val"
, pruebe la consulta al ampliar la cláusulaWHERE
conAND
y una expresión que utiliza el predicado de filtrado.
-
-
dataTypeMapping
: diccionario, opcional, mapeo de tipos de datos personalizado, que crea un mapeo a partir de un tipo de datos JDBC a un tipo de datos de Glue. Por ejemplo, la opción"dataTypeMapping":{"FLOAT":"STRING"}
asigna campos de datos de tipoFLOAT
de JDBC al tipoString
de Java al invocar el métodoResultSet.getString()
del controlador y lo utiliza para crear registros de AWS Glue. Cada controlador implementa el objetoResultSet
, por lo que el comportamiento es específico del controlador que se utiliza. Consulte la documentación del controlador JDBC para comprender cómo el controlador realiza las conversiones. -
Los tipos de datos de AWS Glue que se admiten actualmente son:
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
Los tipos de datos JDBC soportados son Java8 java.sql.types
. Las asignaciones de tipos de datos predeterminados (de JDBC a AWS Glue) son:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
Si utiliza un mapeo de tipos de datos personalizada con la opción
dataTypeMapping
, puede anular el mapeo de tipos de datos predeterminado. Sólo los tipos de datos JDBC enumerados en la opcióndataTypeMapping
se ven afectados; el mapeo predeterminado se utiliza para todos los demás tipos de datos JDBC. Puede agregar mapeos para tipos de datos JDBC adicionales si es necesario. Si un tipo de datos JDBC no está incluido en la asignación predeterminada o en una asignación personalizada, el tipo de datos se convierte al tipo de datosSTRING
de AWS Glue de forma predeterminada. -
En los ejemplos de código Python siguientes, se muestra cómo leer desde bases de datos JDBC con controladores JDBC AWS Marketplace. Demuestra la lectura desde una base de datos y la escritura en una ubicación S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opciones de conexión para el tipo custom.athena o marketplace.athena
-
className
: cadena, obligatoria, nombre de clase de controlador. Cuando se utiliza elCloudWatch conector Athena-, este valor de parámetro es el prefijo del prefijo del nombre de clase (por ejemplo,"com.amazonaws.athena.connectors"
). ElCloudWatch conector Athena se compone de dos clases: un controlador de metadatos y un controlador de registros. Si proporciona el prefijo común aquí, la API carga las clases correctas basadas en ese prefijo. -
tableName
— cadena, obligatoria, el nombre del flujo de CloudWatch registro que se va a leer. Este fragmento de código usa el nombre de vista especialall_log_streams
, lo que significa que el marco de datos dinámico devuelto contendrá datos de todos los flujos de registro incluidos en el grupo de registros. -
schemaName
— cadena, obligatoria, el nombre del grupo de CloudWatch registro que se va a leer. Por ejemplo,/aws-glue/jobs/output
. -
connectionName
: cadena, obligatoria, nombre de la conexión asociada al conector.
Para obtener más opciones para este conector, consulte el archivo README de Amazon Athena CloudWatch Connector
En el siguiente ejemplo de código de Python, se muestra cómo se lee desde un almacén de datos de Athena mediante un conector AWS Marketplace. Demuestra la lectura de Athena y la escritura en una ubicación S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opciones de conexión para el tipo custom.spark o marketplace.spark
-
className
: cadena, obligatoria, nombre de clase de conector. -
secretId
: cadena, opcional, se utiliza para recuperar las credenciales de la conexión del conector. -
connectionName
: cadena, obligatoria, nombre de la conexión asociada al conector. -
Otras opciones dependen del almacén de datos. Por ejemplo, las opciones OpenSearch de configuración de configuración de comienzan con el prefijo
es
, tal como se describe en la documentación Elasticicsearch para Apache Hadoop. Las conexiones de Spark con Snowflake utilizan opciones tales como sfUser
ysfPassword
, como se describe en Uso del conector de Sparken la guía Conexión a Snowflake.
En el siguiente ejemplo de código de Python, se muestra cómo se lee desde un almacén de OpenSearch datos de un almacén de datos mediante unamarketplace.spark
conexión.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()