三種 AWS Glue ETL 作業類型,用於將資料轉換為 Apache 實木複合地板 - AWS Prescriptive Guidance

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

三種 AWS Glue ETL 作業類型,用於將資料轉換為 Apache 實木複合地板

由阿爾維(AWS)創建

環境:PoC 或試驗

技術:分析

工作負載:所有其他工作負載

AWS 服務:AWS Glue

Summary

在 Amazon Web Services vice (AWS) 雲端上,AWS Glue 是全受管的擷取、轉換和載入 (ETL) 服務。AWS Glue 可讓您以經濟實惠的方式,將您的資料進行分類、清理、富集,以及在資料存放區和資料流之間可靠的移動。

此模式在 AWS Glue 中提供不同的工作類型,並使用三個不同的指令碼來示範編寫 ETL 工作。

您可以使用 AWS Glue 在 Python 殼層環境中編寫 ETL 作業。您也可以通過在託管的 Apache 星火環境中使用 Python(PySpark)或斯卡拉創建批處理和流 ETL 作業。為了讓您開始編寫 ETL 作業,這種模式著重於使用 Python 外殼,PySpark 和斯卡拉的批處理 ETL 作業。Python 殼層作業適用於需要較低計算能力的工作負載。受管理的 Apache Spark 環境適用於需要高運算能力的工作負載。

Apache 實木複合地板是建立支持高效的壓縮和編碼方案。它可以加速您的分析工作負載,因為它以柱狀方式儲存資料。將數據轉換為鑲木地板可以在較長的運行中節省您的存儲空間,成本和時間。若要進一步了解 Parquet,請參閱部落格文章Apache Parquet 如何在 Google、Azure 和 Amazon 雲端上使用開放原始碼的柱狀資料格式成為英雄

先決條件和限制

先決條件

  • AWS Identity and Access Management (IAM) 角色

Architecture

目標技術堆疊

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

自動化和擴展

AWS Glue 工作流程支持 ETL 管道的完全自動化。

您可以變更資料處理單元 (DPU) 或 Worker 類型的數量,以便在水平和垂直方向進行擴展。

Tools

工具

  • Amazon S3— Amazon Simple Storage Service (Amazon S3) 提供網際網路儲存服務。您可以使用 Amazon S3 隨時從 Web 任何地方存放和擷取任意資料量。

  • AWS Glue— AWS Glue 是完全受管的 ETL 服務,可在各種資料存放區和資料流之間分類、清理、豐富和移動資料。

組態

以下是用於配置 AWS Glue ETL 的運算能力的設置。若要降低成本,請在執行此模式中提供的工作負載時使用最低設定。 

  • Python Shell— 您可以使用 1 個 DPU 來使用 16 GB 的記憶體,或使用 0.0625 DPU 來使用 1 GB 的記憶體。此模式使用 0.0625 單位 PU,這是 AWS Glue 主控台中的預設值。

  • 蟒蛇或斯卡拉火花— 如果您在主控台中選擇與 Spark 相關的工作類型,AWS Glue 預設會使用 10 個工作者和 G.1X 工作者類型。這種模式使用兩個工作程式 (這是允許的最小數目) 與標準工作者類型 (這是足夠且符合成本效益)。

下表顯示 Apache Spark 環境的不同 AWS Glue 工作者類型。由於 Python 外殼作業不使用 Apache 星火環境來運行 Python,因此它不包含在表中。

標準G.1XG.2X
vCPU448
Memory16 GB16 GB32 GB
Disk space50 GB64 GB128 GB
Executor per worker21

Code

建立 AWS Glue 任務時,您可以使用附加的 IAM 角色或現有角色。

AWS Glue Python

Python 代碼使用熊貓和 PyArrow 庫將數據轉換為實木複合地板。熊貓圖書館已經可用。在運行模式時會下載 PyArrow 庫,因為它是一次性運行。您可以使用 wheel 文件將 PyArrow 轉換為庫,並將該文件作為庫包提供。如需封裝 wheel 檔案的詳細資訊,請參閱提供您自己的 Python 程式庫

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

AWS Glue 火花工作與 Python

若要在 Python 中使用 AWS Glue 火花工作類型,請選擇Spark做為任務類型。選擇火花 2.4,Python 3 與改進的作業啟動時間(Glue 版本 2.0)作為 AWS Glue 版本。

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

與 AWS Glue 火花工作

要在 Scala 中使用 AWS Glue 火花作業類型,請選擇Spark做為任務類型。選擇火花 2.4,斯卡拉 2 具有改進的作業啟動時間(Glue 版本 2.0)作為 AWS Glue 版本。為了節省儲存空間,以下含 Scala 示例的 AWS Glue 還使用applyMapping功能來轉換資料類型。

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

Epics

任務描述所需技能
將資料上傳至新的或現有的 S3 儲存貯體。

在您的帳戶中,建立或使用現有的 S3 儲存貯體。上傳 sample_data.csv 檔案從附件區段,然後注意 S3 儲存貯體和字首位置。

一般 AWS
任務描述所需技能
建立 AWS Glue 任務。

在 AWS Glue 主控台的 ETL 區段下,新增 AWS Glue 工作。選取適當的工作類型、AWS Glue 版本,以及對應的 DPU/工作者類型和工作者數目。如需詳細資訊,請參閱組態區段。

開發人員、雲端或資料
變更輸入和輸出位置。

對應於您的 Glue 作業的程式碼副本,並變更您在上傳資料史詩般的。

開發人員、雲端或資料
執行 ETL 工作。

執行任務並檢查輸出。請注意,從原始檔案減少了多少空間。

開發人員、雲端或資料

References

教學和影片

其他資訊

參數組態

您可以使用下列程式碼片段來設定 ETL 工作的參數。AWS Glue 在內部使用四個引數名稱: 

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

所以此--JOB_NAME參數必須在 AWS Glue 主控台上明確輸入。選擇任務編輯 Job安全組態、指令碼程式庫和任務參數 (選擇性)。Enter--JOB_NAME作為關鍵字並提供一個值。您也可以使用 AWS Command Line Interface (AWS CLI) 或 AWS Glue API 來設定此參數。所以此--JOB_NAME參數被 Spark 使用,並且在 Python shell 環境作業中不需要。

您必須新增--在每個參數名稱之前; 否則,代碼將無法工作。例如,對於下列程式碼片段,位置參數必須由--input_loc--output_loc

AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

AWS Glue Scala

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

  

Attachments

attachment.zip