AWS Glue for Spark での ETL の接続タイプとオプション - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue for Spark での ETL の接続タイプとオプション

AWS Glue for Spark では、PySpark と Scala のさまざまなメソッドと変換で、connectionType パラメータを使用しながら接続タイプを指定します。connectionOptions または options パラメータを使用して接続オプションを指定します。

connectionType パラメータには、次の表に示す値を指定できます。各タイプの関連する connectionOptions (または options) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。

接続オプションの設定と使用方法を示すサンプルコードについては、「各接続タイプのホームページ」を参照してください。

connectionType 接続先
dynamodb Amazon DynamoDB データベース
kinesis Amazon Kinesis Data Streams
s3 Simple Storage Service (Amazon S3)
documentdb Amazon DocumentDB (MongoDB 互換) データベース
opensearch Amazon OpenSearch Service
Redshift Amazon Redshift データベース
kafka Kafka または Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos for NoSQL。
azuresql Azure SQL。
bigquery Google BigQuery。
mongodb MongoDB データベース (MongoDB Atlas を含む)。
sqlserver Microsoft SQL Server データベース (「JDBC 接続」を参照)
mysql MySQL データベース (「JDBC 接続」を参照)
oracle Oracle データベース (「JDBC 接続」を参照)
postgresql PostgreSQL データベース (「JDBC 接続」を参照)
saphana SAP HANA。
snowflake Snowflake データレイク
teradata Teradata Vantage。
vertica Vertica。
custom.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」 を参照
marketplace.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」を参照)

カスタムと AWS Marketplace での、connectionType の値

これには以下が含まれます。

  • "connectionType": "marketplace.athena": Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.spark": Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.jdbc": JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "custom.athena": Amazon Athena データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.spark": Apache Spark データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.jdbc": JDBC データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

custom.jdbc または marketplace.jdbc 型での接続オプション

  • className – (必須) ドライバクラス名を示す文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • url – (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (${}) を示す文字列。プレースホルダー ${secretKey} は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。

  • secretId または user/password – (必須) URL の認証情報を取得するために使用される文字列。

  • dbTableまたはquery – (必須) データを取得するテーブルまたは SQL クエリを示す文字列。dbTable または query を指定できます。両方を指定することはできません。

  • partitionColumn – (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、lowerBoundupperBound、および numPartitions に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「JDBC To Other Databases」を参照してください。

    lowerBound および upperBound 値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。

    注記

    テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:

    • "SELECT col1 FROM table1" の形式のクエリでパーティション列を使用する場合、末尾に WHERE 句を追加してそのクエリをテストします。

    • クエリ形式が SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND とパーティション列を使用する式で拡張することで、そのクエリをテストします。

  • lowerBound – パーティションストライドを決定するために使用される partitionColumn の最小値を示す整数 (オプション)。

  • upperBound – パーティションストライドを決定するために使用される partitionColumn の最大値を示す整数 (オプション)。

  • numPartitions – パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる) lowerBound と (範囲に含まれない) upperBound とともに使用され、partitionColumn の分割で使用するために生成された WHERE 句の式のための、パーティションストライドを形成します。

    重要

    パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。

  • filterPredicate – ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例:

    BillingCity='Mountain View'

    table 名の代わりに query を使用した場合は、指定された filterPredicate でクエリが動作することを確認します。例:

    • クエリの形式が "SELECT col1 FROM table1" の場合は、フィルタ述語を使用するクエリの末尾に WHERE 句を追加して、そのクエリをテストします。

    • クエリ形式が "SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND およびフィルター述語を使用する式で拡張して、そのクエリをテストします。

  • dataTypeMapping – (ディクショナリ、オプション) JDBC データ型 から Glue データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、オプション "dataTypeMapping":{"FLOAT":"STRING"} では、ドライバーの ResultSet.getString() メソッドを呼び出すことで、JDBC タイプ FLOAT のデータフィールドが Java String 型にマッピングされ、それを使用して AWS Glue レコードが構築されます。ResultSet オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。

  • AWS Glue で現在サポートされているデータ型は次のとおりです。

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    JDBC データ型としては、Java8 java.sql.types がサポートされています。

    デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    カスタムのデータタイプマッピングで dataTypeMapping オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、dataTypeMapping オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWS GlueSTRING データ型に変換されます。

次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena または marketplace.athena タイプ用の接続オプション

  • className – (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例: "com.amazonaws.athena.connectors")。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。

  • tableName – (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前 all_log_streams を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。

  • schemaName – (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、/aws-glue/jobs/output です。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

このコネクタの追加オプションについては、GitHub の Amazon Athena CloudWatch Connector README ファイルを参照してください。

次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark または marketplace.spark タイプ用の接続オプション

  • className – (必須) コネクタのクラス名を支援す文字列。

  • secretId – (オプション) コネクタ接続の認証情報を取得するために使用される文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • その他のオプションは、データストアによって異なります。例えば、Elasticsearch for Apache Hadoop ドキュメントの説明にあるように、OpenSearch の設定オプションは「es」でプレフィックスされます。Spark から Snowflake への接続では、Connecting to Snowflake ガイドの「Using the Spark Connector」で説明されているように、sfUser および sfPassword のオプションを使用します。

次の Python コード例に、marketplace.spark 接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

汎用オプション

このセクションのオプションは connection_options として提供されていますが、特定のコネクタには当てはまりません。

以下のパラメータは、ブックマークを設定する際に一般的に使用されます。Amazon S3 または JDBC ワークフローに適用される場合があります。詳細については、「ジョブのブックマークを使用する」を参照してください。

  • jobBookmarkKeys - 列名の配列。

  • jobBookmarkKeysSortOrder — ソート順序に基づいて値を比較する方法を定義する文字列。有効な値: "asc""desc"

  • useS3ListImplementation — Amazon S3 バケットの内容を一覧表示する際のメモリパフォーマンスの管理に使用されます。詳しくは、「Optimize memory management in AWS Glue」を参照してください。