Clase GlueContext
Envuelve el objeto SparkContext
__init__
__init__(sparkContext)
sparkContext
: el contexto de Apache Spark que se va a utilizar.
Creación
getSource
getSource(connection_type, transformation_ctx = "", **options)
Crea un objeto DataSource
que se puede utilizar para leer DynamicFrames
desde fuentes externas.
connection_type
: el tipo de conexión que se va a utilizar, como Amazon Simple Storage Service (Amazon S3), Amazon Redshift y JDBC. Los valores válidos sons3
,mysql
,postgresql
,redshift
,sqlserver
,oracle
ydynamodb
.transformation_ctx
: contexto de transformación que se va a utilizar (opcional).options
: conjunto de pares nombre-valor opcionales. Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.
A continuación, se muestra un ejemplo de cómo se utiliza getSource
.
>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()
create_dynamic_frame_from_rdd
create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")
Muestra un DynamicFrame
que se crea a partir de un conjunto de datos distribuido resistente (RDD) de Apache Spark.
data
: el origen de datos que se va a utilizar.name
: el nombre de los datos que se van a utilizar.schema
: el esquema que se va a utilizar (opcional).sample_ratio
: el ratio de muestra que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).
create_dynamic_frame_from_catalog
create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)
Muestra un DynamicFrame
que se crea mediante una base de datos del Catálogo de datos y un nombre de tabla. Al utilizar este método, proporciona format_options
a través de las propiedades de la tabla en el AWS Glue Data Catalog especificada y otras opciones a través del argumento additional_options
.
Database
: la base de datos de lectura.table_name
: nombre de la tabla de lectura.redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).push_down_predicate
: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para conocer los orígenes compatibles y las limitaciones, consulte Optimizing reads with pushdown in AWS Glue ETL. Para obtener más información, consulte Filtrado previo con predicados de inserción.additional_options
: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark, excepto porendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
ydelimiter
. Otra opción soportada escatalogPartitionPredicate
:catalogPartitionPredicate
: puede transferir una expresión de catálogo para filtrar en función de las columnas de índice. Esto inserta el filtrado hacia el lado del servidor. Para obtener más información, consulte Índices de partición de AWS Glue. Tenga en cuenta quepush_down_predicate
ycatalogPartitionPredicate
utilizan sintaxis diferentes. El primero utiliza la sintaxis estándar de Spark SQL y el segundo utiliza el analizador JSQL.catalog_id
: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.
create_dynamic_frame_from_options
create_dynamic_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
Muestra un DynamicFrame
que se crea con la conexión y el formato especificados.
connection_type
: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos sons3
,mysql
,postgresql
,redshift
,sqlserver
,oracle
ydynamodb
.connection_options
: las opciones de conexión, como las rutas y la tabla de bases de datos (opcional). Para unconnection_type
des3
, se define una lista de rutas de Amazon S3.connection_options = {"paths": ["
s3://aws-glue-target/temp
"]}Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.
aviso
No se recomienda almacenar las contraseñas en el script. Considere utilizar
boto3
para recuperarlas de AWS Secrets Manager o del catálogo de datos de Glue AWS.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}La propiedad
dbtable
es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifiqueschema.table-name
. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.
format
: una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.format_options
: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.transformation_ctx
: contexto de transformación que se va a utilizar (opcional).push_down_predicate
: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para conocer los orígenes compatibles y las limitaciones, consulte Optimizing reads with pushdown in AWS Glue ETL. Para obtener más información, consulte Filtrado previo con predicados de inserción.
create_sample_dynamic_frame_from_catalog
create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)
Muestra un DynamicFrame
de ejemplo que se crea mediante una base de datos del Catálogo de datos y un nombre de tabla. La DynamicFrame
solo contiene los primeros registros de num
de un origen de datos.
-
database
: la base de datos de lectura. -
table_name
: nombre de la tabla de lectura. -
num
: número máximo de registros del marco dinámico de muestra arrojado. redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).-
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). push_down_predicate
: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.-
additional_options
: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark, excepto porendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
ydelimiter
. -
sample_options
: parámetros para controlar el comportamiento del muestreo (opcional). Parámetros disponibles actuales para los orígenes de Simple Storage Service (Amazon S3):maxSamplePartitions
: número máximo de particiones que leerá el muestreo. El valor predeterminado es 10maxSampleFilesPerPartition
: número máximo de archivos que leerá el muestreo en una partición. El valor predeterminado es 10.Estos parámetros ayudan a reducir el tiempo que consume el listado de archivos. Por ejemplo, supongamos que el conjunto de datos tiene 1000 particiones y cada partición tiene 10 archivos. Si se configura
maxSamplePartitions
= 10 ymaxSampleFilesPerPartition
= 10, en lugar de enumerar los 10 000 archivos, el muestreo solo mostrará y leerá las 10 primeras particiones con los 10 primeros archivos de cada uno: 10 * 10 = 100 archivos en total.
-
catalog_id
: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece enNone
.None
es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada.
create_sample_dynamic_frame_from_options
create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")
Arroja un DynamicFrame
de ejemplo que se crea con la conexión y el formato especificados. La DynamicFrame
solo contiene los primeros registros de num
de un origen de datos.
connection_type
: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos sons3
,mysql
,postgresql
,redshift
,sqlserver
,oracle
ydynamodb
.connection_options
: las opciones de conexión, como las rutas y la tabla de bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.-
num
: número máximo de registros del marco dinámico de muestra arrojado. -
sample_options
: parámetros para controlar el comportamiento del muestreo (opcional). Parámetros disponibles actuales para los orígenes de Simple Storage Service (Amazon S3):maxSamplePartitions
: número máximo de particiones que leerá el muestreo. El valor predeterminado es 10maxSampleFilesPerPartition
: número máximo de archivos que leerá el muestreo en una partición. El valor predeterminado es 10.Estos parámetros ayudan a reducir el tiempo que consume el listado de archivos. Por ejemplo, supongamos que el conjunto de datos tiene 1000 particiones y cada partición tiene 10 archivos. Si se configura
maxSamplePartitions
= 10 ymaxSampleFilesPerPartition
= 10, en lugar de enumerar los 10 000 archivos, el muestreo solo mostrará y leerá las 10 primeras particiones con los 10 primeros archivos de cada uno: 10 * 10 = 100 archivos en total.
format
: una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.format_options
: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.-
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). push_down_predicate
: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.
add_ingestion_time_columns
add_ingestion_time_columns(dataFrame, timeGranularity = "")
Agrega columnas de tiempo de ingesta como ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
al DataFrame
de entrada. Esta función se genera en forma automática en el script generado por AWS Glue cuando especifique una tabla del Catálogo de datos con Amazon S3 como destino. Esta función actualiza en forma automática la partición con columnas de tiempo de ingesta en la tabla de salida. Esto permite que los datos de salida se dividan automáticamente en el tiempo de ingesta sin requerir columnas de tiempo de ingesta explícitas en los datos de entrada.
-
dataFrame
: eldataFrame
al que anexar las columnas de tiempo de ingesta. -
timeGranularity
: la granularidad de las columnas de tiempo. Los valores válidos son “day
”, “hour
” y “minute
”. Por ejemplo, si “hour
” se transfiere a la función, eldataFrame
original tendrá las columnas de tiempo “ingest_year
,” “ingest_month
,” “ingest_day
” y “ingest_hour
” anexadas.
Devuelve el marco de datos después de anexar las columnas de granularidad de tiempo.
Ejemplo:
dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))
create_data_frame_from_catalog
create_data_frame_from_catalog(database, table_name, transformation_ctx = "",
additional_options = {})
Devuelve un DataFrame
que se crea con información de una tabla del Catálogo de datos.
-
database
: la base de datos del Catálogo de datos de la que se va a leer. -
table_name
: el nombre de la tabla del Catálogo de datos de la que se va a leer. -
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). -
additional_options
: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark para orígenes de streaming, comostartingPosition
,maxFetchTimeInMs
ystartingOffsets
.-
useSparkDataSource
: cuando se establece en true (verdadero), obliga a AWS Glue a usar la API nativa de orígenes de datos de Spark para leer la tabla. La API de orígenes de datos de Spark admite los siguientes formatos: AVRO, binario, CSV, JSON, ORC, Parquet y texto. En una tabla del Catálogo de datos, especifique el formato mediante la propiedadclassification
. Para obtener más información sobre la API de orígenes de datos de Spark, consulte la documentación oficial de Apache Spark. El uso de
create_data_frame_from_catalog
conuseSparkDataSource
presenta las siguientes ventajas:-
Devuelve directamente un
DataFrame
y ofrece una alternativa acreate_dynamic_frame.from_catalog().toDF()
. -
Admite el control de permisos de tabla de AWS Lake Formation para formatos nativos.
-
Admite la lectura de formatos de lagos de datos sin el control de permisos de tabla de AWS Lake Formation. Para obtener más información, consulte Uso de marcos de lagos de datos con trabajos de ETL de AWS Glue.
Cuando habilita
useSparkDataSource
, también puede agregar cualquiera de las opciones de orígenes de datos de Sparken additional_options
según sea necesario. AWS Glue pasa estas opciones directamente al lector de Spark. -
-
useCatalogSchema
: cuando se establece en true (verdadero), AWS Glue aplica el esquema del Catálogo de datos alDataFrame
resultante. De lo contrario, el lector deduce el esquema a partir de los datos. Cuando se habilitauseCatalogSchema
, también se debe estableceruseSparkDataSource
en true (verdadero).
-
Limitaciones
Tenga en cuenta las siguientes limitaciones cuando utilice la opción useSparkDataSource
:
-
Cuando usa
useSparkDataSource
, AWS Glue crea un nuevoDataFrame
en una sesión independiente de Spark que es diferente de la sesión original de Spark. -
El filtrado de particiones de DataFrame de Spark no funciona con las siguientes características de AWS Glue.
Para utilizar el filtrado de particiones con estas características, puede utilizar el predicado de inserción de AWS Glue. Para obtener más información, consulte Filtrado previo con predicados de inserción. El filtrado de columnas no particionadas no se ve afectado.
En el siguiente script de ejemplo, se muestra la forma incorrecta de filtrar particiones con la opción
excludeStorageClasses
.// Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")
En el siguiente script de ejemplo, se muestra la forma correcta de utilizar un predicado de inserción para filtrar particiones con la opción
excludeStorageClasses
.// Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")
Ejemplo: crear una tabla CSV con el lector de orígenes de datos de Spark
// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=
<database_name>
, table_name=<table_name>
, additional_options = {"useSparkDataSource": True, "sep": '\t'} )
create_data_frame_from_options
create_data_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
Esta API ha quedado obsoleta. En su lugar, use la API getSource()
. Muestra un DataFrame
que se crea con la conexión y el formato especificados. Utilice esta función solo con orígenes de streaming de AWS Glue.
-
connection_type
: el tipo de conexión de streaming. Los valores válidos sonkinesis
ykafka
. -
connection_options
: opciones de conexión, que son diferentes para Kinesis y Kafka. Puede encontrar la lista de todas las opciones de conexión para cada origen de datos de streaming en Tipos de conexión y opciones para ETL en AWS Glue para Spark. Tenga en cuenta las siguientes diferencias en las opciones de conexión de streaming:-
Los orígenes de streaming de Kinesis requieren
streamARN
,startingPosition
,inferSchema
yclassification
. -
Los orígenes de streaming de Kafka requieren
connectionName
,topicName
,startingOffsets
,inferSchema
yclassification
.
-
-
format
: una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Para obtener información acerca de los formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark. -
format_options
: opciones del formato especificado. Para obtener información acerca de las opciones de formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark. -
transformation_ctx
: contexto de transformación que se va a utilizar (opcional).
Ejemplo de origen de streaming de Amazon Kinesis:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Ejemplo de origen de streaming de Kafka:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
forEachBatch
forEachBatch(frame, batch_function, options)
Se aplica la batch_function
transferida a cada microlote que se lee desde el origen de streaming.
-
frame
: el DataFrame que contiene el microlote actual. -
batch_function
: una función que se aplicará para cada microlote. -
options
: una recopilación de pares clave-valor que contiene información sobre cómo procesar microlotes. Se requieren las siguientes opciones:-
windowSize
: cantidad de tiempo que se debe dedicar al procesamiento de cada lote. -
checkpointLocation
: la ubicación donde se almacenan los puntos de verificación para el trabajo de ETL de streaming. -
batchMaxRetries
: número máximo de reintentos permitidos para este lote si se genera un error. El valor predeterminado es 3. Esta opción sólo se puede configurar para Glue versión 2.0 y superior.
-
Ejemplo:
glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )
Trabajar con conjuntos de datos en Amazon S3
purge_table
purge_table(catalog_id=None, database="", table_name="", options={},
transformation_ctx="")
Elimina archivos de Amazon S3 correspondientes a la base de datos y la tabla del catálogo especificado. Si se eliminan todos los archivos de una partición, esa partición también se eliminará del catálogo.
Si desea poder recuperar los objetos eliminados, puede habilitar control de versiones de objetos en el bucket de Amazon S3. Cuando se elimina un objeto de un bucket que no tiene habilitado el control de versiones de objetos, el objeto no se puede recuperar. Para obtener más información acerca de cómo recuperar objetos eliminados en un bucket habilitado para versiones, consulte ¿Cómo puedo recuperar un objeto de Amazon S3 eliminado?
-
catalog_id
: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece enNone
.None
es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada. database
: la base de datos que se va a utilizar.table_name
: nombre de la tabla que se utilizará.options
: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.retentionPeriod
: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).partitionPredicate
: se eliminan las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no se eliminan. Configurar en""
, valor vacío de forma predeterminada.excludeStorageClasses
: no se eliminan los archivos con clase de almacenamiento configurada enexcludeStorageClasses
. El valor predeterminado esSet()
, un conjunto vacío.manifestFilePath
: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que se depuraron correctamente se registran enSuccess.csv
, mientras que los que no lo hicieron se registran enFailed.csv
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.
glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
purge_s3_path
purge_s3_path(s3_path, options={}, transformation_ctx="")
Elimina archivos de la ruta de Amazon S3 especificada recursivamente.
Si desea poder recuperar los objetos eliminados, puede habilitar control de versiones de objetos en el bucket de Amazon S3. Cuando se elimina un objeto de un bucket que no tiene habilitado el control de versiones de objetos, el objeto no se puede recuperar. Para obtener más información acerca de cómo recuperar objetos eliminados en un bucket habilitado para control de versiones, consulte ¿Cómo puedo recuperar un objeto de Amazon S3 eliminado?
s3_path
: la ruta de acceso en Amazon S3 de los archivos que se van a eliminar en el formatos3://<
bucket
>/<prefix
>/options
: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.retentionPeriod
: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).excludeStorageClasses
: no se eliminan los archivos con clase de almacenamiento configurada enexcludeStorageClasses
. El valor predeterminado esSet()
, un conjunto vacío.manifestFilePath
: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que se depuraron correctamente se registran enSuccess.csv
, mientras que los que no lo hicieron se registran enFailed.csv
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
transition_table
transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)
Inicia la transición de la clase de almacenamiento de los archivos almacenados en Amazon S3 correspondientes a la base de datos y la tabla del catálogo especificado.
Puede realizar la transición entre dos clases de almacenamiento cualquiera. En el caso de las clases de almacenamiento GLACIER
y DEEP_ARCHIVE
, la transición puede hacerse a estas clases. Sin embargo, debería utilizar S3 RESTORE
para realizar la transición de GLACIER
y las clases de almacenamiento DEEP_ARCHIVE
.
Si ejecuta trabajos de ETL de AWS Glue que leen archivos o particiones de Amazon S3, puede excluir algunos tipos de clases de almacenamiento de Amazon S3. Para obtener más información, consulte Exclusión de clases de almacenamiento de Amazon S3.
database
: la base de datos que se va a utilizar.table_name
: nombre de la tabla que se utilizará.transition_to
: la clase de almacenamiento de Amazon S3 hacia la que se hará la transición.options
: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.retentionPeriod
: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).partitionPredicate
: se realiza la transición de las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no realizan la transición. Configurar en""
, valor vacío de forma predeterminada.excludeStorageClasses
: no se realiza la transición de los archivos con clase de almacenamiento enexcludeStorageClasses
. El valor predeterminado esSet()
, un conjunto vacío.manifestFilePath
: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que realizaron la transición correctamente se registran enSuccess.csv
, mientras que los que no lo hicieron se registran enFailed.csv
accountId
: el ID de cuenta de Amazon Web Services para ejecutar la transformación de transición. Es obligatorio para esta transformación.roleArn
: el rol AWS para ejecutar la transformación de transición. Es obligatorio para esta transformación.
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.catalog_id
: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece enNone
.None
es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada.
glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
transition_s3_path
transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")
Realiza transiciones recursivas en la clase de almacenamiento de los archivos de la ruta de Amazon S3 especificada.
Puede realizar la transición entre dos clases de almacenamiento cualquiera. En el caso de las clases de almacenamiento GLACIER
y DEEP_ARCHIVE
, la transición puede hacerse a estas clases. Sin embargo, debería utilizar S3 RESTORE
para realizar la transición de GLACIER
y las clases de almacenamiento DEEP_ARCHIVE
.
Si ejecuta trabajos de ETL de AWS Glue que leen archivos o particiones de Amazon S3, puede excluir algunos tipos de clases de almacenamiento de Amazon S3. Para obtener más información, consulte Exclusión de clases de almacenamiento de Amazon S3.
s3_path
: la ruta en Amazon S3 de los archivos sobre los que se realizará la transición en el formatos3://<
bucket
>/<prefix
>/transition_to
: la clase de almacenamiento de Amazon S3 hacia la que se hará la transición.options
: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.retentionPeriod
: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).partitionPredicate
: se realiza la transición de las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no realizan la transición. Configurar en""
, valor vacío de forma predeterminada.excludeStorageClasses
: no se realiza la transición de los archivos con clase de almacenamiento enexcludeStorageClasses
. El valor predeterminado esSet()
, un conjunto vacío.manifestFilePath
: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que realizaron la transición correctamente se registran enSuccess.csv
, mientras que los que no lo hicieron se registran enFailed.csv
accountId
: el ID de cuenta de Amazon Web Services para ejecutar la transformación de transición. Es obligatorio para esta transformación.roleArn
: el rol AWS para ejecutar la transformación de transición. Es obligatorio para esta transformación.
transformation_ctx
: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.
glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
Extracción
extract_jdbc_conf
extract_jdbc_conf(connection_name, catalog_id = None)
Devuelve dict
con claves con las propiedades de configuración del objeto de conexión de AWS Glue en el Catálogo de datos.
user
: nombre de usuario de la base de datos.password
: contraseña de la base de datos.vendor
: especifica un proveedor (mysql
,postgresql
,oracle
,sqlserver
, etc.).enforceSSL
: una cadena booleana que indica si se requiere una conexión segura.customJDBCCert
: uso de un certificado de cliente específico de la ruta de Amazon S3 indicada.skipCustomJDBCCertValidation
: una cadena booleana que indica sicustomJDBCCert
debe ser validado por una CA.customJDBCCertString
: información adicional sobre el certificado personalizado, específico para el tipo de controlador.url
: URL de JDBC (obsoleta) con solo protocolo, servidor y puerto.fullUrl
: URL de JDBC introducida cuando se creó la conexión (Disponible en AWS Glue versión 3.0 o posterior).
Ejemplo de recuperación de configuraciones de JDBC:
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}
Transacciones
start_transaction
start_transaction(read_only)
Inicie una nueva transacción. Llama de forma interna a la API startTransaction de Lake Formation.
read_only
: (booleano) indica si esta transacción debe ser de solo lectura o de lectura y escritura. Se rechazarán las escrituras realizadas con un ID de transacción de solo lectura. No es necesario confirmar las transacciones de solo lectura.
Devuelve el ID de la transacción.
commit_transaction
commit_transaction(transaction_id, wait_for_commit = True)
Intenta confirmar la transacción especificada. Es posible que se devuelva commit_transaction
antes de que la transacción haya terminado de confirmarse. Llama de forma interna a la API commitTransaction de Lake Formation.
transaction_id
: (cadena) la transacción que se confirmará.wait_for_commit
: (booleano) determina si se devuelvecommit_transaction
de inmediato. El valor predeterminado es true. Si es falso,commit_transaction
realiza un sondeo y espera hasta que la transacción se haya confirmado. La cantidad de tiempo de espera se limita a un minuto mediante retroceso exponencial con un máximo de seis reintentos.
Devuelve un valor booleano para indicar si se realizó o no la confirmación.
cancel_transaction
cancel_transaction(transaction_id)
Intenta cancelar la transacción especificada. Devuelve una excepción de TransactionCommittedException
si la transacción se había confirmado anteriormente. Llama de forma interna a la API CancelTransaction de Lake Formation.
-
transaction_id
: (cadena) la transacción que se cancelará.
Escritura
getSink
getSink(connection_type, format = None, transformation_ctx = "", **options)
Obtiene un objeto DataSink
que se puede utilizar para escribir DynamicFrames
en fuentes externas. Compruebe el valor de format
de SparkSQL en primer lugar para asegurarse de que obtiene el receptor esperado.
connection_type
: tipo de conexión que se va a utilizar, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos sons3
,mysql
,postgresql
,redshift
,sqlserver
,oracle
,kinesis
ykafka
.format
: el formato de SparkSQL que se utilizará (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).options
: conjunto de pares de nombre-valor que se utilizan para especificar las opciones de conexión. Algunos de valores posibles son:-
user
ypassword
: para autorización -
url
: punto de conexión del almacén de datos -
dbtable
: nombre de la tabla de destino -
bulkSize
: grado de paralelismo para operaciones de inserción
-
Las opciones que puede especificar dependen del tipo de conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark para obtener más valores y ejemplos.
Ejemplo:
>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)
write_dynamic_frame_from_options
write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None,
format_options={}, transformation_ctx = "")
Escribe y devuelve un DynamicFrame
mediante la conexión y el formato especificados.
frame
: elDynamicFrame
que se va a escribir.connection_type
: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos sons3
,mysql
,postgresql
,redshift
,sqlserver
,oracle
,kinesis
ykafka
.connection_options
: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para unconnection_type
des3
, se define una ruta de Amazon S3.connection_options = {"path": "
s3://aws-glue-target/temp
"}Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.
aviso
No se recomienda almacenar las contraseñas en el script. Considere utilizar
boto3
para recuperarlas de AWS Secrets Manager o del catálogo de datos de Glue AWS.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}La propiedad
dbtable
es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifiqueschema.table-name
. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.
format
: una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.format_options
: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.transformation_ctx
: contexto de transformación que se va a utilizar (opcional).
write_from_options
write_from_options(frame_or_dfc, connection_type,
connection_options={}, format={}, format_options={}, transformation_ctx = "")
Escribe y devuelve un DynamicFrame
o DynamicFrameCollection
que se crea con la información sobre la conexión y el formato especificada.
frame_or_dfc
: elDynamicFrame
o laDynamicFrameCollection
que se van a escribir.connection_type
: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Entre los valores válidos se incluyen:s3
,mysql
,postgresql
,redshift
,sqlserver
yoracle
.connection_options
: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para unconnection_type
des3
, se define una ruta de Amazon S3.connection_options = {"path": "
s3://aws-glue-target/temp
"}Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.
aviso
No se recomienda almacenar las contraseñas en el script. Considere utilizar
boto3
para recuperarlas de AWS Secrets Manager o del catálogo de datos de Glue AWS.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}La propiedad
dbtable
es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifiqueschema.table-name
. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.
format
: una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.format_options
: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.transformation_ctx
: contexto de transformación que se va a utilizar (opcional).
write_dynamic_frame_from_catalog
write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)
Escribe y devuelve un DynamicFrame
que se crea mediante una base de datos y una tabla del Catálogo de datos.
frame
: elDynamicFrame
que se va a escribir.Database
: la base de datos del Catálogo de datos que contiene la tabla.table_name
: el nombre de la tabla del Catálogo de datos asociada al destino.redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).-
additional_options
: conjunto de pares nombre-valor opcionales. catalog_id
: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.
write_data_frame_from_catalog
write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir,
transformation_ctx = "", additional_options = {}, catalog_id = None)
Escribe y devuelve un DataFrame
que se crea mediante una base de datos y una tabla del Catálogo de datos. Este método admite la escritura en formatos de lagos de datos (Hudi, Iceberg y Delta Lake). Para obtener más información, consulte Uso de marcos de lagos de datos con trabajos de ETL de AWS Glue.
frame
: elDataFrame
que se va a escribir.Database
: la base de datos del Catálogo de datos que contiene la tabla.table_name
: nombre de la tabla del Catálogo de datos que está asociada al destino.redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).-
additional_options
: conjunto de pares nombre-valor opcionales.-
useSparkDataSink
: cuando se establece en true (verdadero), obliga a AWS Glue a usar la API nativa del receptor de datos de Spark para escribir en la tabla. Cuando habilita esta opción, puede agregar cualquier opción de origen de datos de Sparka additional_options
según sea necesario. AWS Glue pasa estas opciones directamente al escritor de Spark.
-
catalog_id
: el ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Si no especifica un valor, se utiliza el ID de cuenta predeterminado del intermediario.
Limitaciones
Tenga en cuenta las siguientes limitaciones cuando utilice la opción useSparkDataSink
:
-
La opción enableUpdateCatalog no se admite cuando se utiliza la opción
useSparkDataSink
.
Ejemplo: escribir una tabla de Hudi con el escritor de orígenes de datos de Spark
hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name':
<table_name>
, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name':<table_name>
, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':<database_name>
, 'hoodie.datasource.hive_sync.table':<table_name>
, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame =<df_product_inserts>
, database =<database_name>
, table_name =<table_name>
, additional_options = hudi_options )
write_dynamic_frame_from_jdbc_conf
write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={},
redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
Escribe y devuelve un DynamicFrame
mediante la información de la conexión JDBC especificada.
frame
: elDynamicFrame
que se va a escribir.catalog_connection
: conexión al catálogo que se va a utilizar.connection_options
: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).catalog_id
: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.
write_from_jdbc_conf
write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
Escribe y devuelve un DynamicFrame
o DynamicFrameCollection
mediante la información de la conexión JDBC especificada.
frame_or_dfc
: elDynamicFrame
o laDynamicFrameCollection
que se van a escribir.catalog_connection
: conexión al catálogo que se va a utilizar.connection_options
: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.redshift_tmp_dir
: directorio provisional de Amazon Redshift que se va a utilizar (opcional).transformation_ctx
: contexto de transformación que se va a utilizar (opcional).catalog_id
: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.