Utilizzo di segnalibri di processo - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di segnalibri di processo

AWS Glue per Spark monitora i dati già elaborati attraverso i segnalibri dei processi. Per un riepilogo della funzionalità dei segnalibri di processo e di ciò che supportano, consulta la pagina Monitoraggio dei dati elaborati mediante segnalibri di processo. Quando si programma un processo AWS Glue con segnalibri, si ha accesso a una flessibilità non disponibile nei processi visivi.

  • Durante la lettura da JDBC, è possibile specificare le colonne da utilizzare come chiavi di segnalibro nello script AWS Glue.

  • È possibile scegliere quale transformation_ctx applicare a ciascuna chiamata al metodo.

Chiama sempre job.init all'inizio dello script e alla job.commit fine dello script con parametri configurati in modo appropriato. Queste due funzioni inizializzano il servizio di segnalibri e aggiornano la modifica dello stato al servizio. I segnalibri non funzioneranno senza che vengano richiamati.

Specifica delle chiavi dei segnalibri

Per i flussi di lavoro JDBC, il segnalibro tiene traccia delle righe lette dal processo confrontando i valori dei campi chiave con un valore aggiunto ai segnalibri. Questa operazione non è necessaria o applicabile per i flussi di processo Amazon S3. Quando scrivi uno script AWS Glue senza l'editor visivo, puoi specificare la colonna di cui tenere traccia mediante i segnalibri. È possibile specificare anche più colonne. Sono consentite lacune nella sequenza di valori quando si specificano chiavi di segnalibro definite dall'utente.

avvertimento

Se vengono utilizzate le chiavi di segnalibro definite dall'utente, devono essere fornite rigorosamente e monotonicamente in ordine crescente o decrescente. Quando si selezionano campi aggiuntivi per una chiave composta, i campi relativi a concetti come "versioni secondarie" o "numeri di revisione" non soddisfano questi criteri, poiché i loro valori vengono riutilizzati in tutto il set di dati.

È possibile specificare jobBookmarkKeys e jobBookmarkKeysSortOrder nei seguenti modi:

  • create_dynamic_frame.from_catalog: utilizza additional_options.

  • create_dynamic_frame.from_options: utilizza connection_options.

Contesto di trasformazione

Molti dei metodi di frame AWS Glue PySpark dinamici includono un parametro opzionale denominatotransformation_ctx, che è un identificatore univoco per l'istanza dell'operatore ETL. Il parametro transformation_ctx viene utilizzato per identificare le informazioni sullo stato all'interno di un segnalibro di processo per un determinato operatore. Nello specifico, AWS Glue utilizza transformation_ctx per indicizzare la chiave per lo stato del segnalibro.

avvertimento

Il parametro transformation_ctx serve come chiave per cercare nello stato del segnalibro una fonte specifica nello script. Affinché il segnalibro funzioni correttamente, è necessario mantenere sempre la fonte e il parametro transformation_ctx associato coerenti. Modificare la proprietà di origine o rinominare il parametro transformation_ctx potrebbe rendere il segnalibro precedente non valido e il filtro basato sulla marca temporale potrebbe non produrre il risultato corretto.

Per fare in modo che i segnalibri del processo funzionino correttamente, abilita il parametro del segnalibro del processo e imposta il parametro transformation_ctx. Se non passi il parametro transformation_ctx, i segnalibri del processo non sono abilitati per un frame dinamico oppure nel metodo viene utilizzata una tabella. Ad esempio, in presenza di un processo ETL che legge e unisce due origini Amazon S3, puoi scegliere di passare il parametro transformation_ctx solo ai metodi per cui vuoi abilitare i segnalibri. Reimpostando il segnalibro per un processo, si reimpostano tutte le trasformazioni associate al processo, indipendentemente dal transformation_ctx utilizzato.

Per ulteriori informazioni sulla classe DynamicFrameReader, consulta DynamicFrameReader classe. Per ulteriori informazioni sulle PySpark estensioni, vedere. Informazioni di riferimento sulle estensioni PySpark AWS Glue

Esempi

Di seguito è riportato un esempio di script generato per un'origine dati Amazon S3. Le parti dello script necessarie per l'utilizzo dei segnalibri di processo sono visualizzate in corsivo. Per ulteriori informazioni su questi elementi, consulta le API GlueContext classe e le API Classe DynamicFrameWriter.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()

Di seguito è riportato un esempio di script generato per un'origine JDBC. La tabella di origine è una tabella dipendente con la colonna empno come chiave primaria. Sebbene per impostazione predefinita il processo utilizzi una chiave primaria sequenziale come chiave di segnalibro se non viene specificata alcuna chiave di segnalibro, poiché empno non è necessariamente sequenziale (potrebbero esserci delle lacune nei valori), non si qualifica come chiave segnalibro predefinita. Di conseguenza, lo script designa esplicitamente empno come chiave di segnalibro. Quella parte del codice è mostrata in corsivo.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()