AWS Glue GlueContext APIs de Scala - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

AWS Glue GlueContext APIs de Scala

Paquete: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext es el punto de entrada para leer y escribir un DynamicFrame desde y hacia Amazon Simple Storage Service (Amazon S3), el AWS Glue Data Catalog, JDBC, etc. Esta clase ofrece funciones de utilidades para crear objetos DataSource rasgo y DataSink que, a su vez, se pueden usar para leer y escribir objetos DynamicFrame.

También se puede utilizar GlueContext para establecer un número de particiones de destino (el valor predeterminado es 20) en el objeto DynamicFrame si el número de particiones creadas desde el origen es menor que un umbral mínimo para las particiones (el valor predeterminado es 10).

addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

Agrega columnas de tiempo de ingesta como ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute al DataFrame de entrada. Esta función se genera en forma automática en el script generado por AWS Glue cuando especifique una tabla del Catálogo de datos con Amazon S3 como destino. Esta función actualiza en forma automática la partición con columnas de tiempo de ingesta en la tabla de salida. Esto permite que los datos de salida se dividan automáticamente en el tiempo de ingesta sin requerir columnas de tiempo de ingesta explícitas en los datos de entrada.

  • dataFrame: el dataFrame al que anexar las columnas de tiempo de ingesta.

  • timeGranularity: la granularidad de las columnas de tiempo. Los valores válidos son “day”, “hour” y “minute”. Por ejemplo, si “hour” se transfiere a la función, el dataFrame original tendrá las columnas de tiempo “ingest_year,” “ingest_month,” “ingest_day” y “ingest_hour” anexadas.

Devuelve el marco de datos después de anexar las columnas de granularidad de tiempo.

Ejemplo:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrameFromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Muestra un DataFrame que se crea con la conexión y el formato especificados. Utilice esta función sólo con orígenes de streaming de AWS Glue.

  • connectionType: el tipo de conexión de streaming. Los valores válidos son kinesis y kafka.

  • connectionOptions: opciones de conexión, que son diferentes para Kinesis y Kafka. Puede encontrar la lista de todas las opciones de conexión para cada origen de datos de streaming en Tipos de conexión y opciones para ETL en AWS Glue. Tenga en cuenta las siguientes diferencias en las opciones de conexión de streaming:

    • Los orígenes de streaming de Kinesis requieren streamARN, startingPosition, inferSchema y classification.

    • Los orígenes de streaming de Kafka requieren connectionName, topicName, startingOffsets, inferSchema y classification.

  • transformationContext: el contexto de transformación que se va a utilizar (opcional).

  • format: una especificación de formato (opcional). Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Para obtener información acerca de los formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS Glue

  • formatOptions: opciones de formato para el formato especificado. Para obtener información acerca de las opciones de formatos soportados, consulte Opciones de formato de datos.

Ejemplo de origen de streaming de Amazon Kinesis:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Ejemplo de origen de streaming de Kafka:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

Se aplica la batch_function transferida a cada microlote que se lee desde el origen de streaming.

  • frame— El DataFrame que contiene el microlote actual.

  • batch_function: una función que se aplicará para cada microlote.

  • options: una recopilación de pares clave-valor que contiene información sobre cómo procesar microlotes. Se requieren las siguientes opciones:

    • windowSize: cantidad de tiempo que se debe dedicar al procesamiento de cada lote.

    • checkpointLocation: la ubicación donde se almacenan los puntos de verificación para el trabajo de ETL de streaming.

    • batchMaxRetries: número máximo de reintentos permitidos para este lote si se genera un error. El valor predeterminado es 3. Esta opción sólo se puede configurar para Glue versión 2.0 y superior.

Ejemplo:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Crea un DataSink que escribe en una ubicación especificada en una tabla definida en el Data Catalog.

  • database: nombre de la base de datos en el Data Catalog.

  • tableName: nombre de la tabla en el Data Catalog.

  • redshiftTmpDir: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

  • additionalOptions: opciones adicionales para AWS Glue.

  • catalogId: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.

Devuelve el DataSink.

getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Crea un objeto DataSource rasgo que lee datos de una definición de tabla en el Data Catalog.

  • database: nombre de la base de datos en el Data Catalog.

  • tableName: nombre de la tabla en el Data Catalog.

  • redshiftTmpDir: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

  • pushDownPredicate: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.

  • additionalOptions: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue, excepto por endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification y delimiter. Otra opción soportada es catalogPartitionPredicate:

    catalogPartitionPredicate: puede transferir una expresión de catálogo para filtrar en función de las columnas de índice. Esto inserta el filtrado hacia el lado del servidor. Para obtener más información, consulte Índices de partición de AWS Glue. Tenga en cuenta que push_down_predicate y catalogPartitionPredicate utilizan sintaxis diferentes. El primero utiliza la sintaxis estándar de Spark SQL y el segundo utiliza el analizador JSQL.

  • catalogId: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.

Devuelve el DataSource.

Ejemplo de origen de streaming

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Crea un DataSink que escribe en una base de datos JDBC especificada en un objeto Connection en el Data Catalog. El objeto Connection tiene información para conectarse a un receptor de JDBC, incluida la URL, el nombre de usuario, la contraseña, la VPC, la subred y los grupos de seguridad.

  • catalogConnection: nombre de la conexión en el Data Catalog que contiene la URL de JDBC en la que se va a escribir.

  • options: cadena de pares de nombre-valor de JSON que proporcionan información adicional que se exige para escribir en un almacén de datos JDBC. Esto incluye:

    • dbtable (obligatorio): nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifique schema.table-name. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado. El siguiente ejemplo muestra un parámetro de opciones que apunta a un esquema llamado test y a una tabla llamada test_table en la base de datos test_db.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (obligatorio): nombre de la base de datos de JDBC.

    • Todas las opciones adicionales transferidas directamente al escritor SparkSQL de JDBC. Para obtener más información, consulte el artículo acerca del origen de datos Redshift de Spark.

  • redshiftTmpDir: directorio provisional que se usará con determinados receptores de datos. Se establece en un valor vacío de forma predeterminada.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

  • catalogId: ID de catálogo (ID de cuenta) del Data Catalog al que se accede. Cuando el valor es nulo, se utiliza el ID de cuenta predeterminado del intermediario.

Código de ejemplo:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

Devuelve el DataSink.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Crea una DataSink que escribe datos en un destino como Amazon Simple Storage Service (Amazon S3), JDBC o AWS Glue Data Catalog.

  • connectionType — Tipo de la conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • connectionOptions: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer la conexión con el receptor de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

Devuelve el DataSink.

getSinkWithFormato def de Decimalo

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Crea un objeto DataSink que escribe datos en un destino como Amazon S3, JDBC o Data Catalog y también establece el formato de los datos que se van a escribir en el destino.

  • connectionType — Tipo de la conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • options: cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer una conexión con el receptor de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

  • format: formato de los datos que se escribirán en el destino.

  • formatOptions: una cadena de pares de nombre-valor de JSON que proporcionan opciones adicionales para el formato de los datos en el destino. Consulte Opciones de formato de datos.

Devuelve el DataSink.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Crea una DataSource rasgo que lee datos de un origen como Amazon S3, JDBC o AWS Glue Data Catalog. También soporta orígenes de datos de streaming de Kafka y Kinesis.

  • connectionType: tipo de origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • connectionOptions: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para establecer una conexión con el origen de datos. Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue.

    Un origen de streaming de Kinesis requiere las siguientes opciones de conexión: streamARN, startingPosition, inferSchema y classification.

    Un origen de streaming de Kafka requiere las siguientes opciones de conexión: connectionName, topicName, startingOffsets, inferSchema y classification.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de trabajo. Se establece en un valor vacío de forma predeterminada.

  • pushDownPredicate: predicado en columnas de partición.

Devuelve el DataSource.

Ejemplo de origen de streaming de Amazon Kinesis:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Ejemplo de origen de streaming de Kafka:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

getSourceWithFormato def de Decimalo

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Crea una DataSource rasgo que lee datos de un origen como Amazon S3, JDBC o AWS Glue Data Catalog y también establece el formato de los datos almacenados en el origen.

  • connectionType: tipo de origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • options: una cadena de pares de nombre-valor de JSON que proporciona información adicional para establecer una conexión con el origen de datos. Consulte Tipos de conexión y opciones para ETL en AWS Glue.

  • transformationContext: contexto de transformación asociado con el receptor que utilizarán los marcadores de flujo de trabajo. Se establece en un valor vacío de forma predeterminada.

  • format: formato de los datos que se almacenan en el origen. Cuando el connectionType es "s3", también puede especificar format. Puede ser "avro", "csv", "grokLog", "ion", "json", "xml", "parquet" u "orco".

  • formatOptions: una cadena de pares de nombre-valor de JSON que proporciona opciones adicionales para analizar los datos en el origen. Consulte Opciones de formato de datos.

Devuelve el DataSource.

Ejemplos

Cree un DynamicFrame a partir de un origen de datos que sea un archivo de valores separados por comas (CSV) en Amazon S3:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

Cree un DynamicFrame a partir de un origen de datos que es un PostgreSQL mediante una conexión JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

Cree un DynamicFrame a partir de un origen de datos que es un MySQL usando una conexión JDBC:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

getSparkSession

def getSparkSession : SparkSession

Obtiene el objeto SparkSession asociado a esta instancia de GlueContext. Utilice este SparkSession objeto para registrar tablas y UDF para utilizarlas con lasDataFrame creadas desde DynamicFrames.

Devuelve el SparkSession.

Definición de startTransaction (iniciar transacción)

def startTransaction(readOnly: Boolean):String

Inicia una nueva transacción. Llama de forma interna a la API startTransaction de Lake Formation.

  • readOnly: (booleano) indica si esta transacción debe ser de solo lectura o de lectura y escritura. Se rechazarán las escrituras realizadas con un ID de transacción de solo lectura. No es necesario confirmar las transacciones de solo lectura.

Devuelve el ID de la transacción.

Definición de commitTransaction (confirmar transacción)

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

Intenta confirmar la transacción especificada. Es posible que se devuelva commitTransaction antes de que la transacción haya terminado de confirmarse. Llama de forma interna a la API commitTransaction de Lake Formation.

  • transactionId: (cadena) la transacción que se confirmará.

  • waitForCommit: (booleano) determina si se devuelve commitTransaction de inmediato. El valor predeterminado es true. Si es falso, commitTransaction realiza un sondeo y espera hasta que la transacción se haya confirmado. La cantidad de tiempo de espera se limita a un minuto mediante retroceso exponencial con un máximo de seis reintentos.

Devuelve un valor booleano para indicar si se realizó o no la confirmación.

Definición de cancelTransaction (cancelar transacción)

def cancelTransaction(transactionId: String): Unit

Intenta cancelar la transacción especificada. Llama de forma interna a la CancelTransactionAPI Lake Formation.

  • transactionId: (cadena) la transacción que se cancelará.

Devuelve una excepción de TransactionCommittedException si la transacción se había confirmado con anterioridad.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

Crea un objeto GlueContext con el objeto SparkContext especificado, particiones mínimas y particiones de destino.

  • sc — La SparkContext.

  • minPartitions: número mínimo de particiones.

  • targetPartitions: número de particiones de destino.

Devuelve el GlueContext.

def this

def this( sc : SparkContext )

Crear un objeto GlueContext con el SparkContext proporcionado. Establece las particiones mínimas en 10 y las particiones de destino en 20.

  • sc — La SparkContext.

Devuelve el GlueContext.

def this

def this( sparkContext : JavaSparkContext )

Crear un objeto GlueContext con el JavaSparkContext proporcionado. Establece las particiones mínimas en 10 y las particiones de destino en 20.

  • sparkContext — La JavaSparkContext.

Devuelve el GlueContext.