Sample blueprint project - AWS Glue



Converting data formats is a frequent extract, transform, and load (ETL) use case. In typical analytic workloads, column-based file formats such as Parquet and ORC are preferred over text formats such as CSV or JSON. This sample blueprint enables you to convert data from CSV/JSON and other formats into Parquet for files on Amazon S3.

This blueprint takes a list of S3 paths defined by one blueprint parameter, converts the data to Parquet format, and writes it to the S3 location specified by another blueprint parameter. The layout script creates a crawler and a job for each path. The layout script also uploads the ETL script in Conversion.py to an S3 bucket specified by another blueprint parameter, and then specifies the uploaded script as the ETL script for each job. The ZIP archive for the project contains the layout script, the ETL script, and the blueprint configuration file.
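Once the ZIP archive is assembled, the blueprint can be registered with AWS Glue. The following is a minimal boto3 sketch, not part of the original sample: the bucket and blueprint names are hypothetical placeholders. Registration is asynchronous, so the blueprint must reach the ACTIVE state before it can be run. Note that the archive must keep the ConversionBlueprint/ folder, because the layout script opens Conversion.py by that relative path.

import boto3

# Hypothetical bucket and blueprint names.
glue = boto3.client('glue')
s3 = boto3.client('s3')

# 1. Upload the project ZIP archive to Amazon S3.
s3.upload_file("ConversionBlueprint.zip",
               "my-blueprint-bucket",
               "ConversionBlueprint.zip")

# 2. Register the blueprint with AWS Glue.
glue.create_blueprint(
    Name="ConversionBlueprint",
    BlueprintLocation="s3://my-blueprint-bucket/ConversionBlueprint.zip")

# 3. Registration is asynchronous; poll until the status is ACTIVE.
status = glue.get_blueprint(Name="ConversionBlueprint")["Blueprint"]["Status"]
print(status)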

For more information about sample blueprint projects, see Blueprint samples.

The following is the layout script, in the file Layout.py.

from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
import boto3

s3_client = boto3.client('s3')

# Ingesting all the S3 paths as Glue table in parquet format
def generate_layout(user_params, system_params):
    #Always give the full path for the file
    with open("ConversionBlueprint/Conversion.py", "rb") as f:
        s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py")
    etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket'])

    crawlers = []
    jobs = []
    workflowName = user_params['WorkflowName']
    for path in user_params['S3Paths']:
        tablePrefix = "source_"
        crawler = Crawler(Name="{}_crawler".format(workflowName),
                          Role=user_params['PassRole'],
                          DatabaseName=user_params['TargetDatabase'],
                          TablePrefix=tablePrefix,
                          Targets={"S3Targets": [{"Path": path}]})
        crawlers.append(crawler)
        transform_job = Job(Name="{}_transform_job".format(workflowName),
                            Command={"Name": "glueetl",
                                     "ScriptLocation": etlScriptLocation,
                                     "PythonVersion": "3"},
                            Role=user_params['PassRole'],
                            DefaultArguments={"--database_name": user_params['TargetDatabase'],
                                              "--table_prefix": tablePrefix,
                                              "--region_name": system_params['region'],
                                              "--output_path": user_params['TargetS3Location']},
                            DependsOn={crawler: "SUCCEEDED"},
                            WaitForDependencies="AND")
        jobs.append(transform_job)
    conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers))
    return conversion_workflow

The following is the corresponding blueprint configuration file, blueprint.cfg.

{ "layoutGenerator": "ConversionBlueprint.Layout.generate_layout", "parameterSpec" : { "WorkflowName" : { "type": "String", "collection": false, "description": "Name for the workflow." }, "S3Paths" : { "type": "S3Uri", "collection": true, "description": "List of Amazon S3 paths for data ingestion." }, "PassRole" : { "type": "IAMRoleName", "collection": false, "description": "Choose an IAM role to be used in running the job/crawler" }, "TargetDatabase": { "type": "String", "collection" : false, "description": "Choose a database in the Data Catalog." }, "TargetS3Location": { "type": "S3Uri", "collection" : false, "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/." }, "ScriptsBucket": { "type": "S3Bucket", "collection": false, "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts." } } }

The following script, in the file Conversion.py, is the uploaded ETL script. Note that it preserves the partitioning scheme during the conversion.

import sys
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'region_name',
    'database_name',
    'table_prefix',
    'output_path'])
databaseName = args['database_name']
tablePrefix = args['table_prefix']
outputPath = args['output_path']

glue = boto3.client('glue', region_name=args['region_name'])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

def get_tables(database_name, table_prefix):
    tables = []
    paginator = glue.get_paginator('get_tables')
    for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"):
        tables.extend(page['TableList'])
    return tables

for table in get_tables(databaseName, tablePrefix):
    tableName = table['Name']
    partitionList = table['PartitionKeys']
    partitionKeys = []
    for partition in partitionList:
        partitionKeys.append(partition['Name'])

    # Create DynamicFrame from Catalog
    dyf = glue_context.create_dynamic_frame.from_catalog(
        name_space=databaseName,
        table_name=tableName,
        additional_options={'useS3ListImplementation': True},
        transformation_ctx='dyf'
    )

    # Resolve choice type with make_struct
    dyf = ResolveChoice.apply(
        frame=dyf,
        choice='make_struct',
        transformation_ctx='resolvechoice_' + tableName
    )

    # Drop null fields
    dyf = DropNullFields.apply(
        frame=dyf,
        transformation_ctx="dropnullfields_" + tableName
    )

    # Write DynamicFrame to S3 in glueparquet
    sink = glue_context.getSink(
        connection_type="s3",
        path=outputPath,
        enableUpdateCatalog=True,
        partitionKeys=partitionKeys
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(
        catalogDatabase=databaseName,
        catalogTableName=tableName[len(tablePrefix):]
    )
    sink.writeFrame(dyf)
job.commit()
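Because the sink is created with partitionKeys, Hive-style partition directories (for example, year=2021/month=01/) are recreated under the output path. As a quick sanity check, a sketch like the following, which is not part of the original sample and assumes hypothetical year/month partition columns and a placeholder output bucket, can be run in a Spark session to confirm that the partition columns survived the conversion.

from pyspark.sql import SparkSession

# Hypothetical check; assumes the source data was partitioned by year/month
# and that the job wrote to s3://my-target-bucket/parquet/ (placeholder).
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://my-target-bucket/parquet/")
df.printSchema()                              # partition columns appear in the schema
df.select("year", "month").distinct().show()  # one row per recovered partition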
Note

You can supply only two Amazon S3 paths as input to this sample blueprint. This is because AWS Glue triggers are limited to invoking only two crawler actions.