Usar marcadores de trabalho
O AWS Glue para Spark usa marcadores de trabalho para rastrear dados que já foram processados. Para obter um resumo do atributo de marcadores de emprego e o que ele suporta, consulte Rastrear dados processados usando marcadores de trabalho. Ao programar um trabalho do AWS Glue com marcadores, você tem acesso à flexibilidade indisponível em trabalhos visuais.
-
Ao ler do JDBC, você pode especificar as colunas a serem usadas como chaves de marcadores no seu script do AWS Glue.
-
Você pode escolher qual
transformation_ctx
aplicar a cada chamada de método.
Sempre chame job.init
no início do script e job.commit
no final do script com parâmetros configurados adequadamente. Essas duas funções inicializam o serviço de marcadores e atualizam a alteração de estado para o serviço. Marcadores não funcionarão sem chamá-las.
Especifique chaves de marcadores
Para fluxos de trabalho JDBC, o marcador acompanha quais linhas seu trabalho leu comparando os valores dos campos-chave com um valor marcado. Isso não é necessário nem aplicável aos fluxos de trabalho do Amazon S3. Ao escrever um script do AWS Glue sem o editor visual, você pode especificar qual coluna rastrear com marcadores. Você pode também especificar várias colunas. Lacunas na sequência de valores são permitidas ao especificar chaves de marcadores definidas pelo usuário.
Atenção
Se forem usadas chaves de marcadores definidas pelo usuário, elas deverão aumentar ou diminuir estritamente de forma monotônica. Ao selecionar campos adicionais para uma chave composta, os campos para conceitos como “versões secundárias” ou “números de revisão” não atendem a esses critérios, pois seus valores são reutilizados em todo o seu conjunto de dados.
É possível especificar jobBookmarkKeys
e jobBookmarkKeysSortOrder
das seguintes maneiras:
-
create_dynamic_frame.from_catalog
: useadditional_options
. -
create_dynamic_frame.from_options
: useconnection_options
.
Contexto de transformação
Muitos dos métodos de quadros dinâmicos PySpark do AWS Glue incluem um parâmetro opcional chamado transformation_ctx
, que é um identificador exclusivo para a instância do operador do ETL. O parâmetro transformation_ctx
é usado para identificar informações de estado em um marcador de trabalho para o determinado operador. Especificamente, o AWS Glue usa transformation_ctx
para indexar a chave para o marcador de estado.
Atenção
transformation_ctx
serve como a chave para pesquisar o estado do marcador para uma fonte específica em seu script. Para que o marcador funcione corretamente, você sempre deve manter a fonte e o transformation_ctx
associado consistentes. Alterar a propriedade da fonte ou renomear transformation_ctx
pode tornar o marcador anterior inválido e talvez a filtragem baseada em carimbo de data/hora não ofereça o resultado correto.
Para que os marcadores de trabalho funcionem corretamente, habilite o parâmetro de marcador de trabalho e defina o parâmetro transformation_ctx
. Se você não passar o parâmetro transformation_ctx
, os marcadores de trabalho não serão habilitados para uma tabela ou quadro dinâmico usado no método. Por exemplo, se você tiver um trabalho de ETL que lê e une duas fontes do Amazon S3, poderá optar por passar o parâmetro transformation_ctx
somente para os métodos em que deseja habilitar marcadores. Se você redefinir o marcador de um trabalho, ele redefinirá todas as transformações associadas ao trabalho, independentemente do transformation_ctx
usado.
Para obter mais informações sobre a classe DynamicFrameReader
, consulte Classe DynamicFrameReader. Para obter mais informações sobre as extensões PySpark, consulte Referência de extensões PySpark do AWS Glue.
Exemplos
Veja a seguir um exemplo de um script gerado para uma fonte de dados do Amazon S3. As partes do script necessárias para usar marcadores de trabalho são mostradas em itálico. Para obter mais informações sobre esses elementos, consulte a API Classe GlueContexte a API Classe DynamicFrameWriter.
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
Veja a seguir um exemplo de um script gerado para uma fonte JDBC. A tabela de origem é uma tabela de funcionário com a coluna empno
como a chave primária. Embora, por padrão, o trabalho use uma chave primária sequencial como chave de marcador se nenhuma chave de marcador for especificada, já que empno
não é necessariamente sequencial (pode haver lacunas nos valores), ela não estará qualificada como chave de marcador padrão. Portanto, o script designa explicitamente empno
como a chave de marcador. Essa parte do código é mostrada em itálico.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()