Tre tipi di job ETL di AWS Glue per la conversione dei dati in Apache Parquet - Prontuario AWS

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Tre tipi di job ETL di AWS Glue per la conversione dei dati in Apache Parquet

Creato da Adnan Alvee (AWS), Karthikeyan Ramachandran e Nith Govindasivan (AWS)

Ambiente: PoC o pilota

Tecnologie: analisi

Carico di lavoro: tutti gli altri carichi di lavoro

Servizi AWS: AWS Glue

Riepilogo

Sul cloud Amazon Web Services (AWS), AWS Glue è un servizio di estrazione, trasformazione e caricamento (ETL) completamente gestito. AWS Glue rende conveniente classificare i dati, pulirli, arricchirli e spostarli in modo affidabile tra vari archivi e flussi di dati.

Questo modello fornisce diversi tipi di lavoro in AWS Glue e utilizza tre diversi script per dimostrare la creazione di lavori ETL.

Puoi usare AWS Glue per scrivere lavori ETL in un ambiente shell Python. Puoi anche creare lavori ETL in batch e in streaming utilizzando Python PySpark () o Scala in un ambiente Apache Spark gestito. Per iniziare a creare lavori ETL, questo modello si concentra sui lavori ETL in batch che utilizzano la shell Python e Scala. PySpark I job in Python shell sono pensati per carichi di lavoro che richiedono una potenza di calcolo inferiore. L'ambiente Apache Spark gestito è pensato per carichi di lavoro che richiedono un'elevata potenza di calcolo.

Apache Parquet è progettato per supportare schemi di compressione e codifica efficienti. Può velocizzare i carichi di lavoro di analisi perché archivia i dati in modo colonnare. La conversione dei dati in Parquet può far risparmiare spazio, costi e tempo di archiviazione a lungo termine. Per saperne di più su Parquet, consulta il post del blog Apache Parquet: How to be a hero with the open source columnar data format.

Prerequisiti e limitazioni

Prerequisiti

  • Ruolo AWS Identity and Access Management (IAM) (se non hai un ruolo, consulta la sezione Informazioni aggiuntive).

Architettura

Stack tecnologico Target

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

Automazione e scalabilità

  • I flussi di lavoro AWS Glue supportano l'automazione completa di una pipeline ETL.

  • Puoi modificare il numero di unità di elaborazione dati (DPU) o i tipi di worker per scalare orizzontalmente e verticalmente.

Strumenti

Servizi AWS

  • Amazon Simple Storage Service (Amazon S3) è un servizio di archiviazione degli oggetti basato sul cloud che consente di archiviare, proteggere e recuperare qualsiasi quantità di dati.

  • AWS Glue è un servizio ETL completamente gestito per la categorizzazione, la pulizia, l'arricchimento e lo spostamento dei dati tra vari archivi e flussi di dati.

Altri strumenti

  • Apache Parquet è un formato di file di dati open source orientato alle colonne progettato per l'archiviazione e il recupero.

Configurazione

Utilizza le seguenti impostazioni per configurare la potenza di calcolo di AWS Glue ETL. Per ridurre i costi, utilizza le impostazioni minime quando esegui il carico di lavoro fornito in questo schema. 

  • Shell Python: è possibile utilizzare 1 DPU per utilizzare 16 GB di memoria o 0,0625 DPU per utilizzare 1 GB di memoria. Questo modello utilizza 0,0625 DPU, che è l'impostazione predefinita nella console AWS Glue.

  • Python o Scala per Spark: se scegli i tipi di lavoro relativi a Spark nella console, AWS Glue per impostazione predefinita utilizza 10 worker e il tipo di worker G.1X. Questo modello utilizza due lavoratori, che è il numero minimo consentito, con il tipo di lavoratore standard, che è sufficiente ed economico.

La tabella seguente mostra i diversi tipi di worker AWS Glue per l'ambiente Apache Spark. Poiché un job della shell Python non utilizza l'ambiente Apache Spark per eseguire Python, non è incluso nella tabella.

Standard

G.1X

G.2X

VPCU

4

4

8

Memoria

16 GB

16 GB

32 GB

Spazio su disco

50 GB

64 GB

128 GB

Esecutore per lavoratore

2

1

Codice

Per il codice utilizzato in questo modello, inclusa la configurazione del ruolo e dei parametri IAM, consulta la sezione Informazioni aggiuntive.

Epiche

AttivitàDescrizioneCompetenze richieste

Carica i dati in un bucket S3 nuovo o esistente.

Crea o usa un bucket S3 esistente nel tuo account. Carica il file sample_data.csv dalla sezione Allegati e annota la posizione del bucket S3 e del prefisso.

Informazioni generali su AWS
AttivitàDescrizioneCompetenze richieste

Crea il job AWS Glue.

Nella sezione ETL della console AWS Glue, aggiungi un job AWS Glue. Seleziona il tipo di lavoro appropriato, la versione di AWS Glue e il tipo di DPU/Worker corrispondente e il numero di lavoratori. Per i dettagli, consulta la sezione Configurazione.

Sviluppatore, cloud o dati

Modifica le posizioni di input e output.

Copia il codice corrispondente al tuo job AWS Glue e modifica la posizione di input e output che hai annotato nell'epopea Upload the data.

Sviluppatore, cloud o dati

Configura i parametri.

È possibile utilizzare gli snippet forniti nella sezione Informazioni aggiuntive per impostare i parametri per il job ETL. AWS Glue utilizza internamente quattro nomi di argomenti:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

Il --JOB_NAME parametro deve essere inserito in modo esplicito nella console AWS Glue. Scegliete Jobs, Edit Job, Security configuration, librerie di script e parametri del lavoro (opzionale). Immettete --JOB_NAME come chiave e fornite un valore. Puoi anche utilizzare l'AWS Command Line Interface (AWS CLI) o l'API AWS Glue per impostare questo parametro. Il --JOB_NAME parametro è usato da Spark e non è necessario in un job in ambiente shell Python.

È necessario aggiungere -- prima il nome di ogni parametro; in caso contrario, il codice non funzionerà. Ad esempio, per i frammenti di codice, i parametri di posizione devono essere richiamati da and. --input_loc --output_loc

Sviluppatore, cloud o dati

Esegui il job ETL.

Esegui il tuo lavoro e controlla l'output. Nota quanto spazio è stato ridotto rispetto al file originale.

Sviluppatore, cloud o dati

Risorse correlate

Riferimenti

Tutorial e video

Informazioni aggiuntive

Ruolo IAM

Quando crei i job AWS Glue, puoi utilizzare un ruolo IAM esistente con le autorizzazioni mostrate nel seguente frammento di codice o un nuovo ruolo.

Per creare un nuovo ruolo, usa il seguente codice YAML.

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

Shell Python di AWS Glue

Il codice Python utilizza Pandas e le PyArrow librerie per convertire i dati in Parquet. La libreria Pandas è già disponibile. La PyArrow libreria viene scaricata quando si esegue il pattern, perché viene eseguita una sola volta. È possibile utilizzare i file wheel PyArrow per convertirli in una libreria e fornire il file come pacchetto di libreria. Per ulteriori informazioni sulla creazione di pacchetti di file wheel, consultate Fornire la propria libreria Python.

Parametri della shell AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

Codice shell AWS Glue Python

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

Processo AWS Glue Spark con Python

Per utilizzare un tipo di lavoro AWS Glue Spark con Python, scegli Spark come tipo di lavoro. Scegli Spark 3.1, Python 3 con tempi di avvio del processo migliorati (Glue versione 3.0) come versione AWS Glue.

Parametri di AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

Processo AWS Glue Spark con codice Python

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

Per un gran numero di file compressi di grandi dimensioni (ad esempio, 1.000 file di circa 3 MB ciascuno), usa il compressionType parametro con il recurse parametro per leggere tutti i file disponibili all'interno del prefisso, come mostrato nel codice seguente.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Per un numero elevato di file compressi di piccole dimensioni (ad esempio 1.000 file di circa 133 KB ciascuno), utilizzate il groupFiles parametro insieme ai parametri compressionType e. recurse Il groupFiles parametro raggruppa file di piccole dimensioni in più file di grandi dimensioni e controlla il raggruppamento alla dimensione specificata in byte (ad esempio, 1 MB). groupSize Il seguente frammento di codice fornisce un esempio di utilizzo di questi parametri all'interno del codice.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Senza alcuna modifica nei nodi di lavoro, queste impostazioni consentono al job AWS Glue di leggere più file (grandi o piccoli, con o senza compressione) e di scriverli sulla destinazione in formato Parquet.

Lavoro in AWS Glue Spark con Scala

Per utilizzare un tipo di lavoro AWS Glue Spark con Scala, scegli Spark come tipo di lavoro e Language come Scala. Scegli Spark 3.1, Scala 2 con tempi di avvio del lavoro migliorati (Glue versione 3.0) come versione AWS Glue. Per risparmiare spazio di archiviazione, anche il seguente esempio di AWS Glue with Scala utilizza la applyMapping funzionalità per convertire i tipi di dati.

Parametri di AWS Glue Scala

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

Job AWS Glue Spark con codice Scala

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

Allegati

Per accedere a contenuti aggiuntivi associati a questo documento, decomprimi il seguente file: attachment.zip