AWS Glue で CSV 形式を使用する - AWS Glue

AWS Glue で CSV 形式を使用する

AWS Glue はソースからデータを取得し、さまざまなデータ形式で保存および転送されたターゲットにデータを書き込みます。このドキュメントでは、データが CSV データ形式で保存または転送される場合に、AWS Glue でデータを使用する際に利用できる機能について説明します。

AWS Glue は、カンマ区切り値 (CSV) ファイル形式をサポートしています。この形式は、最小限の行をベースにしたデータ形式です。CSV は厳密に基準に準拠していないことがよくありますが、詳細については RFC 4180 および RFC 7111 を参照してください。

AWS Glue を使用して、Amazon S3 およびストリーミングソースから CSV を読み取ることができ、Amazon S3 に CSV を書き込むこともできます。S3 から、CSV ファイルが含まれる bzip および gzip アーカイブを読み書きすることができます。このページで説明する設定ではなく、S3 接続パラメータ 上で圧縮動作を設定します。

次の表は、CSV 形式オプションをサポートする一般的な AWS Glue 機能を示しています。

読み込み 書き込み ストリーミングの読み取り 小さなファイルのグループ化 ジョブのブックマーク
サポート サポート対象 サポート対象 サポート対象 サポート

例: S3 から CSV ファイルまたはフォルダを読み取る

前提条件: 読み取りたい CSV ファイルまたはフォルダへの S3 パス (s3path) が必要です。

設定: 関数オプションで format="csv" を指定します。connection_options で、paths キーを使用して s3path を指定します。リーダーが S3 とやり取りする方法は、connection_options で設定できます。詳細については、AWS Glue: S3 接続パラメータ の「ETL の接続タイプとオプション」を参照してください。リーダーが CSV ファイルを解釈する方法は、format_options で設定できます。詳細については、「CSV 設定リファレンス」を参照してください。

次の AWS Glue ETL スクリプトは、S3 から CSV ファイルまたはフォルダを読み取るプロセスを示しています。

optimizePerformance 設定キーを使用して、一般的なワークフローのパフォーマンスを最適化するカスタム CSV リーダーを提供します。このリーダーがワークロードに適しているかどうかを判断するには、「ベクトル化された SIMD CSV リーダーで読み取りパフォーマンスを最適化する」を参照してください。

Python

この例では create_dynamic_frame.from_options メソッドを使用します。

# Example: Read CSV from S3 # For show, we handle a CSV with a header row. Set the withHeader option. # Consider whether optimizePerformance is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://s3path"]}, format="csv", format_options={ "withHeader": True, # "optimizePerformance": True, }, )

スクリプト (pyspark.sql.DataFrame) では DataFrame を使用することもできます。

dataFrame = spark.read\ .format("csv")\ .option("header", "true")\ .load("s3://s3path")
Scala

この例では getSourceWithFormat 操作を使用します。

// Example: Read CSV from S3 // For show, we handle a CSV with a header row. Set the withHeader option. // Consider whether optimizePerformance is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( formatOptions=JsonOptions("""{"withHeader": true}"""), connectionType="s3", format="csv", options=JsonOptions("""{"paths": ["s3://s3path"], "recurse": true}""") ).getDynamicFrame() } }

スクリプト (org.apache.spark.sql.DataFrame) では DataFrame を使用することもできます。

val dataFrame = spark.read .option("header","true") .format("csv") .load("s3://s3path“)

例: CSV ファイルおよびフォルダを S3 に書き込む

前提条件: 初期化された DataFrame (dataFrame) または DynamicFrame (dynamicFrame) が必要です。また、予想される S3 出力パスである s3path も必要になります。

設定: 関数オプションで format="csv" を指定します。connection_options で、paths キーを使用して s3path を指定します。ライターが S3 とやり取りする方法は、connection_options で設定できます。詳細については、AWS Glue: S3 接続パラメータ の「ETL の接続タイプとオプション」を参照してください。操作によってファイルの内容が format_options にどのように書き込まれるかを設定できます。詳細については、「CSV 設定リファレンス」を参照してください。次の AWS Glue ETL スクリプトは、S3 に CSV ファイルとフォルダを書き込むプロセスを示しています。

Python

この例では write_dynamic_frame.from_options メソッドを使用します。

# Example: Write CSV to S3 # For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", connection_options={"path": "s3://s3path"}, format="csv", format_options={ "quoteChar": -1, }, )

スクリプト (pyspark.sql.DataFrame) では DataFrame を使用することもできます。

dataFrame.write\ .format("csv")\ .option("quote", None)\ .mode("append")\ .save("s3://s3path")
Scala

この例では getSinkWithFormat メソッドを使用します。

// Example: Write CSV to S3 // For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="csv" ).writeDynamicFrame(dynamicFrame) } }

スクリプト (org.apache.spark.sql.DataFrame) では DataFrame を使用することもできます。

dataFrame.write .format("csv") .option("quote", null) .mode("Append") .save("s3://s3path")

CSV 設定リファレンス

AWS Glue ライブラリが format="csv" を指定している場合には、以下の format_options を使用することができます。

  • separator - 区切り文字を指定します。デフォルトはカンマですが、他の任意の文字を指定できます。

    • タイプ: テキスト、デフォルト: ","

  • escaper - エスケープに使用する文字を指定します。このオプションは、CSV ファイルを書き込む場合ではなく、読み取る場合にのみ使用します。有効にすると、直後の文字はそのまま使用されます。ただし、よく知られている小さいエスケープセット (\n\r\t\0) を除きます。

    • タイプ: テキスト、デフォルト: なし

  • quoteChar - 引用に使用する文字を指定します。デフォルト値は二重引用符です。これに -1 を設定すると、全体的に引用が無効になります。

    • タイプ: テキスト、デフォルト: '"'

  • multiLine - 単一のレコードが複数行にまたがることができるかどうかを指定します。これが発生するのは、フィールドに引用符で囲まれた改行文字がある場合などです。複数行にまたがるレコードがある場合は、このオプションを True に設定する必要があります。multiLine を有効にすると、解析の際にファイル分割をより慎重に行う必要があるため、パフォーマンスが低下する可能性があります。

    • タイプ: ブール値、デフォルト: false

  • withHeader - 最初の行をヘッダーとして扱うかどうかを指定します。このオプションは DynamicFrameReader クラスで使用できます。

    • タイプ: ブール値、デフォルト: false

  • writeHeader - 出力にヘッダーを書き込むかどうかを指定します。このオプションは DynamicFrameWriter クラスで使用できます。

    • タイプ: ブール値、デフォルト: true

  • skipFirst - 最初のデータ行をスキップするかどうかを指定します。

    • タイプ: ブール値、デフォルト: false

  • optimizePerformance - 高度な SIMD CSV リーダーで、Apache Arrow ベースの列指向メモリ形式を使用するかどうかを指定します。これは、AWS Glue 3.0 以降でのみ使用できます。

    • タイプ: ブール値、デフォルト: false

  • strictCheckForQuoting – CSV を作成する際、Glue は文字列と解釈した値に引用符を追加することがあります。これは、書き出される内容があいまいにならないようにするためです。Glue は、書き出す内容を決める時間を節約するために、特定の状況では引用符が不要でも引用符を追加することがあります。厳密なチェックを有効にすると、より強力な計算が行われ、厳密に必要な場合にのみ引用符が追加されます。これは、AWS Glue 3.0 以降でのみ使用できます。

    • タイプ: ブール値、デフォルト: false

ベクトル化された SIMD CSV リーダーで読み取りパフォーマンスを最適化する

AWS Glue バージョン 3.0 では、行ベースの CSV リーダーと比較して、全体的なジョブパフォーマンスを大幅に高速化できる最適化された CSV リーダーが追加されています。

最適化されたリーダーでは:

  • CPU SIMD 命令を使用してディスクから読み取ります

  • レコードを列形式 (Apache Arrow) で即座にメモリに書き込みます

  • レコードをバッチに分割します

これにより、レコードがバッチ処理されるときや、後で列形式に変換されるときの処理時間が節約されます。一例としては、スキーマを変更したり、列ごとにデータを取得したりする場合などが挙げられます。

最適化されたリーダーを使用するには、format_options またはテーブルプロパティで "optimizePerformance"true に設定します。

glueContext.create_dynamic_frame.from_options( frame = datasource1, connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "csv", format_options={ "optimizePerformance": True, "separator": "," }, transformation_ctx = "datasink2")
ベクトル化された CSV リーダーでの制限事項

ベクトル化された CSV リーダーには、次の制限があるので注意が必要です。

  • multiLine および escaper 形式オプションはサポートされません。デフォルトの escaper として二重引用符文字 '"' を使用します。これらのオプションを設定すると、AWS Glue は自動的にフォールバックし、行ベースの CSV リーダーを使用するようになります。

  • ChoiceType の DynamicFrame の作成はサポートされません。

  • error records の DynamicFrame の作成はサポートされません。

  • 日本語や中国語など、マルチバイト文字を含む CSV ファイルからの読み取りはサポートされません。