Usar marcadores de trabalho - AWS Glue

Usar marcadores de trabalho

O AWS Glue para Spark usa marcadores de trabalho para rastrear dados que já foram processados. Para obter um resumo do atributo de marcadores de emprego e o que ele suporta, consulte Rastrear dados processados usando marcadores de trabalho. Ao programar um trabalho do AWS Glue com marcadores, você tem acesso à flexibilidade indisponível em trabalhos visuais.

  • Ao ler do JDBC, você pode especificar as colunas a serem usadas como chaves de marcadores no seu script do AWS Glue.

  • Você pode escolher qual transformation_ctx aplicar a cada chamada de método.

Sempre chame job.init no início do script e job.commit no final do script com parâmetros configurados adequadamente. Essas duas funções inicializam o serviço de marcadores e atualizam a alteração de estado para o serviço. Marcadores não funcionarão sem chamá-las.

Especifique chaves de marcadores

Para fluxos de trabalho JDBC, o marcador acompanha quais linhas seu trabalho leu comparando os valores dos campos-chave com um valor marcado. Isso não é necessário nem aplicável aos fluxos de trabalho do Amazon S3. Ao escrever um script do AWS Glue sem o editor visual, você pode especificar qual coluna rastrear com marcadores. Você pode também especificar várias colunas. Lacunas na sequência de valores são permitidas ao especificar chaves de marcadores definidas pelo usuário.

Atenção

Se forem usadas chaves de marcadores definidas pelo usuário, elas deverão aumentar ou diminuir estritamente de forma monotônica. Ao selecionar campos adicionais para uma chave composta, os campos para conceitos como “versões secundárias” ou “números de revisão” não atendem a esses critérios, pois seus valores são reutilizados em todo o seu conjunto de dados.

É possível especificar jobBookmarkKeys e jobBookmarkKeysSortOrder das seguintes maneiras:

  • create_dynamic_frame.from_catalog: use additional_options.

  • create_dynamic_frame.from_options: use connection_options.

Contexto de transformação

Muitos dos métodos de quadros dinâmicos PySpark do AWS Glue incluem um parâmetro opcional chamado transformation_ctx, que é um identificador exclusivo para a instância do operador do ETL. O parâmetro transformation_ctx é usado para identificar informações de estado em um marcador de trabalho para o determinado operador. Especificamente, o AWS Glue usa transformation_ctx para indexar a chave para o marcador de estado.

Atenção

transformation_ctx serve como a chave para pesquisar o estado do marcador para uma fonte específica em seu script. Para que o marcador funcione corretamente, você sempre deve manter a fonte e o transformation_ctx associado consistentes. Alterar a propriedade da fonte ou renomear transformation_ctx pode tornar o marcador anterior inválido e talvez a filtragem baseada em carimbo de data/hora não ofereça o resultado correto.

Para que os marcadores de trabalho funcionem corretamente, habilite o parâmetro de marcador de trabalho e defina o parâmetro transformation_ctx. Se você não passar o parâmetro transformation_ctx, os marcadores de trabalho não serão habilitados para uma tabela ou quadro dinâmico usado no método. Por exemplo, se você tiver um trabalho de ETL que lê e une duas fontes do Amazon S3, poderá optar por passar o parâmetro transformation_ctx somente para os métodos em que deseja habilitar marcadores. Se você redefinir o marcador de um trabalho, ele redefinirá todas as transformações associadas ao trabalho, independentemente do transformation_ctx usado.

Para obter mais informações sobre a classe DynamicFrameReader, consulte Classe DynamicFrameReader. Para obter mais informações sobre as extensões PySpark, consulte Referência de extensões PySpark do AWS Glue.

Exemplos

Veja a seguir um exemplo de um script gerado para uma fonte de dados do Amazon S3. As partes do script necessárias para usar marcadores de trabalho são mostradas em itálico. Para obter mais informações sobre esses elementos, consulte a API Classe GlueContexte a API Classe DynamicFrameWriter.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()

Veja a seguir um exemplo de um script gerado para uma fonte JDBC. A tabela de origem é uma tabela de funcionário com a coluna empno como a chave primária. Embora, por padrão, o trabalho use uma chave primária sequencial como chave de marcador se nenhuma chave de marcador for especificada, já que empno não é necessariamente sequencial (pode haver lacunas nos valores), ela não estará qualificada como chave de marcador padrão. Portanto, o script designa explicitamente empno como a chave de marcador. Essa parte do código é mostrada em itálico.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()