Edición de scripts en AWS Glue - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Edición de scripts en AWS Glue

Los scripts contienen código que extrae datos de orígenes, los transforma y los carga en destinos. AWS Glue ejecuta un script cuando inicia un flujo de trabajo.

Los scripts de ETL de AWS Glue pueden codificarse en Python o Scala. Los scripts de Python utilizan un lenguaje que es una extensión del dialecto Python de PySpark para los trabajos de extraer, transformar y cargar (ETL). El script contiene constructos ampliados para gestionar las transformaciones de ETL. Al generar automáticamente lógica de código fuente para el flujo de trabajo, se crea un script. Puede editar este script o proporcionar su propio script para procesar el flujo de trabajo de ETL.

Para obtener más información acerca de cómo definir y editar scripts con la consola de AWS Glue, consulte Trabajo con scripts en la consola de AWS Glue.

Definición de scripts

Con un origen y un destino determinados, AWS Glue puede generar un script para transformar los datos. Esta propuesta de script es una versión inicial que se ajusta a los orígenes y los destinos y plantea transformaciones en PySpark. Puede verificar y modificar el script para adaptarse a sus necesidades empresariales. Utilice el editor de scripts en AWS Glue para añadir argumentos que especifiquen el origen y el destino, así como cualquier otro argumento que deba ejecutarse. Los trabajos ejecutan scripts y los disparadores inician trabajos, y esto se puede basar en una programación o un evento. Para obtener más información acerca de los disparadores, consulte Inicio de trabajos y rastreadores mediante desencadenadores.

En la consola de AWS Glue, el script se representa en forma de código. También puede ver el script como un diagrama que utiliza anotaciones (# #) incrustadas en el script. Estas anotaciones describen los parámetros, los tipos de transformación, los argumentos, las entradas y otras características del script que se utilizan para generar un diagrama en la consola de AWS Glue.

El diagrama del script contiene lo siguiente:

  • Entradas de origen para el script

  • Transformaciones

  • Salidas de destino que escribe el script

Los scripts puede incluir las anotaciones siguientes:

Anotación Uso
@params Parámetros del flujo de trabajo de ETL que requiere el script.
@type Tipo de nodo en el diagrama, como el tipo de transformación, el origen de datos o el receptor de datos.
@args Argumentos que se transfieren al nodo, salvo las referencias a los datos de entrada.
@return Variable que devuelve el script.
@inputs Entrada de datos al nodo.

Para obtener más información sobre los constructos de código en un script, consulte ProgramaAWSGlue scripts de ETL en Python.

A continuación se muestra un ejemplo de un script generado por AWS Glue. El script es para un trabajo que copia un conjunto de datos sencillo desde una ubicación Amazon Simple Storage Service (Amazon S3) a otra, cambiando el formato de CSV a JSON. Después de código de inicialización, el script incluye comandos que especifican el origen de datos, los mapeos y el destino (receptor de datos). Tenga en cuenta también las anotaciones.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2") job.commit()