Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
AWS Glue GlueContext APIs de Scala
Paquete: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
es el punto de entrada para leer y escribir un DynamicFrame desde y hacia Amazon Simple Storage Service (Amazon S3), el AWS Glue Data Catalog, JDBC, etc. Esta clase ofrece funciones de utilidades para crear objetos DataSource rasgo y DataSink que, a su vez, se pueden usar para leer y escribir objetos DynamicFrame
.
También se puede utilizar GlueContext
para establecer un número de particiones de destino (el valor predeterminado es 20) en el objeto DynamicFrame
si el número de particiones creadas desde el origen es menor que un umbral mínimo para las particiones (el valor predeterminado es 10).
addIngestionTime
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Agrega columnas de tiempo de ingesta como ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
al DataFrame
de entrada. Esta función se genera en forma automática en el script generado por AWS Glue cuando especifique una tabla del Catálogo de datos con Amazon S3 como destino. Esta función actualiza en forma automática la partición con columnas de tiempo de ingesta en la tabla de salida. Esto permite que los datos de salida se dividan automáticamente en el tiempo de ingesta sin requerir columnas de tiempo de ingesta explícitas en los datos de entrada.
-
dataFrame
: eldataFrame
al que anexar las columnas de tiempo de ingesta. -
timeGranularity
: la granularidad de las columnas de tiempo. Los valores válidos son “day
”, “hour
” y “minute
”. Por ejemplo, si “hour
” se transfiere a la función, eldataFrame
original tendrá las columnas de tiempo “ingest_year
,” “ingest_month
,” “ingest_day
” y “ingest_hour
” anexadas.
Devuelve el marco de datos después de anexar las columnas de granularidad de tiempo.
Ejemplo:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Muestra un DataFrame
que se crea con la conexión y el formato especificados. Utilice esta función sólo con orígenes de streaming de AWS Glue.
connectionType
: el tipo de conexión de streaming. Los valores válidos sonkinesis
ykafka
.-
connectionOptions
: opciones de conexión, que son diferentes para Kinesis y Kafka. Puede encontrar la lista de todas las opciones de conexión para cada origen de datos de streaming en Tipos de conexión y opciones para ETL en AWS Glue. Tenga en cuenta las siguientes diferencias en las opciones de conexión de streaming:-
Los orígenes de streaming de Kinesis requieren
streamARN
,startingPosition
,inferSchema
yclassification
. -
Los orígenes de streaming de Kafka requieren
connectionName
,topicName
,startingOffsets
,inferSchema
yclassification
.
-
transformationContext
: el contexto de transformación que se va a utilizar (opcional).format
: una especificación de formato (opcional). Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Para obtener información acerca de los formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS GlueformatOptions
: opciones de formato para el formato especificado. Para obtener información acerca de las opciones de formatos soportados, consulte Opciones de formato de datos.
Ejemplo de origen de streaming de Amazon Kinesis:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Ejemplo de origen de streaming de Kafka:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Se aplica la batch_function
transferida a cada microlote que se lee desde el origen de streaming.
-
frame
— El DataFrame que contiene el microlote actual. -
batch_function
: una función que se aplicará para cada microlote. -
options
: una recopilación de pares clave-valor que contiene información sobre cómo procesar microlotes. Se requieren las siguientes opciones:-
windowSize
: cantidad de tiempo que se debe dedicar al procesamiento de cada lote. -
checkpointLocation
: la ubicación donde se almacenan los puntos de verificación para el trabajo de ETL de streaming. -
batchMaxRetries
: número máximo de reintentos permitidos para este lote si se genera un error. El valor predeterminado es 3. Esta opción sólo se puede configurar para Glue versión 2.0 y superior.
-
Ejemplo:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Crea un DataSink que escribe en una ubicación especificada en una tabla definida en el Data Catalog.
database
: nombre de la base de datos en el Data Catalog.tableName
: nombre de la tabla en el Data Catalog.redshiftTmpDir
: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.additionalOptions
: opciones adicionales para AWS Glue.catalogId
: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.
Devuelve el DataSink
.
getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Crea un objeto DataSource rasgo que lee datos de una definición de tabla en el Data Catalog.
database
: nombre de la base de datos en el Data Catalog.tableName
: nombre de la tabla en el Data Catalog.redshiftTmpDir
: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.pushDownPredicate
: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.additionalOptions
: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue, excepto porendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
ydelimiter
. Otra opción soportada escatalogPartitionPredicate
:catalogPartitionPredicate
: puede transferir una expresión de catálogo para filtrar en función de las columnas de índice. Esto inserta el filtrado hacia el lado del servidor. Para obtener más información, consulte Índices de partición de AWS Glue. Tenga en cuenta quepush_down_predicate
ycatalogPartitionPredicate
utilizan sintaxis diferentes. El primero utiliza la sintaxis estándar de Spark SQL y el segundo utiliza el analizador JSQL.catalogId
: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.
Devuelve el DataSource
.
Ejemplo de origen de streaming
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Crea un DataSink que escribe en una base de datos JDBC especificada en un objeto Connection
en el Data Catalog. El objeto Connection
tiene información para conectarse a un receptor de JDBC, incluida la URL, el nombre de usuario, la contraseña, la VPC, la subred y los grupos de seguridad.
catalogConnection
: nombre de la conexión en el Data Catalog que contiene la URL de JDBC en la que se va a escribir.options
: cadena de pares de nombre-valor de JSON que proporcionan información adicional que se exige para escribir en un almacén de datos JDBC. Esto incluye:dbtable (obligatorio): nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifique
schema.table-name
. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado. El siguiente ejemplo muestra un parámetro de opciones que apunta a un esquema llamadotest
y a una tabla llamadatest_table
en la base de datostest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (obligatorio): nombre de la base de datos de JDBC.
Todas las opciones adicionales transferidas directamente al escritor SparkSQL de JDBC. Para obtener más información, consulte el artículo acerca del origen de datos Redshift de Spark
.
redshiftTmpDir
: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.catalogId
: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.
Código de ejemplo:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Devuelve el DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Crea una DataSink que escribe datos en un destino como Amazon Simple Storage Service (Amazon S3), JDBC o AWS Glue Data Catalog.
-
connectionType
— Tipo de la conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue. -
connectionOptions
: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer la conexión con el receptor de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue. -
transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.
Devuelve el DataSink
.
getSinkWithFormato def de Decimalo
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Crea un objeto DataSink que escribe datos en un destino como Amazon S3, JDBC o Data Catalog y también establece el formato de los datos que se van a escribir en el destino.
connectionType
— Tipo de la conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue.-
options
: cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer una conexión con el receptor de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue. transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.format
: formato de los datos que se escribirán en el destino.formatOptions
: una cadena de pares de nombre-valor de JSON que proporcionan opciones adicionales para el formato de los datos en el destino. Consulte Opciones de formato de datos.
Devuelve el DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Crea una DataSource rasgo que lee datos de un origen como Amazon S3, JDBC o AWS Glue Data Catalog. También soporta orígenes de datos de streaming de Kafka y Kinesis.
connectionType
: tipo de origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.-
connectionOptions
: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer una conexión con el origen de datos. Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue.Un origen de streaming de Kinesis requiere las siguientes opciones de conexión:
streamARN
,startingPosition
,inferSchema
yclassification
.Un origen de streaming de Kafka requiere las siguientes opciones de conexión:
connectionName
,topicName
,startingOffsets
,inferSchema
yclassification
. transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.pushDownPredicate
: predicado en columnas de partición.
Devuelve el DataSource
.
Ejemplo de origen de streaming de Amazon Kinesis:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Ejemplo de origen de streaming de Kafka:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
getSourceWithFormato def de Decimalo
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Crea una DataSource rasgo que lee datos de un origen como Amazon S3, JDBC o AWS Glue Data Catalog y también establece el formato de los datos almacenados en el origen.
connectionType
: tipo de origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.-
options
: una cadena de pares de nombre-valor de JSON que proporciona información adicional para establecer una conexión con el origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue. transformationContext
: contexto de transformación asociado con el receptor que utilizarán los marcadores de flujo de trabajo. Se establece en un valor vacío de forma predeterminada.format
: formato de los datos que se almacenan en el origen. Cuando elconnectionType
es "s3", también puede especificarformat
. Puede ser "avro", "csv", "grokLog", "ion", "json", "xml", "parquet" u "orco".formatOptions
: una cadena de pares de nombre-valor de JSON que proporciona opciones adicionales para analizar los datos en el origen. Consulte Opciones de formato de datos.
Devuelve el DataSource
.
Ejemplos
Cree un DynamicFrame a partir de un origen de datos que sea un archivo de valores separados por comas (CSV) en Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Cree un DynamicFrame a partir de un origen de datos que es un PostgreSQL mediante una conexión JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Cree un DynamicFrame a partir de un origen de datos que es un MySQL usando una conexión JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
getSparkSession
def getSparkSession : SparkSession
Obtiene el objeto SparkSession
asociado a esta instancia de GlueContext. Utilice este SparkSession objeto para registrar tablas y UDF para utilizarlas con lasDataFrame
creadas desde DynamicFrames.
Devuelve el SparkSession.
Definición de startTransaction (iniciar transacción)
def startTransaction(readOnly: Boolean):String
Inicia una nueva transacción. Llama de forma interna a la API startTransaction de Lake Formation.
readOnly
: (booleano) indica si esta transacción debe ser de solo lectura o de lectura y escritura. Se rechazarán las escrituras realizadas con un ID de transacción de solo lectura. No es necesario confirmar las transacciones de solo lectura.
Devuelve el ID de la transacción.
Definición de commitTransaction (confirmar transacción)
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Intenta confirmar la transacción especificada. Es posible que se devuelva commitTransaction
antes de que la transacción haya terminado de confirmarse. Llama de forma interna a la API commitTransaction de Lake Formation.
transactionId
: (cadena) la transacción que se confirmará.waitForCommit
: (booleano) determina si se devuelvecommitTransaction
de inmediato. El valor predeterminado es true. Si es falso,commitTransaction
realiza un sondeo y espera hasta que la transacción se haya confirmado. La cantidad de tiempo de espera se limita a un minuto mediante retroceso exponencial con un máximo de seis reintentos.
Devuelve un valor booleano para indicar si se realizó o no la confirmación.
Definición de cancelTransaction (cancelar transacción)
def cancelTransaction(transactionId: String): Unit
Intenta cancelar la transacción especificada. Llama de forma interna a la CancelTransactionAPI Lake Formation.
transactionId
: (cadena) la transacción que se cancelará.
Devuelve una excepción de TransactionCommittedException
si la transacción se había confirmado con anterioridad.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Crea un objeto GlueContext
con el objeto SparkContext
especificado, particiones mínimas y particiones de destino.
sc
— LaSparkContext
.minPartitions
: número mínimo de particiones.targetPartitions
: número de particiones de destino.
Devuelve el GlueContext
.
def this
def this( sc : SparkContext )
Crear un objeto GlueContext
con el SparkContext
proporcionado. Establece las particiones mínimas en 10 y las particiones de destino en 20.
sc
— LaSparkContext
.
Devuelve el GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Crear un objeto GlueContext
con el JavaSparkContext
proporcionado. Establece las particiones mínimas en 10 y las particiones de destino en 20.
sparkContext
— LaJavaSparkContext
.
Devuelve el GlueContext
.