Seguimiento de los datos procesados mediante marcadores de trabajo - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Seguimiento de los datos procesados mediante marcadores de trabajo

AWS Glue realiza un seguimiento de los datos que ya se han procesado durante una ejecución anterior de un trabajo de ETL mediante información de estado persistente de la ejecución del trabajo. Esta información de estado persistente se llama marcador de flujo de trabajo. Los marcadores de trabajo ayudan a AWS Glue a mantener la información de estado y evitar el reprocesamiento de los datos antiguos. Con los marcadores de trabajo, puede procesar datos nuevos al volver a realizar una ejecución en un intervalo programado. Un marcador de trabajo se compone de los estados de diversos elementos de trabajos, como orígenes, transformaciones y destinos. Por ejemplo, su flujo de trabajo de ETL podría leer nuevas particiones en un archivo de Amazon S3.AWS Gluehace el seguimiento de las particiones que el flujo de trabajo ha procesado correctamente para evitar procesamiento de duplicados y duplicar datos en el almacén de datos de destino del flujo de trabajo.

Los marcadores de Job se implementan para los orígenes de datos JDBC, la transformación Relationalize y algunos orígenes de Amazon Simple Storage Service (Amazon S3). En la siguiente tabla se muestran los formatos de origen de Amazon S3 queAWS GluePara marcadores de trabajo.

AWS Glue version Formatos de origen de Amazon S3
Versión 0.9 JSON, CSV, Apache Avro, XML
Versión 1.0 y posteriores. JSON, CSV, Apache Avro, XML, Parquet, ORC

Para obtener información sobreAWS Glueversiones, consulteDefinición de propiedades de Job para trabajos de Spark.

Para los orígenes JDBC, se aplican las reglas siguientes:

  • Para cada tabla, AWS Glue utiliza una o más columnas como claves de marcador para determinar los datos nuevos y procesados. Las claves de marcador se combinan para formar una única clave compuesta.

  • Puede especificar las columnas que se van a utilizar como claves de marcador. Si no especifica ninguna clave de marcador, AWS Glue utiliza la clave principal como clave de marcador de forma predeterminada, siempre que aumente o disminuya secuencialmente (sin espacios).

  • Si se utilizan claves de marcador definidas por el usuario, deben aumentar o disminuir de forma monotónica estrictamente. Se permiten los espacios.

  • AWS Glueno admite el uso de columnas que distinguen mayúsculas de minúsculas como claves de marcadores de

Uso de marcadores de trabajo en AWS Glue

La opción de marcador de trabajo se transfiere como parámetro al iniciarse el trabajo. En la siguiente tabla se describen las opciones para la configuración de marcadores de trabajo en la pestañaAWS Glueconsola de .

Job bookmark (Marcador de flujo de trabajo) Descripción
Habilitar Hace que el trabajo actualice el estado después de una ejecución para realizar un seguimiento de datos procesados anteriormente. Si el trabajo tiene un origen con compatibilidad de marcador de trabajo, realizará un seguimiento de los datos procesados y cuando se ejecute un trabajo, procesará datos nuevos desde el último punto de control.
Deshabilitar Los marcadores de trabajo no se utilizan y el trabajo siempre procesa todo el conjunto de datos. Usted es responsable de administrar el resultado de las ejecuciones de trabajos anteriores. Esta es la opción predeterminada.
Pause

Procesar los datos incrementales desde la última ejecución correcta o los datos en el rango identificado por las siguientes subopciones, sin actualizar el estado del último marcador. Usted es responsable de administrar el resultado de las ejecuciones de trabajos anteriores. Las dos subopciones son:

  • job-bookmark-from <from-value> es el ID de ejecución que representa toda la entrada que se ha procesado hasta la última ejecución correcta antes de la misma e incluye el ID de ejecución especificado. La entrada correspondiente se ignora.

  • job-bookmark-to <to-value> es el ID de ejecución que representa toda la entrada que se ha procesado hasta la última ejecución correcta antes de la misma e incluye el ID de ejecución especificado. El trabajo procesa la entrada correspondiente, excluida la entrada identificada por <from-value>. Cualquier entrada posterior a esta entrada también se excluye para el procesamiento.

El estado del marcador de trabajo no se actualiza cuando se especifica este conjunto de opciones.

Las subopciones son opcionales; sin embargo, cuando se utilizan ambas opciones, se deben proporcionar.

Para obtener información detallada sobre los parámetros transferidos a un trabajo en la línea de comandos y para los marcadores de trabajo específicamente, consulte Parámetros especiales usados porAWSAdherencia.

Para las fuentes de entrada de Amazon S3,AWS GlueLos marcadores de trabajo de comprueban la hora de última modificación de los objetos para verificar qué objetos deben volver a procesarse. Si los datos de origen de entrada se han modificado desde la última ejecución de trabajo, los archivos se vuelven a procesar al ejecutar el trabajo de nuevo.

Puede rebobinar sus marcadores de trabajo para suAWS GlueHabilitar los trabajos de ETL a cualquier ejecución de trabajo anterior. Puede admitir escenarios de reposición de datos mejor al rebobinar sus marcadores de trabajo a cualquier ejecución de trabajo anterior reprocesando los datos solo desde la ejecución del trabajo marcado.

Si va a volver a procesar todos los datos utilizando el mismo trabajo, restablezca el marcador de trabajo. Para restablecer el estado del marcador de trabajo, utilice la consola de AWS Glue, la Acción ResetJobBookmark (Python: reset_job_bookmark)operación de API o AWS CLI. Por ejemplo, escriba el siguiente comando usando AWS CLI:

aws glue reset-job-bookmark --job-name my-job-name

Cuando rebobinas o restableces un marcador,AWS Glueno limpia los archivos de destino porque podría haber varios destinos y los destinos no se rastrean con marcadores de trabajo. Sólo se realiza un seguimiento de los archivos de origen con marcadores de trabajo. Puede crear diferentes destinos de salida al rebobinar y reprocesar los archivos de origen para evitar datos duplicados en la salida.

AWS Glue realiza el seguimiento de los marcadores de flujo de trabajo para cada flujo de trabajo. Si elimina un flujo de trabajo, el marcador de flujo de trabajo se elimina.

En algunos casos, es posible que tenga habilitados marcadores de trabajo de AWS Glue, pero su trabajo de ETL está reprocesando los datos que ya se han procesado anteriormente. Para obtener información acerca de la resolución de causas comunes de este error, consulte Solución de errores en AWS Glue.

Contexto de transformación

Muchos de los métodos de marco dinámico de AWS Glue PySpark incluyen un parámetro opcional denominado transformation_ctx, que es un identificador único para la instancia de operador de ETL. El parámetro transformation_ctx se utiliza para identificar la información de estado dentro de un marcador de trabajo para el operador determinado. En concreto, AWS Glue utiliza transformation_ctx para indexar la clave del estado de marcador.

Para que los marcadores de flujo de trabajo funcionen correctamente, habilite el parámetro de marcador de trabajo y establezca el parámetro transformation_ctx. Si no pasa el parámetro transformation_ctx, no se habilitan los marcadores de trabajo para un marco dinámico o una tabla que se usen en el método. Por ejemplo, si tiene un flujo de trabajo de ETL que lee y conecta dos orígenes de Amazon S3, puede decidir pasar eltransformation_ctxsólo a los métodos que desea habilitar marcadores. Si restablece el marcador de trabajo de un trabajo, se restablecen todas las transformaciones que están asociadas al trabajo sea cual sea el transformation_ctx utilizado.

Para obtener más información acerca de la clase DynamicFrameReader, consulte Clase DynamicFrameReader. Para obtener más información acerca de las extensiones de PySpark, consulte AWSReferencia de las extensiones de PySpark de Glue.

Uso de marcadores de trabajo con el script AWS Glue generado

En esta sección se describen más detalles operativos de utilizar marcadores de trabajo. También proporciona un ejemplo de un script que puede generar a partir de AWS Glue cuando elige un origen y un destino y ejecuta un trabajo.

Los marcadores de trabajo almacenan los estados de un trabajo. Cada instancia del estado está marcada con un nombre de trabajo y un número de versión. Cuando un script invoca job.init, recupera su estado y siempre obtiene la última versión. Dentro de un estado, hay varios elementos de estado, que son específicos de cada origen, transformación e instancia de receptor en el script. Estos elementos de estado se identifican mediante un contexto de transformación que se adjunta al elemento correspondiente (origen, transformación o receptor) en el script. Los elementos de estado se guardan de manera granular cuando job.commit se haya invocado desde el script de usuario. El script obtiene el nombre del trabajo y la opción de control de los marcadores de trabajo de los argumentos.

Los elementos de estado del marcador de trabajo son datos específicos de origen, transformación o receptor. Por ejemplo, supongamos que desea leer los datos incrementales desde una ubicación de Amazon S3 en la que esté escribiendo constantemente un flujo de trabajo o proceso ascendente. En este caso, el script debe determinar qué se ha procesado hasta el momento. La implementación de marcador de trabajo para el origen de Amazon S3 guarda la información para que cuando el trabajo se ejecute de nuevo, pueda filtrar solo los nuevos objetos usando la información guardada y recalcular el estado para la próxima ejecución del trabajo. Se utiliza una marca temporal para filtrar los archivos nuevos.

Además de los elementos de estado, los marcadores de trabajo tienen un número de ejecución, un número de intentos y un número de versión. El número de ejecución realiza un seguimiento de la ejecución del trabajo y el número de intentos registra los intentos de ejecución de un trabajo. El número de ejecuciones de un trabajo es un número que se incrementa de forma monotónica por cada ejecución correcta. El número de intentos realiza un seguimiento de los intentos de cada ejecución y solo se incrementa cuando hay una ejecución después de un intento fallido. El número de versión aumenta de forma monotónica y realiza un seguimiento de las actualizaciones de un marcador de trabajo.

A continuación, mostramos un ejemplo de un script generado para un origen de datos de Amazon S3. Las partes del script que son necesarias para utilizar marcadores de trabajo se muestran en negrita y cursiva. Para obtener más información sobre estos elementos, consulte Clase GlueContext API y Clase DynamicFrameWriter API.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

A continuación se muestra un ejemplo de un script generado para un origen JDBC. La tabla de origen es una tabla de empleados con la columna empno como clave principal. Aunque de forma predeterminada el trabajo utiliza una clave principal secuencial como clave de marcador si no se especifica ninguna clave de marcador, porqueempnono es necesariamente secuencial (podría haber huecos en los valores), no califica como clave de marcador predeterminada. Por lo tanto, el script designa empno explícitamente como la clave de marcador. Esa parte del código se muestra en negrita y cursiva.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

Para obtener más información sobre las opciones de conexión relacionadas con los marcadores de trabajo, consulte.Valores connectionType de JDBC.