AWS Glue for Spark での ETL の接続タイプとオプション - AWS Glue

AWS Glue for Spark での ETL の接続タイプとオプション

AWS Glue for Spark では、PySpark と Scala のさまざまなメソッドと変換で、connectionType パラメータを使用しながら接続タイプを指定します。connectionOptions または options パラメータを使用して接続オプションを指定します。

connectionType パラメータには、次の表に示す値を指定できます。各タイプの関連する connectionOptions (または options) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。

接続オプションの設定と使用方法を示すサンプルコードについては、「各接続タイプのホームページ」を参照してください。

connectionType 接続先
dynamodb Amazon DynamoDB データベース
kinesis Amazon Kinesis Data Streams
s3 Simple Storage Service (Amazon S3)
documentdb Amazon DocumentDB (MongoDB 互換) データベース
opensearch Amazon OpenSearch Service
Redshift Amazon Redshift データベース
kafka Kafka または Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL。
azuresql Azure SQL。
bigquery Google BigQuery。
mongodb MongoDB データベース (MongoDB Atlas を含む)。
sqlserver Microsoft SQL Server データベース (「JDBC 接続」を参照)
mysql MySQL データベース (「JDBC 接続」を参照)
oracle Oracle データベース (「JDBC 接続」を参照)
postgresql PostgreSQL データベース (「JDBC 接続」を参照)
saphana SAP HANA。
snowflake Snowflake データレイク
teradata Teradata Vantage。
vertica Vertica。
custom.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」 を参照
marketplace.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」を参照)

AWS Glue 5.0 for Spark の ETL の DataFrame オプション

DataFrame は、名前の付いた列にまとめられた、テーブルに似たデータセットで、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

Glue でサポートされているデータソースの DataFrame を作成するには、以下が必要です。

  • データソースコネクタ ClassName

  • データソース接続 Options

同様に、Glue でサポートされているデータシンクに DataFrame を書き込むときも、同じものが必要です。

  • データシンクコネクタ ClassName

  • データシンク接続 Options

ジョブのブックマークなどの AWS Glue 機能や、connectionName などの DynamicFrame オプションは DataFrame ではサポートされていないことに注意してください。DataFrame とサポートされているオペレーションの詳細については、DataFrame の Spark のドキュメントを参照してください。

コネクタ ClassName の指定

データソース/シンクの ClassName を指定するには、.format オプションを使用して、データソース/シンクを定義するための対応するコネクタ ClassName を指定します。

JDBC コネクタ

JDBC コネクタの場合は、.format オプションの値として jdbc を指定し、driver オプションで JDBC ドライバー ClassName を指定します。

df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...

次の表は、AWS Glue for DataFrames でサポートされているデータソースの JDBC ドライバー ClassName の一覧です。

データソース ドライバーの ClassName
PostgreSQL org.postgresql.Driver
Oracle oracle.jdbc.driver.OracleDriver
SQLServer com.microsoft.sqlserver.jdbc.SQLServerDriver
MySQL com.mysql.jdbc.Driver
SAPHana com.sap.db.jdbc.Driver
Teradata com.teradata.jdbc.TeraDriver
スパークコネクタ

Spark コネクタの場合は、コネクタの ClassName.format オプションの値として指定します。

df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...

次の表は、AWS Glue for DataFrames でサポートされているデータソースの Spark コネクタ ClassName の一覧です。

データソース ClassName
MongoDB/DocumentDB glue.spark.mongodb
Redshift io.github.spark_redshift_community.spark.redshift
AzureCosmos cosmos.oltp
AzureSQL com.microsoft.sqlserver.jdbc.spark
BigQuery com.google.cloud.spark.bigquery
OpenSearch org.opensearch.spark.sql
Snowflake net.snowflake.spark.snowflake
Vertica com.vertica.spark.datasource.VerticaSource

接続オプションの指定

データソース/シンクへの接続の Options を指定するには、.option(<KEY>, <VALUE>) を使用して個々のオプションを指定するか、.options(<MAP>) を使用してキーと値のマップとして複数のオプションを指定します。

各データソース/シンクは、接続 Options の独自のセットをサポートします。利用可能な Options の詳細については、次の表に記載されている特定のデータソース/シンクの Spark コネクタの公開ドキュメントを参照してください。

次の例では、PostgreSQL から読み取り、SnowFlake に書き込みます。

Python

例:

from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala

例:

import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()

カスタムと AWS Marketplace での、connectionType の値

これには以下が含まれます。

  • "connectionType": "marketplace.athena": Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.spark": Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.jdbc": JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "custom.athena": Amazon Athena データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.spark": Apache Spark データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.jdbc": JDBC データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

custom.jdbc または marketplace.jdbc 型での接続オプション

  • className – (必須) ドライバクラス名を示す文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • url – (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (${}) を示す文字列。プレースホルダー ${secretKey} は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。

  • secretId または user/password – (必須) URL の認証情報を取得するために使用される文字列。

  • dbTableまたはquery – (必須) データを取得するテーブルまたは SQL クエリを示す文字列。dbTable または query を指定できます。両方を指定することはできません。

  • partitionColumn – (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、lowerBoundupperBound、および numPartitions に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「JDBC To Other Databases」を参照してください。

    lowerBound および upperBound 値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。

    注記

    テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:

    • "SELECT col1 FROM table1" の形式のクエリでパーティション列を使用する場合、末尾に WHERE 句を追加してそのクエリをテストします。

    • クエリ形式が SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND とパーティション列を使用する式で拡張することで、そのクエリをテストします。

  • lowerBound – パーティションストライドを決定するために使用される partitionColumn の最小値を示す整数 (オプション)。

  • upperBound – パーティションストライドを決定するために使用される partitionColumn の最大値を示す整数 (オプション)。

  • numPartitions – パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる) lowerBound と (範囲に含まれない) upperBound とともに使用され、partitionColumn の分割で使用するために生成された WHERE 句の式のための、パーティションストライドを形成します。

    重要

    パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。

  • filterPredicate – ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例:

    BillingCity='Mountain View'

    table 名の代わりに query を使用した場合は、指定された filterPredicate でクエリが動作することを確認します。例:

    • クエリの形式が "SELECT col1 FROM table1" の場合は、フィルタ述語を使用するクエリの末尾に WHERE 句を追加して、そのクエリをテストします。

    • クエリ形式が "SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND およびフィルター述語を使用する式で拡張して、そのクエリをテストします。

  • dataTypeMapping – (ディクショナリ、オプション) JDBC データ型 から Glue データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、オプション "dataTypeMapping":{"FLOAT":"STRING"} では、ドライバーの ResultSet.getString() メソッドを呼び出すことで、JDBC タイプ FLOAT のデータフィールドが Java String 型にマッピングされ、それを使用して AWS Glue レコードが構築されます。ResultSet オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。

  • AWS Glue で現在サポートされているデータ型は次のとおりです。

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    JDBC データ型としては、Java8 java.sql.types がサポートされています。

    デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    カスタムのデータタイプマッピングで dataTypeMapping オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、dataTypeMapping オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWS GlueSTRING データ型に変換されます。

次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena または marketplace.athena タイプ用の接続オプション

  • className – (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例: "com.amazonaws.athena.connectors")。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。

  • tableName – (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前 all_log_streams を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。

  • schemaName – (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、/aws-glue/jobs/output と指定します。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

このコネクタの追加オプションについては、GitHub の Amazon Athena CloudWatch Connector README ファイルを参照してください。

次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark または marketplace.spark タイプ用の接続オプション

  • className – (必須) コネクタのクラス名を支援す文字列。

  • secretId – (オプション) コネクタ接続の認証情報を取得するために使用される文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • その他のオプションは、データストアによって異なります。例えば、Elasticsearch for Apache Hadoop ドキュメントの説明にあるように、OpenSearch の設定オプションは「es」でプレフィックスされます。Spark から Snowflake への接続では、Connecting to Snowflake ガイドの「Using the Spark Connector」で説明されているように、sfUser および sfPassword のオプションを使用します。

次の Python コード例に、marketplace.spark 接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

汎用オプション

このセクションのオプションは connection_options として提供されていますが、特定のコネクタには当てはまりません。

以下のパラメータは、ブックマークを設定する際に一般的に使用されます。Amazon S3 または JDBC ワークフローに適用される場合があります。詳細については、「ジョブのブックマークを使用する」を参照してください。

  • jobBookmarkKeys - 列名の配列。

  • jobBookmarkKeysSortOrder — ソート順序に基づいて値を比較する方法を定義する文字列。有効な値: "asc""desc"

  • useS3ListImplementation — Amazon S3 バケットの内容を一覧表示する際のメモリパフォーマンスの管理に使用されます。詳しくは、「Optimize memory management in AWS Glue」を参照してください。