AWS Glue for Spark での ETL の接続タイプとオプション
AWS Glue for Spark では、PySpark と Scala のさまざまなメソッドと変換で、connectionType
パラメータを使用しながら接続タイプを指定します。connectionOptions
または options
パラメータを使用して接続オプションを指定します。
connectionType
パラメータには、次の表に示す値を指定できます。各タイプの関連する connectionOptions
(または options
) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。
接続オプションの設定と使用方法を示すサンプルコードについては、「各接続タイプのホームページ」を参照してください。
connectionType |
接続先 |
---|---|
dynamodb | Amazon DynamoDB データベース |
kinesis | Amazon Kinesis Data Streams |
s3 | Simple Storage Service (Amazon S3) |
documentdb | Amazon DocumentDB (MongoDB 互換) データベース |
opensearch | Amazon OpenSearch Service。 |
Redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | Azure Cosmos for NoSQL。 |
azuresql | Azure SQL。 |
bigquery | Google BigQuery。 |
mongodb | MongoDB |
sqlserver | Microsoft SQL Server データベース (「JDBC 接続」を参照) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA。 |
snowflake | Snowflake |
teradata | Teradata Vantage。 |
vertica | Vertica。 |
custom.* | Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」 を参照 |
marketplace.* | Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」を参照) |
AWS Glue 5.0 for Spark の ETL の DataFrame オプション
DataFrame は、名前の付いた列にまとめられた、テーブルに似たデータセットで、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。
Glue でサポートされているデータソースの DataFrame を作成するには、以下が必要です。
データソースコネクタ
ClassName
データソース接続
Options
同様に、Glue でサポートされているデータシンクに DataFrame を書き込むときも、同じものが必要です。
データシンクコネクタ
ClassName
データシンク接続
Options
ジョブのブックマークなどの AWS Glue 機能や、connectionName
などの DynamicFrame オプションは DataFrame ではサポートされていないことに注意してください。DataFrame とサポートされているオペレーションの詳細については、DataFrame
コネクタ ClassName の指定
データソース/シンクの ClassName
を指定するには、.format
オプションを使用して、データソース/シンクを定義するための対応するコネクタ ClassName
を指定します。
JDBC コネクタ
JDBC コネクタの場合は、.format
オプションの値として jdbc
を指定し、driver
オプションで JDBC ドライバー ClassName
を指定します。
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
次の表は、AWS Glue for DataFrames でサポートされているデータソースの JDBC ドライバー ClassName
の一覧です。
データソース | ドライバーの ClassName |
---|---|
PostgreSQL | org.postgresql.Driver |
Oracle | oracle.jdbc.driver.OracleDriver |
SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver |
MySQL | com.mysql.jdbc.Driver |
SAPHana | com.sap.db.jdbc.Driver |
Teradata | com.teradata.jdbc.TeraDriver |
スパークコネクタ
Spark コネクタの場合は、コネクタの ClassName
を .format
オプションの値として指定します。
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
次の表は、AWS Glue for DataFrames でサポートされているデータソースの Spark コネクタ ClassName
の一覧です。
データソース | ClassName |
---|---|
MongoDB/DocumentDB | glue.spark.mongodb |
Redshift | io.github.spark_redshift_community.spark.redshift |
AzureCosmos | cosmos.oltp |
AzureSQL | com.microsoft.sqlserver.jdbc.spark |
BigQuery | com.google.cloud.spark.bigquery |
OpenSearch | org.opensearch.spark.sql |
Snowflake | net.snowflake.spark.snowflake |
Vertica | com.vertica.spark.datasource.VerticaSource |
接続オプションの指定
データソース/シンクへの接続の Options
を指定するには、.option(<KEY>, <VALUE>)
を使用して個々のオプションを指定するか、.options(<MAP>)
を使用してキーと値のマップとして複数のオプションを指定します。
各データソース/シンクは、接続 Options
の独自のセットをサポートします。利用可能な Options
の詳細については、次の表に記載されている特定のデータソース/シンクの Spark コネクタの公開ドキュメントを参照してください。
例
次の例では、PostgreSQL から読み取り、SnowFlake に書き込みます。
Python
例:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
例:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
カスタムと AWS Marketplace での、connectionType の値
これには以下が含まれます。
-
"connectionType": "marketplace.athena"
: Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。 -
"connectionType": "marketplace.spark"
: Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。 -
"connectionType": "marketplace.jdbc"
: JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。 -
"connectionType": "custom.athena"
: Amazon Athena データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。 -
"connectionType": "custom.spark"
: Apache Spark データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。 -
"connectionType": "custom.jdbc"
: JDBC データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。
custom.jdbc または marketplace.jdbc 型での接続オプション
-
className
– (必須) ドライバクラス名を示す文字列。 -
connectionName
– (必須) コネクタに関連付けられている接続の名前を示す文字列。 -
url
– (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (${}
) を示す文字列。プレースホルダー${secretKey}
は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。 -
secretId
またはuser/password
– (必須) URL の認証情報を取得するために使用される文字列。 -
dbTable
またはquery
– (必須) データを取得するテーブルまたは SQL クエリを示す文字列。dbTable
またはquery
を指定できます。両方を指定することはできません。 -
partitionColumn
– (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、lowerBound
、upperBound
、およびnumPartitions
に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「JDBC To Other Databases」を参照してください。 lowerBound
およびupperBound
値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。注記
テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:
-
"SELECT col1 FROM table1"
の形式のクエリでパーティション列を使用する場合、末尾にWHERE
句を追加してそのクエリをテストします。 -
クエリ形式が
SELECT col1 FROM table1 WHERE col2=val"
の場合は、WHERE
句をAND
とパーティション列を使用する式で拡張することで、そのクエリをテストします。
-
-
lowerBound
– パーティションストライドを決定するために使用されるpartitionColumn
の最小値を示す整数 (オプション)。 -
upperBound
– パーティションストライドを決定するために使用されるpartitionColumn
の最大値を示す整数 (オプション)。 -
numPartitions
– パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる)lowerBound
と (範囲に含まれない)upperBound
とともに使用され、partitionColumn
の分割で使用するために生成されたWHERE
句の式のための、パーティションストライドを形成します。重要
パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。
-
filterPredicate
– ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例:BillingCity='Mountain View'
table 名の代わりに query を使用した場合は、指定された
filterPredicate
でクエリが動作することを確認します。例:-
クエリの形式が
"SELECT col1 FROM table1"
の場合は、フィルタ述語を使用するクエリの末尾にWHERE
句を追加して、そのクエリをテストします。 -
クエリ形式が
"SELECT col1 FROM table1 WHERE col2=val"
の場合は、WHERE
句をAND
およびフィルター述語を使用する式で拡張して、そのクエリをテストします。
-
-
dataTypeMapping
– (ディクショナリ、オプション) JDBC データ型 から Glue データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、オプション"dataTypeMapping":{"FLOAT":"STRING"}
では、ドライバーのResultSet.getString()
メソッドを呼び出すことで、JDBC タイプFLOAT
のデータフィールドが JavaString
型にマッピングされ、それを使用して AWS Glue レコードが構築されます。ResultSet
オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。 -
AWS Glue で現在サポートされているデータ型は次のとおりです。
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
JDBC データ型としては、Java8 java.sql.types
がサポートされています。 デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
カスタムのデータタイプマッピングで
dataTypeMapping
オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、dataTypeMapping
オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWS GlueSTRING
データ型に変換されます。 -
次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.athena または marketplace.athena タイプ用の接続オプション
-
className
– (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例:"com.amazonaws.athena.connectors"
)。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。 -
tableName
– (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前all_log_streams
を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。 -
schemaName
– (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、/aws-glue/jobs/output
と指定します。 -
connectionName
– (必須) コネクタに関連付けられている接続の名前を示す文字列。
このコネクタの追加オプションについては、GitHub の Amazon Athena CloudWatch Connector README
次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.spark または marketplace.spark タイプ用の接続オプション
-
className
– (必須) コネクタのクラス名を支援す文字列。 -
secretId
– (オプション) コネクタ接続の認証情報を取得するために使用される文字列。 -
connectionName
– (必須) コネクタに関連付けられている接続の名前を示す文字列。 -
その他のオプションは、データストアによって異なります。例えば、Elasticsearch for Apache Hadoop
ドキュメントの説明にあるように、OpenSearch の設定オプションは「 es
」でプレフィックスされます。Spark から Snowflake への接続では、Connecting to Snowflake ガイドの「Using the Spark Connector」で説明されているように、 sfUser
およびsfPassword
のオプションを使用します。
次の Python コード例に、marketplace.spark
接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
汎用オプション
このセクションのオプションは connection_options
として提供されていますが、特定のコネクタには当てはまりません。
以下のパラメータは、ブックマークを設定する際に一般的に使用されます。Amazon S3 または JDBC ワークフローに適用される場合があります。詳細については、「ジョブのブックマークを使用する」を参照してください。
jobBookmarkKeys
- 列名の配列。jobBookmarkKeysSortOrder
— ソート順序に基づいて値を比較する方法を定義する文字列。有効な値:"asc"
、"desc"
。useS3ListImplementation
— Amazon S3 バケットの内容を一覧表示する際のメモリパフォーマンスの管理に使用されます。詳しくは、「Optimize memory management in AWS Glue」を参照してください。