三種 AWS Glue ETL 任務類型,可將資料轉換為 Apache 實木地板 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

三種 AWS Glue ETL 任務類型,可將資料轉換為 Apache 實木地板

創建者:阿德南阿爾維(AWS),卡爾蒂安·拉馬干德蘭和尼特·戈文達西文(AWS)

環境:PoC 或試點

技術:分析

工作負載:所有其他工作

AWS 服務:AWS AWS Glue

使用 AWS Glue 任務將資料轉換為 Apache

在 Amazon Web Services (AWS) 雲端上,AWS Glue 是全受管的擷取、轉換和載入 (ETL) 服務。AWS Glue 讓您能夠以符合成本效益的方式,將資料分類、清理、豐富資料,以及在各種資料存放區和資料串流之間可靠地移動資料。

此模式在 AWS Glue 中提供不同的任務類型,並使用三種不同的指令碼來示範編寫 ETL 任務。

您可以使用 AWS Glue 在 Python 殼層環境中撰寫 ETL 任務。您也可以在託管的 Apache 星火環境中使用 Python(PySpark)或斯卡拉創建批處理和流 ETL 任務。為了讓您開始編寫 ETL 作業,此模式著重於使用 Python 殼層和 Scala 的批次 ETL 作業。 PySparkPython 殼層作業適用於需要較低運算能力的工作負載。受管理的 Apache Spark 環境適用於需要高運算能力的工作負載。

阿帕奇實木複合地板是建立支持高效的壓縮和編碼方案。它可以加速您的分析工作負載,因為它會以單欄式方式儲存資料。在長期運行中將數據轉換為 Parquet 可以節省您的存儲空間,成本和時間。要了解有關鑲木地板的更多信息,請參閱博客文章 Apache Parquet:如何使用開源柱狀數據格式成為英雄

使用 AWS Glue 任務將資料轉換為 Apache 實木地板的先決條件和限制

先決條件

  • AWS Identity and Access Management (IAM) 角色 (如果您沒有角色,請參閱其他資訊一節)。

使用 AWS Glue 任務將資料轉換為 Apache 木地板的架構

目標技術堆疊

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

自動化和規模

  • AWS Glue 工作流程支援 ETL 管道的完全自動化。

  • 您可以變更資料處理單位 (DPU) 或 Worker 類型的數目,以水平和垂直縮放。

使用 AWS Glue 任務將資料轉換為 Apache 實木複合地板的工具

AWS 服務

  • Amazon Simple Storage Service (Amazon S3) 是雲端物件儲存服務,可協助您存放、保護和擷取任意數量的資料。

  • AWS Glue 是全受管 ETL 服務,可在各種資料存放區和資料串流之間分類、清理、豐富和移動資料。

其他工具

組態

使用下列設定來設定 AWS Glue ETL 的運算能力。若要降低成本,請在執行此模式中提供的工作負載時使用最小設定。 

  • Python 外殼 — 您可以使用 1 個 DPU 來利用 16 GB 的記憶體,或使用 0.0625 個 DPU 來利用 1 GB 的記憶體。此模式使用 0.0625 DPU,這是 AWS Glue 主控台中的預設值。

  • 適用於 Spark 的 Python 或 Scala — 如果您在主控台中選擇與火花相關的任務類型,AWS Glue 預設會使用 10 個工作程式和 G.1X 工作者類型。此模式使用兩個 Worker,這是允許的最小數目,具有標準 Worker 類型,這是足夠且具有成本效益的。

下表顯示 Apache Spark 環境的不同 AWS AWS Glue 工作者類型。因為 Python 殼層作業不使用 Apache 星火環境來執行 Python,所以它不會包含在資料表中。

標準

G.1X

G.2X

vCPU

4

4

8

記憶體

16 GB

16 GB

32 GB

磁碟空間

50 GB

64 GB

128 GB

每名工人的執行人

2

1

Code

如需此模式中使用的程式碼 (包括 IAM 角色和參數設定),請參閱其他資訊一節。

使用 AWS Glue 任務將資料轉換為 Apache 實木複合地板的史詩

任務 描述 所需技能

將資料上傳到新的或現有的 S3 儲存貯體。

在您的帳戶中建立或使用現有的 S3 儲存貯體。從「附件」區段上傳 sample_data.csv 檔案,並記下 S3 儲存貯體和前置碼的位置。

一般 AWS
任務 描述 所需技能

建立 AWS AWS Glue 任務。

在 AWS Glue 主控台的 ETL 區段下,新增 AWS AWS Glue 任務。選取適當的任務類型、AWS Glue 版本,以及對應的 DPU/ 工作者類型和工作者數目。如需詳細資訊,請參閱「組態」一節。

開發人員、雲端或資料

更改輸入和輸出位置。

複製與 AWS Glue 任務對應的程式碼,然後變更您在上傳資料史詩中記下的輸入和輸出位置。

開發人員、雲端或資料

設定參數。

您可以使用「其他資訊」區段中提供的片段來設定 ETL 工作的參數。AWS Glue 在內部使用四個引數名稱:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

必須在 AWS Glue 主控台上明確輸入--JOB_NAME參數。選擇 「Job」、「編輯作」、「安全性組態」、「命令檔程式庫」和「工作參數」(選 輸入--JOB_NAME作為鍵並提供一個值。您也可以使用 AWS Command Line Interface (AWS CLI) (AWS CLI) 或 AWS Glue API 來設定此參數。該--JOB_NAME參數由星火使用,並且不需要在 Python 外殼環境作業。

您必須--在每個參數名稱之前加入;否則,程式碼將無法運作。例如,對於程式碼片段,位置參數必須由--input_loc和叫用--output_loc

開發人員、雲端或資料

執行 ETL 工作。

運行您的作業並檢查輸出。請注意原始檔案減少了多少空間。

開發人員、雲端或資料

使用 AWS Glue 任務將資料轉換為 Apache 實木地板的相關資源

參考

教學課程和影片

使用 AWS Glue 任務將資料轉換為 Apache 實木複合地板的其他資訊

IAM role (IAM 角色)

建立 AWS Glue 任務時,您可以使用具有下列程式碼片段中顯示許可的現有 IAM 角色,或使用新角色。

若要建立新角色,請使用下列 YAML 程式碼。

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS AWS Glue Python 殼

Python 代碼使用熊貓和 PyArrow 庫將數據轉換為實木複合地板。熊貓圖書館已經可用。當您執行病毒碼時,會下載程式 PyArrow 庫,因為這是一次性執行。您可以使用 wheel 檔案轉換 PyArrow 為資源庫,並將檔案做為資源庫套件提供。如需有關封裝輪子檔案的詳細資訊,請參閱提供您自己的 Python 程式庫

AWS Glue Python 外殼參數

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWS Glue Python 殼代碼

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

AWS AWS Glue 火花任務與 Python

若要搭配 Python 使用 AWS AWS Glue 星火工作類型,請選擇星火做為任務類型。選擇火花 3.1、Python 3,並改善任務啟動時間 (Glue 3.0 版) 做為 AWS AWS Glue 版本。

AWS Glue Python 參數

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

使用 Python 代碼進行 AWS AWS Glue 火花任務

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

對於大量壓縮的大型檔案 (例如,1,000 個檔案大約 3 MB),請使用compressionType參數搭配recurse參數來讀取前置碼內可用的所有檔案,如下列程式碼所示。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

對於大量壓縮的小型檔案 (例如,1,000 個檔案大約 133 KB),請使用groupFiles參數以compressionType及和recurse參數。groupFiles參數會將小型檔案群組成多個大檔案,而groupSize參數則控制群組為以位元組為單位的指定大小 (例如 1 MB)。下列程式碼片段提供在程式碼中使用這些參數的範例。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

這些設定可讓 AWS Glue 任務讀取多個檔案 (大或小,無論壓縮或不含壓縮),並以 Parquet 格式將檔案寫入目標中,無論是否有任何變更,都可以在工作節點中進行任何變更。

AWS AWS Glue 火花與斯卡拉的任務

若要搭配 Scala 使用 AWS AWS Glue 星火工作類型,請選擇 S park 作為任務類型,選擇語言Scala。選擇 S park 3.1,斯卡拉 2 改善任務啟動時間 (Glue 版本 3.0) 做為 AWS AWS Glue 版本。為了節省儲存空間,下列 AWS Glue 與 Scala 範例也使用此applyMapping功能來轉換資料類型。

AWS AWS Glue 斯卡拉參數

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

AWS AWS Glue 星火任務與斯卡拉代碼

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

使用 AWS Glue 任務將資料轉換為 Apache 實木地板的附件

若要存取與此文件相關聯的其他內容,請解壓縮下列檔案:attachment.zip