在 AWS Glue 中使用 Parquet 格式 - AWS 連接詞

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 AWS Glue 中使用 Parquet 格式

AWS Glue 會從各來源擷取資料,並將資料寫入以各種資料格式儲存和傳輸的目標。如果您的資料以 Parquet 資料格式存放或傳輸,本文件將向您介紹在 AWS Glue 中使用資料的可用功能。

AWS Glue 支援使用 Parquet 格式。此格式是效能導向、以資料行為基礎的資料格式。如需標準授權單位的格式簡介,請參閱 Apache Parquet Documentation Overview (Apache Parquet 文件概觀)。

您可以使用 AWS Glue 從 Amazon S3 和串流來源讀取 Parquet 檔案,並將 Parquet 檔案寫入 Amazon S3。您可以從 S3 讀取和寫入包含 Parquet 檔案的 bzipgzip 封存檔。您可以在 S3 連線參數 上設定壓縮行為,而不是在本頁討論的組態中設定。

下表顯示支援 Parquet 格式選項的常見 AWS Glue 功能。

讀取 寫入 串流讀取 對小型檔案進行分組 任務書籤
支援 支援 支援 不支援 支援*

* 在 AWS Glue 1.0 或更新版本中支援

範例:從 S3 讀取 Parquet 檔案或資料夾

先決條件:您需要指向希望讀取的 Parquet 檔案或資料夾的 S3 路徑 (s3path)。

組態:在您的函數選項中,指定 format="parquet"。在您的 connection_options 中,使用 paths 索引鍵指定 s3path

您可以在 connection_options 中設定讀取器與 S3 的互動方式。如需詳細資訊,請參閱 AWS Glue 中 ETL 的連線類型和選項:S3 連線參數

您可以在 format_options 中設定讀取器解譯 Parquet 檔案的方式。如需詳細資訊,請參閱 Parquet 組態參考

以下 AWS Glue ETL 指令碼顯示從 S3 讀取 Parquet 檔案或資料夾的流程:

Python

在此範例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read Parquet from S3 from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = {"paths": ["s3://s3path/"]}, format = "parquet" )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

dataFrame = spark.read.parquet("s3://s3path/")
Scala

在此範例中,使用 getSourceWithFormat 方法。

// Example: Read Parquet from S3 import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( connectionType="s3", format="parquet", options=JsonOptions("""{"paths": ["s3://s3path"]}""") ).getDynamicFrame() } }

您也可以在指令碼中使用 DataFrames (org.apache.spark.sql.DataFrame)。

spark.read.parquet("s3://s3path/")

範例:將 Parquet 檔案和資料夾寫入 S3

先決條件:您需要初始化 DataFrame (dataFrame) 或 DynamicFrame (dynamicFrame)。您還需要預期的 S3 輸出路徑 s3path

組態:在您的函數選項中,指定 format="parquet"。在您的 connection_options 中,使用 paths 鍵來指定 s3path

您可以在 connection_options 中進一步更改寫入器與 S3 的互動方式。如需詳細資訊,請參閱 AWS Glue 中 ETL 的連線類型和選項:S3 連線參數。您可以在 format_options 中設定操作如何寫入檔案內容。如需詳細資訊,請參閱 Parquet 組態參考

以下 AWS Glue ETL 指令碼顯示將 Parquet 檔案和資料夾寫入 S3 的流程。

我們透過 useGlueParquetWriter 組態鍵為自訂 Parquet 寫入器提供 DynamicFrames 的效能最佳化。若要判斷此寫入器是否適合您的工作負載,請參閱 Glue Parquet 寫入器

Python

在此範例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write Parquet to S3 # Consider whether useGlueParquetWriter is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", format="parquet", connection_options={ "path": "s3://s3path", }, format_options={ # "useGlueParquetWriter": True, }, )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

df.write.parquet("s3://s3path/")
Scala

在此範例中,使用 getSinkWithFormat 方法。

// Example: Write Parquet to S3 // Consider whether useGlueParquetWriter is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="parquet" ).writeDynamicFrame(dynamicFrame) } }

您也可以在指令碼中使用 DataFrames (org.apache.spark.sql.DataFrame)。

df.write.parquet("s3://s3path/")

Parquet 組態參考

您都可以使用下列 format_options,無論 AWS Glue 程式庫在何處指定 format="parquet"

  • useGlueParquetWriter – 指定使用具有 DynamicFrame 工作流程效能最佳化的自訂 Parquet 寫入器。如需用量詳細資訊,請參閱 Glue Parquet 寫入器

    • 類型:布林值,預設:false

  • compression – 指定使用的壓縮轉碼器。值與 org.apache.parquet.hadoop.metadata.CompressionCodecName 完全相容。

    • 類型:列舉文字,預設:"snappy"

    • 值:"uncompressed""snappy""gzip""lzo"

  • blockSize – 指定記憶體中將緩衝處理資料列群組的大小,以位元組為單位。您可以使用該值來調校效能。大小應完全分為多個 MB 單位。

    • 類型:數值,預設:134217728

    • 預設值等於 128 MB。

  • pageSize – 指定分頁的大小,以位元組為單位。您可以使用該值來調校效能。分頁是必須完整讀取才能存取單一記錄之最小單位。

    • 類型:數值,預設:1048576

    • 預設值等於 1 MB。

注意

此外,任何底層的 SparkSQL 程式碼所接受的選項均可透過 connection_options 對應參數傳送。例如,可為 AWS Glue Spark 讀取器設定 mergeSchema 等 Spark 組態以合併所有檔案的結構描述。

使用 AWS Glue Parquet 寫入器以最佳化寫入效能

注意

過去,AWS Glue Parquet 寫入器都透過 glueparquet 格式類型存取。但現在不再提倡這種存取模式。請改用已啟用 useGlueParquetWriterparquet 類型。

AWS Glue Parquet 寫入器具有效能增強功能,可加快 Parquet 檔案寫入速度。傳統的寫入器會在寫入之前計算結構描述。Parquet 格式不會以可供快速擷取的方式存放結構描述,因此這可能需要一些時間。使用 AWS Glue Parquet 寫入器,無需預先計算的結構描述。當資料送達時,寫入器會動態地計算並修改結構描述。

指定 useGlueParquetWriter 時,請注意以下限制:

  • 寫入器僅支援結構描述演變 (例如新增或移除資料行),但不會變更資料行類型,例如使用 ResolveChoice

  • 寫入器不支援寫入空白的 DataFrame,例如,寫入只含結構描述的檔案。透過設定 enableUpdateCatalog=True 與 AWS Glue 資料目錄整合時,嘗試寫入空白的 DataFrame 不會更新資料目錄,而會在資料目錄建立不含結構描述的資料表。

如果您的轉換不需要這些限制,則開啟 AWS Glue Parquet 寫入器會提升效能。