Amazon Redshift は、2025 年 11 月 1 日以降、新しい Python UDF の作成をサポートしなくなります。Python UDF を使用する場合は、その日付より前に UDF を作成してください。既存の Python UDF は引き続き通常どおり機能します。詳細については、ブログ記事
サポートされているデータ型
Amazon Redshift の以下のデータ型は Spark コネクタでサポートされています。Amazon Redshift でサポートされているデータ型の完全なリストについては、「データ型」を参照してください。データ型が下の表にない場合、そのデータ型は Spark コネクタではサポートされていません。
| データ型 | エイリアス |
|---|---|
| SMALLINT | INT2 |
| INTEGER | INT、INT4 |
| BIGINT | INT8 |
| DECIMAL | NUMERIC |
| REAL | FLOAT4 |
| DOUBLE PRECISION | FLOAT8、FLOAT |
| BOOLEAN | BOOL |
| CHAR | CHARACTER、NCHAR、BPCHAR |
| VARCHAR | CHARACTER VARYING、NVARCHAR、TEXT |
| DATE | |
| TIMESTAMP | Timestamp without time zone |
| TIMESTAMPTZ | Timestamp with time zone |
| SUPER | |
| TIME | Time without time zone |
| TIMETZ | Time with time zone |
| VARBYTE | VARBINARY、BINARY VARYING |
複雑なデータ型
Spark コネクタを使用して、Redshift の SUPER データ型列との間で、ArrayType、MapType、StructType のような Spark の複雑なデータ型を読み書きできます。読み取りオペレーションにスキーマを指定すると、列のデータは Spark 内の対応する複合型 (ネストされた型も含む) に変換されます。さらに、autopushdown を有効にすると、ネストされた属性、マップ値、配列インデックスの投影が Redshift にプッシュダウンされるため、データの一部だけにアクセスするときに、ネストされたデータ構造全体をアンロードする必要がなくなります。
コネクタから DataFrames を書き込む場合、MapType 型の列 (StringType を使用)、StructType、または ArrayType は、Redshift SUPER データ型列に書き込まれます。これらのネストされたデータ構造を記述する場合、tempformat パラメータは CSV、CSV GZIP、または PARQUET である必要があります。AVRO を使用すると、例外が発生します。また、StringType 以外のキータイプを持つ MapType データ構造を記述すると、例外が発生します。
StructType
次の例は、構造体を含む SUPER データ型のテーブルを作成する方法を示しています。
create table contains_super (a super);
その後、次の例のようなスキーマを使用して、テーブルの SUPER 列 a から StringType フィールド hello をクエリできます。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a.hello")
次の例は、構造体を a 列に書き込む方法を示しています。
import org.apache.spark.sql.types._ import org.apache.spark.sql._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil) val data = sc.parallelize(Seq(Row(Row("world")))) val mydf = sqlContext.createDataFrame(data, schema) mydf.write.format("io.github.spark_redshift_community.spark.redshift"). option("url", jdbcUrl). option("dbtable", tableName). option("tempdir", tempS3Dir). option("tempformat", "CSV"). mode(SaveMode.Append).save
MapType
MapType を使用してデータを表す場合は、スキーマ内の MapType データ構造を使用して、マップ内のキーに対応する値を取得します。MapType データ構造のすべてのキーは String 型でなければならず、すべての値は int のように同じ型でなければなりません。
次の例は、列 a のキー hello の値を取得する方法を示しています。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", MapType(StringType, IntegerType))::Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a['hello']")
ArrayType
列に構造体ではなく配列が含まれている場合は、コネクタを使用して配列の最初の要素をクエリできます。
import org.apache.spark.sql.types._ val sc = // existing SparkContext val sqlContext = new SQLContext(sc) val schema = StructType(StructField("a", ArrayType(IntegerType)):: Nil) val helloDF = sqlContext.read .format("io.github.spark_redshift_community.spark.redshift") .option("url", jdbcURL ) .option("tempdir", tempS3Dir) .option("dbtable", "contains_super") .schema(schema) .load().selectExpr("a[0]")
制限
Spark コネクタで複雑なデータ型を使用する場合は、次の制限があります。
-
ネストされた構造体フィールド名とマップキーはすべて小文字でなければなりません。大文字を含む複雑なフィールド名をクエリする場合は、回避策として、スキーマを省略し、
from_jsonspark 関数を使用して、返された文字列をローカルに変換するよう試みることができます。 -
読み取りまたは書き込みオペレーションで使用されるマップフィールドには、
StringTypeキーのみが必要です。 -
CSV、CSV GZIP、およびPARQUETのみが、複合型を Redshift に書き込むためにサポートされる tempformat 値です。AVROを使用しようとすると、例外が発生します。