在 AWS Glue 中使用 CSV 格式 - AWS Glue

在 AWS Glue 中使用 CSV 格式

AWS Glue 从源中检索数据,并将数据写入以各种数据格式存储和传输的目标。如果您的数据以 CSV 数据格式存储或传输,本文档将向您介绍供您使用 AWS Glue 中的数据的可用功能。

AWS Glue 支持逗号分隔值(CSV)格式。此格式是一种基于行的最小数据格式。CSV 通常不严格遵守某一标准,但您可以参考 RFC 4180RFC 7111 了解更多信息。

您可以使用 AWS Glue 从 Amazon S3 和流式传输源读取 CSV,以及将 CSV 写入 Amazon S3。您可以读取并写入包含 S3 中的 CSV 文件的 bzipgzip 存档。请在 S3 连接参数 上而非本页中讨论的配置中配置压缩行为。

下表显示了哪些常用 AWS Glue 功能支持 CSV 格式选项。

读取 写入 流式处理读取 对小文件进行分组 作业书签
支持 支持 支持 支持 支持

示例:从 S3 读取 CSV 文件或文件夹

先决条件:您将需要至您想要读取的 CSV 文件或文件夹的 S3 路径(s3path)。

配置:在函数选项中,请指定 format="csv"。在您的 connection_options 中,请使用 paths 键指定 s3path。您可以在 connection_options 中配置读取器与 S3 的交互方式。有关详细信息,请参阅 AWS Glue 中 ETL 的连接类型和选项:S3 连接参数。您可以配置读取器如何解释 format_options 中的 CSV 文件。有关详细信息,请参阅 CSV 配置参考

以下 AWS Glue ETL 脚本显示了从 S3 读取 CSV 文件或文件夹的过程。

我们提供自定义的 CSV 读取器,其中包含通过 optimizePerformance 配置键针对常见工作流进行的性能优化。要确定此读取器是否适合您的工作负载,请参阅 使用向量化 SIMD CSV 读取器优化读取性能

Python

在本示例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read CSV from S3 # For show, we handle a CSV with a header row. Set the withHeader option. # Consider whether optimizePerformance is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://s3path"]}, format="csv", format_options={ "withHeader": True, # "optimizePerformance": True, }, )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

dataFrame = spark.read\ .format("csv")\ .option("header", "true")\ .load("s3://s3path")
Scala

在本示例中,使用 getSourceWithFormat 操作。

// Example: Read CSV from S3 // For show, we handle a CSV with a header row. Set the withHeader option. // Consider whether optimizePerformance is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( formatOptions=JsonOptions("""{"withHeader": true}"""), connectionType="s3", format="csv", options=JsonOptions("""{"paths": ["s3://s3path"], "recurse": true}""") ).getDynamicFrame() } }

您还可以使用脚本(org.apache.spark.sql.DataFrame)中的 DataFrames。

val dataFrame = spark.read .option("header","true") .format("csv") .load("s3://s3path“)

示例:将 CSV 文件和文件夹写入 S3

先决条件:您将需要一个初始化的 DataFrame(dataFrame)或 DynamicFrame(dynamicFrame)。您还需要预期 S3 输出路径 s3path

配置:在函数选项中,请指定 format="csv"。在您的 connection_options 中,请使用 paths 键指定 s3path。您可以在 connection_options 中配置编写器与 S3 的交互方式。有关详细信息,请参阅 AWS Glue 中 ETL 的连接类型和选项:S3 连接参数。您可以配置自己的操作在 format_options 中写入文件的内容的方式。有关详细信息,请参阅 CSV 配置参考。以下 AWS Glue ETL 脚本显示了将 CSV 文件和文件夹写入 S3 的过程。

Python

在本示例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write CSV to S3 # For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", connection_options={"path": "s3://s3path"}, format="csv", format_options={ "quoteChar": -1, }, )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

dataFrame.write\ .format("csv")\ .option("quote", None)\ .mode("append")\ .save("s3://s3path")
Scala

在本示例中,请使用 getSinkWithFormat 方法。

// Example: Write CSV to S3 // For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="csv" ).writeDynamicFrame(dynamicFrame) } }

您还可以使用脚本(org.apache.spark.sql.DataFrame)中的 DataFrames。

dataFrame.write .format("csv") .option("quote", null) .mode("Append") .save("s3://s3path")

CSV 配置参考

您可以在 AWS Glue 库指定 format="csv" 的任何位置使用以下 format_options

  • separator – 指定分隔符。默认值为逗号,但也可以指定任何其他字符。

    • 类型:文本,默认值:","

  • escaper – 指定要用于转义的字符。此选项仅在读取 CSV 文件而非写入时使用。如果启用,则按原样使用紧跟其后的字符,一小组已知的转义符(\n\r\t\0)除外。

    • 类型:文本,默认值:

  • quoteChar – 指定要用于引用的字符。默认值为双引号。将这设置为 -1 可完全关闭引用。

    • 类型:文本,默认值:'"'

  • multiLine – 指定单个记录能否跨越多行。当字段包含带引号的换行符时,会出现此选项。如果有记录跨越多个行,您必须将此选项设置为 True。启用 multiLine 可能会降低性能,因为它在解析时需要更加谨慎的文件拆分。

    • 类型:布尔值,默认值:false

  • withHeader – 指定是否将第一行视为标头。可以在 DynamicFrameReader 类中使用此选项。

    • 类型:布尔值,默认值:false

  • writeHeader – 指定是否将标头写入输出。可以在 DynamicFrameWriter 类中使用此选项。

    • 类型:布尔值,默认值:true

  • skipFirst – 指定是否跳过第一个数据行。

    • 类型:布尔值,默认值:false

  • optimizePerformance – 指定是否使用高级 SIMD CSV 读取器以及基于 Apache Arrow 的列式内存格式。仅适用于 AWS Glue 3.0+。

    • 类型:布尔值,默认值:false

  • strictCheckForQuoting - 在编写 CSV 时,Glue 可能会在其解释为字符串的值中添加引号。这样做是为了防止写出的内容出现模棱两可之处。为了节省决定写入什么的时间,Glue 可能会在某些不需要引号的情况下进行引用。启用严格检查将执行更密集的计算,并且只有在绝对必要时才会引用。仅适用于 AWS Glue 3.0+。

    • 类型:布尔值,默认值:false

使用向量化 SIMD CSV 读取器优化读取性能

AWS Glue 3.0 版添加了经过优化的 CSV 读取器,与基于行的 CSV 读取器相比,它可以显著提高整体任务性能。

优化的读取器:

  • 使用 CPU SIMD 指令从磁盘读取

  • 立即以列格式(Apache Arrow)将记录写入内存

  • 将记录分成几批

这样可以节省日后对记录进行批处理或转换为列格式时的处理时间。例如,更改架构或按列检索数据时。

要使用优化的读取器,请将在 format_options 或表属性中将 "optimizePerformance" 设置为 true

glueContext.create_dynamic_frame.from_options( frame = datasource1, connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "csv", format_options={ "optimizePerformance": True, "separator": "," }, transformation_ctx = "datasink2")
矢量化 CSV 读取器的限制

请注意向量化 CSV 读取器的以下限制:

  • 它不支持 multiLineescaper 格式选项。它使用默认双引号字符 '"'escaper。设置这些选项后,AWS Glue 会自动回退使用基于行的 CSV 读取器。

  • 它不支持创建具有 ChoiceType 的 DynamicFrame。

  • 它不支持创建具有错误记录的 DynamicFrame。

  • 它不支持读取带多字节字符(如日语或中文字符)的 CSV 文件。