在 Glue AWS 中使用 JSON 格式 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Glue AWS 中使用 JSON 格式

AWS Glue 從來源擷取資料,並將資料寫入以各種資料格式存放和傳輸的目標。如果您的資料是以 JSON 資料格式存放或傳輸,本文件會向您介紹在 Glue AWS 中使用資料的可用功能。

AWS Glue 支援使用 JSON 格式。此格式表示具有一致形狀但內容靈活、且非基於資料列或資料行的資料結構。JSON 是由多個機構發佈的平行標準所定義的,其中之一是 ECMA-404。如需常用參考來源的格式簡介,請參閱 JSON 簡介

您可以使用 AWS Glue 從 Amazon S3 讀取 JSON 檔案,以及 bzipgzip 壓縮的 JSON 檔案。您可以在 S3 連線參數 上設定壓縮行為,而不是在本頁討論的組態中設定。

讀取 寫入 串流讀取 對小型檔案進行分組 任務書籤
支援 支援 支援 支援 支援

範例:從 S3 讀取 JSON 檔案或資料夾

先決條件:您需要指向想要讀取的 JSON 檔案或資料夾的 S3 路徑 (s3path)。

組態:在您的函數選項中,指定 format="json"。在您的 connection_options 中,使用 paths 索引鍵指定 s3path。您可以在連線選項中進一步更改讀取操作將如何周遊 s3,如需詳細資訊,請諮詢 Amazon S3 連線選項參考。您可以在 format_options 中設定讀取器解譯 JSON 的方式。如需詳細資訊,請參閱 JSON 組態參考

下列 AWS Glue ETL 指令碼顯示從 S3 讀取 JSON 檔案或資料夾的程序:

Python

在此範例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read JSON from S3 # For show, we handle a nested JSON file that we can limit with the JsonPath parameter # For show, we also handle a JSON where a single entry spans multiple lines # Consider whether optimizePerformance is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://s3path"]}, format="json", format_options={ "jsonPath": "$.id", "multiline": True, # "optimizePerformance": True, -> not compatible with jsonPath, multiline } )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

dataFrame = spark.read\ .option("multiline", "true")\ .json("s3://s3path")
Scala

在此範例中,使用 getSourceWithFormat 操作。

// Example: Read JSON from S3 // For show, we handle a nested JSON file that we can limit with the JsonPath parameter // For show, we also handle a JSON where a single entry spans multiple lines // Consider whether optimizePerformance is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( formatOptions=JsonOptions("""{"jsonPath": "$.id", "multiline": true, "optimizePerformance":false}"""), connectionType="s3", format="json", options=JsonOptions("""{"paths": ["s3://s3path"], "recurse": true}""") ).getDynamicFrame() } }

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

val dataFrame = spark.read .option("multiline", "true") .json("s3://s3path")

範例:將 JSON 檔案和資料夾寫入 S3

先決條件:您需要初始化 DataFrame (dataFrame) 或 DynamicFrame (dynamicFrame)。您還需要預期的 S3 輸出路徑 s3path

組態:在您的函數選項中,指定 format="json"。在您的 connection_options 中,使用 paths 鍵來指定 s3path。您可以在 connection_options 中進一步更改寫入器與 S3 的互動方式。如需詳細資訊,請參閱 Glue AWS 中 ETL 輸入和輸出的資料格式選項:Amazon S3 連線選項參考。您可以在 format_options 中設定寫入器解譯 JSON 的方式。如需詳細資訊,請參閱 JSON 組態參考

下列 AWS Glue ETL 指令碼顯示從 S3 寫入 JSON 檔案或資料夾的程序:

Python

在此範例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write JSON to S3 from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", connection_options={"path": "s3://s3path"}, format="json" )

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

df.write.json("s3://s3path/")
Scala

在此範例中,使用 getSinkWithFormat 方法。

// Example: Write JSON to S3 import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="json" ).writeDynamicFrame(dynamicFrame) } }

您也可以在指令碼中使用 DataFrames (pyspark.sql.DataFrame)。

df.write.json("s3://s3path")

Json 組態參考

可以使用下列的 format_options 值搭配 format="json"

  • jsonPathJsonPath 表達式,用以識別要讀入記錄的物件。當檔案包含在外部陣列中的巢狀記錄時,這個方法特別實用。舉例而言,下列 JsonPath 表達式會以 JSON 物件的 id 欄位為目標:

    format="json", format_options={"jsonPath": "$.id"}
  • multiline — 布林值,用以指定單項記錄是否可以跨越多行。當欄位內含引用的新行字元時,可能就會發生這種情況。若有任何記錄跨越多行,請務必將此選項設為 "true"。預設值為 "false",如此在剖析時會更加積極地分割檔案。

  • optimizePerformance — 布林值,指定是否要使用進階 SIMD JSON 讀取器,以及 Apache Arrow 為基礎的直欄式記憶體格式。僅適用於 AWS Glue 3.0。與 multilinejsonPath 不相容。提供其中一個選項會指示 AWS Glue 回到標準讀取器。

  • withSchema — 字串值,以 手動指定 XML 結構描述 中所述的格式指定資料表結構描述。僅在從非目錄連線讀取時搭配 optimizePerformance 使用。

使用向量化 SIMD JSON 讀取器搭配 Apache Arrow 直欄式格式

AWS Glue 3.0 版為 JSON 資料新增向量化讀取器。與標準讀取器相比,它在某些條件下的效能快 2 倍。此讀取器具有使用者在使用前應注意的某些限制, 本區段中給出了這些限制。

要使用最佳化的讀取器,請在 format_options 或資料表屬性中將 "optimizePerformance" 設為 True。除非從目錄中讀取,否則還需要提供 withSchemawithSchema 需要輸入,如 手動指定 XML 結構描述 中所述。

// Read from S3 data source glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "json", format_options={ "optimizePerformance": True, "withSchema": SchemaString }) // Read from catalog table glueContext.create_dynamic_frame.from_catalog( database = database, table_name = table, additional_options = { // The vectorized reader for JSON can read your schema from a catalog table property. "optimizePerformance": True, })

如需在 Glue 程式庫中建置 SchemaString AWS 的詳細資訊,請參閱 PySpark 延伸模組類型

向量化 CSV 讀取器的限制

注意下列限制:

  • 不支援具有巢狀物件或陣列值的 JSON 元素。如果提供, AWS Glue 會回到標準讀取器。

  • 必須從目錄或使用 withSchema 提供結構描述。

  • multilinejsonPath 不相容。提供其中一個選項會指示 AWS Glue 回到標準讀取器。

  • 提供不符合輸入結構描述的輸入記錄會導致讀取器失敗。

  • 錯誤記錄將不會建立。

  • 不支援含有多位元組字元 (例如日文或中文字元) 的 JSON 檔案。