Seguimiento de los datos procesados mediante marcadores de trabajo - AWS Glue

Seguimiento de los datos procesados mediante marcadores de trabajo

AWS Glue realiza un seguimiento de los datos que ya se han procesado durante una ejecución anterior de un trabajo de ETL mediante información de estado persistente de la ejecución del trabajo. Esta información de estado persistente se llama marcador de flujo de trabajo. Los marcadores de trabajo ayudan a AWS Glue a mantener la información de estado y evitar el reprocesamiento de los datos antiguos. Con los marcadores de trabajo, puede procesar datos nuevos al volver a realizar una ejecución en un intervalo programado. Un marcador de trabajo se compone de los estados de diversos elementos de trabajos, como orígenes, transformaciones y destinos. Por ejemplo, su trabajo de ETL podría leer nuevas particiones en un archivo de Amazon S3. AWS Glue hace el seguimiento de las particiones que el trabajo ha procesado correctamente para evitar procesamiento y datos duplicados en el almacén de datos de destino del trabajo.

Los marcadores de trabajo se implementan para los orígenes de datos JDBC, la transformación Relationalize (Establecimiento de relaciones) y algunos orígenes Amazon Simple Storage Service (Amazon S3). En la siguiente tabla se muestran los formatos de origen Amazon S3 que AWS Glue soporta para los marcadores del trabajo.

Versión de AWS Glue Formatos de origen de Amazon S3
Versión 0.9 JSON, CSV, Apache Avro, XML
Versión 1.0 y posteriores. JSON, CSV, Apache Avro, XML, Parquet, ORC

Para obtener información acerca de las versiones de AWS Glue, consulte Definición de propiedades de trabajo para trabajos de Spark.

Para los orígenes JDBC, se aplican las reglas siguientes:

  • Para cada tabla, AWS Glue utiliza una o más columnas como claves de marcador para determinar los datos nuevos y procesados. Las claves de marcador se combinan para formar una única clave compuesta.

  • Puede especificar las columnas que se van a utilizar como claves de marcador. Si no especifica ninguna clave de marcador, AWS Glue utiliza la clave principal como clave de marcador de forma predeterminada, siempre que aumente o disminuya secuencialmente (sin espacios).

  • Si se utilizan claves de marcador definidas por el usuario, deben aumentar o disminuir de forma monotónica estrictamente. Se permiten los espacios.

  • AWS Glue no admite el uso de columnas que distinguen mayúsculas de minúsculas como claves de marcadores de trabajo.

Uso de marcadores de trabajo en AWS Glue (AWS Glue)

La opción de marcador de trabajo se transfiere como parámetro al iniciarse el trabajo. En la siguiente tabla se describen las opciones para la configuración de marcadores de trabajo en la consola de AWS Glue.

Job bookmark (Marcador de flujo de trabajo) Descripción
Habilitar Hace que el trabajo actualice el estado después de una ejecución para realizar un seguimiento de datos procesados anteriormente. Si el trabajo tiene un origen con compatibilidad de marcador de trabajo, realizará un seguimiento de los datos procesados y cuando se ejecute un trabajo, procesará datos nuevos desde el último punto de control.
Deshabilitar Los marcadores de trabajo no se utilizan y el trabajo siempre procesa todo el conjunto de datos. Usted es responsable de administrar el resultado de las ejecuciones de trabajos anteriores. Esta es la opción predeterminada.
Pause

Procesar los datos incrementales desde la última ejecución correcta o los datos en el rango identificado por las siguientes subopciones, sin actualizar el estado del último marcador. Usted es responsable de administrar el resultado de las ejecuciones de trabajos anteriores. Las dos subopciones son:

  • job-bookmark-from <from-value> es el ID de ejecución que representa toda la entrada que se ha procesado hasta la última ejecución correcta antes de la misma e incluye el ID de ejecución especificado. La entrada correspondiente se ignora.

  • job-bookmark-to <to-value> es el ID de ejecución que representa toda la entrada que se ha procesado hasta la última ejecución correcta antes de la misma e incluye el ID de ejecución especificado. El trabajo procesa la entrada correspondiente, excluida la entrada identificada por <from-value>. Cualquier entrada posterior a esta entrada también se excluye para el procesamiento.

El estado del marcador de trabajo no se actualiza cuando se especifica este conjunto de opciones.

Las subopciones son opcionales; sin embargo, cuando se utilizan ambas opciones, se deben proporcionar.

Para obtener información detallada sobre los parámetros transferidos a un trabajo en la línea de comandos y para los marcadores de trabajo específicamente, consulte Parámetros del flujo de trabajo utilizados po AWS Glue (AWS Glue).

Para orígenes de entrada de Amazon S3, los marcadores de trabajo de AWS Glue comprueban la hora de última modificación de los objetos para verificar qué objetos deben volver a procesarse. Si los datos de origen de entrada se han modificado desde la última ejecución de trabajo, los archivos se vuelven a procesar al ejecutar el trabajo de nuevo.

Puede retroceder sus marcadores de trabajo para sus trabajos de Spark ETL de AWS Glue a cualquier ejecución de trabajo anterior. Puede admitir escenarios de reposición de datos mejor al rebobinar sus marcadores de trabajo a cualquier ejecución de trabajo anterior reprocesando los datos solo desde la ejecución del trabajo marcado.

Si va a volver a procesar todos los datos utilizando el mismo trabajo, restablezca el marcador de trabajo. Para restablecer el estado del marcador de trabajo, utilice la consola de AWS Glue, la Acción ResetJobBookmark (Python: reset_job_bookmark)operación de API o AWS CLI. Por ejemplo, escriba el siguiente comando usando AWS CLI:

aws glue reset-job-bookmark --job-name my-job-name

Al retroceder o restablecer un marcador, AWS Glue no limpia los archivos de destino porque podría haber varios destinos y los destinos no se rastrean con marcadores de trabajo. Sólo los archivos de origen se rastrean con marcadores de trabajo. Puede crear diferentes destinos de salida al retroceder y reprocesar los archivos de origen para evitar datos duplicados en la salida.

AWS Glue realiza el seguimiento de los marcadores de flujo de trabajo para cada flujo de trabajo. Si elimina un flujo de trabajo, el marcador de flujo de trabajo se elimina.

En algunos casos, es posible que tenga habilitados marcadores de trabajo de AWS Glue, pero su trabajo de ETL está reprocesando los datos que ya se han procesado anteriormente. Para obtener información acerca de la resolución de causas comunes de este error, consulte Solución de errores en AWS Glue (AWS Glue).

Contexto de transformación

Muchos de los métodos de marco dinámico de AWS Glue PySpark incluyen un parámetro opcional denominado transformation_ctx, que es un identificador único para la instancia de operador de ETL. El parámetro transformation_ctx se utiliza para identificar la información de estado dentro de un marcador de trabajo para el operador determinado. En concreto, AWS Glue utiliza transformation_ctx para indexar la clave del estado de marcador.

Para que los marcadores de flujo de trabajo funcionen correctamente, habilite el parámetro de marcador de trabajo y establezca el parámetro transformation_ctx. Si no pasa el parámetro transformation_ctx, no se habilitan los marcadores de trabajo para un marco dinámico o una tabla que se usen en el método. Por ejemplo, si tiene un trabajo de ETL que lee y combina dos orígenes de Amazon S3, puede decidir transferir el parámetro transformation_ctx únicamente a los métodos para los que desea habilitar marcadores. Si restablece el marcador de trabajo de un trabajo, se restablecen todas las transformaciones que están asociadas al trabajo sea cual sea el transformation_ctx utilizado.

Para obtener más información acerca de la clase DynamicFrameReader, consulte Clase DynamicFrameReader. Para obtener más información acerca de las extensiones de PySpark, consulte Referencia de las extensiones de PySpark de AWS Glue (AWS Glue).

Uso de marcadores de trabajo con el script generado de AWS Glue (AWS Glue)

En esta sección se describen más detalles operativos de utilizar marcadores de trabajo. También proporciona un ejemplo de un script que puede generar a partir de AWS Glue cuando elige un origen y un destino y ejecuta un trabajo.

Los marcadores de trabajo almacenan los estados de un trabajo. Cada instancia del estado está marcada con un nombre de trabajo y un número de versión. Cuando un script invoca job.init, recupera su estado y siempre obtiene la última versión. Dentro de un estado, hay varios elementos de estado, que son específicos de cada origen, transformación e instancia de receptor en el script. Estos elementos de estado se identifican mediante un contexto de transformación que se adjunta al elemento correspondiente (origen, transformación o receptor) en el script. Los elementos de estado se guardan de manera granular cuando job.commit se haya invocado desde el script de usuario. El script obtiene el nombre del trabajo y la opción de control de los marcadores de trabajo de los argumentos.

Los elementos de estado del marcador de trabajo son datos específicos de origen, transformación o receptor. Por ejemplo, supongamos que desea leer los datos progresivos desde una ubicación de Amazon S3 a la que un proceso o trabajo ascendente escribe en forma constante. En este caso, el script debe determinar qué se ha procesado hasta el momento. La implementación de marcador de trabajo para el origen de Amazon S3 guarda la información para que cuando el trabajo se ejecute de nuevo, pueda filtrar solo los nuevos objetos mediante la información guardada y recalcular el estado para la próxima ejecución del trabajo. Se utiliza una marca temporal para filtrar los archivos nuevos.

Además de los elementos de estado, los marcadores de trabajo tienen un número de ejecución, un número de intentos y un número de versión. El número de ejecución realiza un seguimiento de la ejecución del trabajo y el número de intentos registra los intentos de ejecución de un trabajo. El número de ejecuciones de un trabajo es un número que se incrementa de forma monotónica por cada ejecución correcta. El número de intentos realiza un seguimiento de los intentos de cada ejecución y solo se incrementa cuando hay una ejecución después de un intento fallido. El número de versión aumenta de forma monotónica y realiza un seguimiento de las actualizaciones de un marcador de trabajo.

En la base de datos del servicio AWS Glue, los estados de favorito de todas las transformaciones se almacenan juntos como pares clave-valor:

{ "job_name" : ..., "run_id": ..., "run_number": .., "attempt_number": ... "states": { "transformation_ctx1" : { bookmark_state1 }, "transformation_ctx2" : { bookmark_state2 } } }

transformation_ctx sirve de clave para realizar búsquedas en el estado de favorito de un origen específico del script. Para que el favorito funcione correctamente, siempre se debe conservar la coherencia entre el origen y el elemento transformation_ctx asociado. Modificar la propiedad de origen o cambiarle el nombre a transformation_ctx puede hacer que el favorito anterior deje de ser válido y que el filtrado basado en marcas temporales no produzca el resultado correcto.

Prácticas recomendadas

A continuación, se indican las prácticas recomendadas para utilizar favoritos de trabajos con el script generado de AWS Glue.

  • Coloque siempre job.init() al principio del script y job.commit() al final. Estas dos funciones se utilizan para inicializar el servicio de favoritos y actualizar el servicio con el cambio de estado. Los favoritos no funcionarán sin llamarlos.

  • No cambie la propiedad del origen de datos con el favorito habilitado. Por ejemplo, hay un origen de datos datasource0 que apunta a una ruta de entrada A de Simple Storage Service (Amazon S3), y el trabajo ha estado leyendo desde un origen que se ha estado ejecutando durante varias rondas con el favorito habilitado. Si cambia la ruta de entrada de datasource0 a la ruta B de Simple Storage Service (Amazon S3) sin cambiar transformation_ctx, el trabajo de AWS Glue utilizará el antiguo estado de favorito almacenado. Eso provocará que se omitan o falten archivos en la ruta de entrada B, ya que AWS Glue presupondrá que esos archivos se han procesado en ejecuciones anteriores.

  • Utilice una tabla de catálogo con favoritos para mejorar la administración de particiones. Los favoritos funcionan para orígenes de datos de Data Catalog o de opciones. No obstante, resulta difícil eliminar/agregar nuevas particiones con el enfoque de opciones. Utilizar una tabla de catálogo con rastreadores puede proporcionar una mejor automatización para realizar un seguimiento de las nuevas particiones agregadas, y ofrece flexibilidad para seleccionar particiones concretas con un predicado de inserción.

  • Utilice el enumerador de archivos de Simple Storage Service (Amazon S3) de AWS Glue para conjuntos de datos de gran tamaño. Un favorito mostrará una lista de todos los archivos de cada partición de entrada y realizará el filtrado, de modo que, si hay demasiados archivos en una sola partición, puede que el controlador del favorito se quede sin suficiente memoria. Utilice el enumerador de archivos de Simple Storage Service (Amazon S3) de AWS Glue para evitar mostrar una lista de todos los archivos de la memoria a la vez.

A continuación, se muestra un ejemplo de un script generado para un origen de datos de Amazon S3. Las partes del script que son necesarias para utilizar marcadores de trabajo se muestran en negrita y cursiva. Para obtener más información sobre estos elementos, consulte Clase GlueContext API y Clase DynamicFrameWriter API.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

A continuación se muestra un ejemplo de un script generado para un origen JDBC. La tabla de origen es una tabla de empleados con la columna empno como clave principal. Aunque el trabajo utiliza una clave principal secuencial como clave de marcador predeterminada, si no se especifica ninguna clave de marcador, porque empno no es necesariamente secuencial (podría haber brechas en los valores), esta clave no califica como clave de marcador predeterminada. Por lo tanto, el script designa empno explícitamente como la clave de marcador. Esa parte del código se muestra en negrita y cursiva.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

Puede especificar jobBookmarkKeys y jobBookmarkKeysSortOrder de las siguientes maneras:

  • create_dynamic_frame.from_catalog: utilice additional_options.

  • create_dynamic_frame.from_options: utilice connection_options.

Para obtener más información acerca de las opciones de conexión relacionadas con los marcadores de trabajo, consulte Valores connectionType de JDBC.