Exemple de projet de plan - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple de projet de plan

La conversion de format de données est un cas d'utilisation fréquent d'extraction, de transformation et de chargement (ETL). Dans les charges de travail analytiques typiques, les formats de fichiers basés sur des colonnes comme Parquet ou ORC sont préférés aux formats de texte tels que CSV ou JSON. Cet exemple de modèle vous permet de convertir des données de type CSV/JSON/etc. au format Parquet pour les fichiers sur Amazon S3.

Ce modèle prend une liste de chemins S3 définis par un paramètre de modèle, convertit les données au format Parquet et les écrit à l'emplacement S3 spécifié par un autre paramètre de modèle. Le script de structure crée un crawler et une tâche pour chaque chemin d'accès. Le script de structure télécharge également le script ETL se trouvant dans Conversion.py vers un compartiment S3 spécifié par un autre paramètre de modèle. Le script de structure spécifie ensuite le script téléchargé en tant que script ETL pour chaque tâche. L'archive ZIP du projet contient le script de structure, le script ETL et le fichier de configuration du modèle.

Pour plus d'informations sur d'autres projets de modèle, veuillez consulter la rubrique Exemples de plans.

Ce qui suit est le script de structure, dans le fichier Layout.py.

from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * import boto3 s3_client = boto3.client('s3') # Ingesting all the S3 paths as Glue table in parquet format def generate_layout(user_params, system_params): #Always give the full path for the file with open("ConversionBlueprint/Conversion.py", "rb") as f: s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py") etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket']) crawlers = [] jobs = [] workflowName = user_params['WorkflowName'] for path in user_params['S3Paths']: tablePrefix = "source_" crawler = Crawler(Name="{}_crawler".format(workflowName), Role=user_params['PassRole'], DatabaseName=user_params['TargetDatabase'], TablePrefix=tablePrefix, Targets= {"S3Targets": [{"Path": path}]}) crawlers.append(crawler) transform_job = Job(Name="{}_transform_job".format(workflowName), Command={"Name": "glueetl", "ScriptLocation": etlScriptLocation, "PythonVersion": "3"}, Role=user_params['PassRole'], DefaultArguments={"--database_name": user_params['TargetDatabase'], "--table_prefix": tablePrefix, "--region_name": system_params['region'], "--output_path": user_params['TargetS3Location']}, DependsOn={crawler: "SUCCEEDED"}, WaitForDependencies="AND") jobs.append(transform_job) conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers)) return conversion_workflow

Ce qui suit est le fichier de configuration du modèle correspondant blueprint.cfg.

{ "layoutGenerator": "ConversionBlueprint.Layout.generate_layout", "parameterSpec" : { "WorkflowName" : { "type": "String", "collection": false, "description": "Name for the workflow." }, "S3Paths" : { "type": "S3Uri", "collection": true, "description": "List of Amazon S3 paths for data ingestion." }, "PassRole" : { "type": "IAMRoleName", "collection": false, "description": "Choose an IAM role to be used in running the job/crawler" }, "TargetDatabase": { "type": "String", "collection" : false, "description": "Choose a database in the Data Catalog." }, "TargetS3Location": { "type": "S3Uri", "collection" : false, "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/." }, "ScriptsBucket": { "type": "S3Bucket", "collection": false, "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts." } } }

Le script suivant dans le fichier Conversion.py est le script ETL téléchargé. Notez qu'il conserve le schéma de partitionnement pendant la conversion.

import sys from pyspark.sql.functions import * from pyspark.context import SparkContext from awsglue.transforms import * from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions import boto3 args = getResolvedOptions(sys.argv, [ 'JOB_NAME', 'region_name', 'database_name', 'table_prefix', 'output_path']) databaseName = args['database_name'] tablePrefix = args['table_prefix'] outputPath = args['output_path'] glue = boto3.client('glue', region_name=args['region_name']) glue_context = GlueContext(SparkContext.getOrCreate()) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) def get_tables(database_name, table_prefix): tables = [] paginator = glue.get_paginator('get_tables') for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"): tables.extend(page['TableList']) return tables for table in get_tables(databaseName, tablePrefix): tableName = table['Name'] partitionList = table['PartitionKeys'] partitionKeys = [] for partition in partitionList: partitionKeys.append(partition['Name']) # Create DynamicFrame from Catalog dyf = glue_context.create_dynamic_frame.from_catalog( name_space=databaseName, table_name=tableName, additional_options={ 'useS3ListImplementation': True }, transformation_ctx='dyf' ) # Resolve choice type with make_struct dyf = ResolveChoice.apply( frame=dyf, choice='make_struct', transformation_ctx='resolvechoice_' + tableName ) # Drop null fields dyf = DropNullFields.apply( frame=dyf, transformation_ctx="dropnullfields_" + tableName ) # Write DynamicFrame to S3 in glueparquet sink = glue_context.getSink( connection_type="s3", path=outputPath, enableUpdateCatalog=True, partitionKeys=partitionKeys ) sink.setFormat("glueparquet") sink.setCatalogInfo( catalogDatabase=databaseName, catalogTableName=tableName[len(tablePrefix):] ) sink.writeFrame(dyf) job.commit()
Note

Seuls deux chemins Amazon S3 peuvent être fournis en entrée pour l'exemple de modèle. La raison en est que les déclencheurs AWS Glue sont limités à invoquer seulement deux actions de crawler.