AWS Glue での ETL の接続タイプとオプション - AWS Glue

AWS Glue での ETL の接続タイプとオプション

AWS Glue では、PySpark と Scala のさまざまなメソッドと変換で、connectionType パラメータを使用しながら接続タイプを指定します。connectionOptions または options パラメータを使用して接続オプションを指定します。

connectionType パラメータには、次の表に示す値を指定できます。各タイプの関連する connectionOptions (または options) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。

接続オプションの設定と使用方法を示すサンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

connectionType 接続先
カスタム.* Spark、Athena、または JDBC データストア (カスタムと AWS Marketplace での、connectionType の値 参照
documentdb Amazon DocumentDB (MongoDB 互換) データベース
dynamodb Amazon DynamoDB データベース
Kafka Kafka または Amazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
Marketplace.* Spark、Athena、または JDBC データストア (カスタムと AWS Marketplace での、connectionType の値 参照)
mongodb MongoDB データベース
mysql MySQL データベース (「JDBC connectionType の値」を参照)
oracle Oracle データベース (「JDBC connectionType の値」を参照)
orc Apache Hive Optimized Row Columnar (ORC) ファイル形式で Amazon Simple Storage Service (Amazon S3) に保存されたファイル
parquet Apache Parquet ファイル形式で Amazon S3 に保存されたファイル
postgresql PostgreSQL データベース (「JDBC connectionType の値」を参照)
Redshift Amazon Redshift データベース (JDBC connectionType の値 参照)
s3 Amazon S3
sqlserver Microsoft SQL Server データベース (「JDBC connectionType の値」を参照)

"connectionType": "documentdb"

Amazon DocumentDB (MongoDB 互換) への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": "documentdb" as Source

"connectionType": "documentdb" をソースとして、次の接続オプションを使用します。

  • "uri": (必須) 読み取り元の Amazon DocumentDB ホスト (mongodb://<host>:<port> 形式)。

  • "database": (必須) 読み取り元の Amazon DocumentDB データベース。

  • "collection": (必須) 読み取り元の Amazon DocumentDB コレクション。

  • "username": (必須) Amazon DocumentDB のユーザー名。

  • "password": (必須) Amazon DocumentDB のパスワード。

  • "ssl": (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を "true" に設定する必要があります。

  • "ssl.domain_match": (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を "false" に設定する必要があります。

  • "batchSize": (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。

  • "partitioner": (オプション) Amazon DocumentDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。

    • MongoDefaultPartitioner (デフォルト)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey、partitionSizeMB

    これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration」を参照してください。サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" をシンクとして、次の接続オプションを使用します。

  • "uri": (必須) 書き込み先の Amazon DocumentDB ホスト (mongodb://<host>:<port> 形式)。

  • "database": (必須) 書き込み先の Amazon DocumentDB データベース。

  • "collection": (必須) 書き込み先の Amazon DocumentDB コレクション。

  • "username": (必須) Amazon DocumentDB のユーザー名。

  • "password": (必須) Amazon DocumentDB のパスワード。

  • "extendedBsonTypes": (オプション) true が指定されている場合、Amazon DocumentDB へのデータ書き込み時に拡張 BSON 型を使用できます。デフォルト: true

  • "replaceDocument": (オプション) true の場合、_id フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト: true

  • "maxBatchSize": (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。

サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "dynamodb"

Amazon DynamoDB への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

「connectionType」: 「dynamodb」 (ソースの場合)

"connectionType": "dynamodb" をソースとして、次の接続オプションを使用します。

  • "dynamodb.input.tableName": (必須) 読み取り元の DynamoDB テーブル。

  • "dynamodb.throughput.read.percent": (オプション) 使用する読み込みキャパシティーユニット (RCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • デフォルトの読み取りレートは 0.5 です。この場合 AWS Glue は、テーブルの読み取り容量の半分を消費することを試みます。上記の値を 0.5 より上に設定すると、AWS Glue は読み取りのリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の読み込みレートは、DynamoDB テーブルに、統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。)

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue は、テーブルの読み取り容量を 40000 として処理します。大きなテーブルをエクスポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.splits": (オプション) 読み取り中にこの DynamoDB テーブルを分割するパーティションの数を定義します。デフォルトでは、"1" に設定されます。許容値は "1" から "1,000,000" (これらの値を含む) です。

    • 1 は並列処理がないことを表します。より良いパフォーマンスを得るためには、より大きな値を (以下の式を使用して) 指定することを強くお勧めします。

    • numSlots は、次の式を使用して計算し、dynamodb.splits として使用することをお勧めします。さらにパフォーマンスを向上させる必要がある場合は、DPU 数を増加してジョブをスケールアウトすることをお勧めします。

      • numExecutors =

        • WorkerTypeStandard の場合は (DPU - 1) * 2 - 1

        • WorkerTypeG.1X または G.2X の場合は (NumberOfWorkers - 1)

      • numSlotsPerExecutor =

        • WorkerTypeStandard の場合は 4

        • WorkerTypeG.1X の場合は 8

        • WorkerTypeG.2X の場合は 16

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。このパラメータは、AWS Glue 1.0 以降で使用可能です。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。このパラメータは、AWS Glue 1.0 以降で使用可能です。

注記

AWS Glue は、別のAWS アカウントの DynamoDB テーブルからの、データ読み取りをサポートしています。詳細については、「DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス」を参照してください。

注記

DynamoDB リーダーでは、フィルタまたはプッシュダウン述語は使用できません。

「connectionType」: 「dynamodb」 (シンクの場合)

"connectionType": "dynamodb" をシンクとして、次の接続オプションを使用します。

  • "dynamodb.output.tableName": (必須) 書き込み先の DynamoDB テーブル。

  • "dynamodb.throughput.write.percent": (オプション) 使用する書き込みキャパシティーユニット (WCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • デフォルトの書き込みレートは 0.5 です。この場合 AWS Glue は、テーブルの書き込み容量の半分を消費することを試みます。上記の値を 0.5 より上に設定すると、AWS Glue は書き込みリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の書き込みレートは、統一されたキーのディストリビューションが、DynamoDB テーブルにあるかどうかなどの要因によって変わります)。

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue は、テーブルの書き込み容量を 40000 として処理します。大きなテーブルをインポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.output.numParallelTasks": (オプション) 同時に DynamoDB に書き込める並列タスクの数を定義します。Spark タスクごとに許容される WCU を計算するために使用されます。これらの詳細設定を制御しない場合は、このパラメータを指定する必要はありません。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • このパラメータを指定しない場合、Spark タスクごとに許可される WCU は、次の式により自動的に計算されます。

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • WorkerTypeStandard の場合は (DPU - 1) * 2 - 1

        • WorkerTypeG.1X または G.2X の場合は (NumberOfWorkers - 1)

      • numSlotsPerExecutor =

        • WorkerTypeStandard の場合は 4

        • WorkerTypeG.1X の場合は 8

        • WorkerTypeG.2X の場合は 16

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 例 1. DPU=10、WorkerType=Standard。入力 DynamicFrame には、100 個の RDD パーティションがあります。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 例 2. DPU=10、WorkerType=Standard。入力 DynamicFrame には、20 個の RDD パーティションがあります。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry": (オプション) DynamoDB から ProvisionedThroughputExceededException が送られている場合の、再試行の実行回数を定義します。デフォルトでは、"10" に設定されています。

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-write-sts-session」に設定されています。

注記

DynamoDB のライターは、AWS Glue バージョン 1.0 以降でサポートされます。

注記

AWS Glue は、別の AWS アカウントの DynamoDB テーブルに対するデータの書き込みをサポートしています。詳細については、「DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス」を参照してください。

DynamoDB テーブルとの間で、読み取りおよび書き込みを行う方法を、次のコード例により示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

「connectionType」: 「kafka」

Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka への接続を指定します。

GlueContext オブジェクトにある以下のメソッドを使用することで、Kafka ストリーミングソースからのレコードを利用できます。

  • getCatalogSource

  • getSource

  • getSourceWithFormat

getCatalogSource を使用する場合、ジョブには Data Catalog のデータベースとテーブル名の情報が提供されます。それらを使用することで、Apache Kafka ストリームからの読み取りを実行するための、基本的なパラメータを取得できます。getSource を使用する場合は、これらのパラメータを明示的に指定する必要があります。

GetSource では connectionOptions を、getSourceWithFormat では options を、getCatalogSource では additionalOptionsを使用することで、これらのオプションを指定できます。

"connectionType": "kafka" では、次の接続オプションを使用します。

  • bootstrap.servers (必須) ブートストラップサーバーの URL のリスト (例: b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094)。このオプションは API 呼び出しで指定するか、 Data Catalog 内のテーブルメタデータで定義する必要があります。

  • security.protocol (必須) Apache Kafka 接続で SSL をオンまたはオフにするかどうかを示すブール値。デフォルト値は「true」です。このオプションは API 呼び出しで指定するか、 Data Catalog 内のテーブルメタデータで定義する必要があります。

  • topicName (必須) Apache Kafka で指定されたトピック名。少なくくとも "topicName""assign""subscribePattern" の内いずれかを指定する必要があります。

  • "assign": (必須) 利用する特定の TopicPartitions。少なくくとも "topicName""assign""subscribePattern" の内いずれかを指定する必要があります。

  • "subscribePattern": (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。少なくくとも "topicName""assign""subscribePattern" の内いずれかを指定する必要があります。

  • classification (オプション)

  • delimiter (オプション)

  • "startingOffsets": (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest" または "latest" です。デフォルト値は "latest" です。

  • "endingOffsets": (オプション) バッチクエリの終了位置。設定が可能な値は、"latest" または、各 TopicPartition の終了オフセットを指定する JSON 文字列のいずれかです。

    JSON 文字列の場合、{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} の形式を使用します。オフセットとして値 -1 を設定する場合、"latest" の意味になります。

  • "pollTimeoutMs": (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は 512 です。

  • "numRetries": (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は 10 です。

  • "maxOffsetsPerTrigger": (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームの topicPartitions 間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。

  • "minPartitions": (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。

  • "includeHeaders": (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、Array[Struct(key: String, value: String)] 型を使用して「headers」という名前が付けられた追加の列が含まれます。デフォルト値は「false」です。このオプションは、AWS Glue バージョン 3.0 以降のみで使用できます。

「connectionType」: 「kinesis」

Amazon Kinesis Data Streams への接続を指定します。

GlueContext オブジェクトでの以下のメソッドを使用することで、Kinesis ストリーミングソースからのレコードを利用できます。

  • getCatalogSource

  • getSource

  • getSourceWithFormat

getCatalogSource を使用する場合、ジョブには Data Catalog データベースとテーブル名の情報が提供されます。それらを使用することで、Kinesis ストリーミングソースからの読み取りを実行するための、基本的なパラメータを取得できます。getSource を使用する場合は、これらのパラメータを明示的に指定する必要があります。

GetSource では connectionOptions を、getSourceWithFormat では options を、getCatalogSource では additionalOptionsを使用することで、これらのオプションを指定できます。

"connectionType": "kinesis" では、次の接続オプションを使用します。

  • streamARN (必須) Kinesis データストリームの ARN。

  • classification (オプション)

  • delimiter (オプション)

  • "startingPosition": (オプション) Kinesis データストリーム内の、データの読み取り開始位置。使用できる値は "latest""trim_horizon"、または "earliest" です。デフォルト値は "latest" です。

  • "maxFetchTimeInMs": (オプション) ジョブエグゼキュータが、シャードごとの Kinesis データストリームからレコードを取得するのに費やした最大時間。ミリ秒 (ms) 単位で表示します。デフォルト値は 1000 です。

  • "maxFetchRecordsPerShard": (オプション) Kinesis データストリームのシャードごとの、フェッチするレコードの最大数。デフォルト値は 100000 です。

  • "maxRecordPerRead": (オプション) getRecords オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は 10000 です。

  • "addIdleTimeBetweenReads": (オプション) 2 つの連続する getRecords オペレーション間の遅延時間を追加します。デフォルト値は "False" です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "idleTimeBetweenReadsInMs": (オプション) 2 つの連続する getRecords オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は 1000 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "describeShardInterval": (オプション) スクリプトが呼び出す 2 つの ListShards API 間での、再シャーディングを考慮すべき最小時間。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「Strategies for Resharding」を参照してください。デフォルト値は 1s です。

  • "numRetries": (オプション) Kinesis Data Streams API リクエストを再試行する最大の回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) Kinesis Data Streams API 呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は 1000 です。

  • "maxRetryIntervalMs": (オプション) 再試行で 2 つの Kinesis Data Streams API を呼び出す間の最大クールオフ期間 (ミリ秒単位で指定)。デフォルト値は 10000 です。

  • "avoidEmptyBatches": (オプション) バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は "False" です。

"connectionType": "mongodb"

MongoDB への接続を指定します。接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": "mongodb" as Source

"connectionType": "mongodb" をソースとして、次の接続オプションを使用します。

  • "uri": (必須) 読み込み元の MongoDB ホスト (形式:mongodb://<host>:<port> )。

  • "database": (必須) 読み込み元の MongoDB データベース。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

  • "collection": (必須) 読み込み元の MongoDB コレクション。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

  • "username": (必須) MongoDB のユーザー名。

  • "password": (必須) MongoDB のパスワード。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "batchSize": (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。

  • "partitioner": (オプション): MongoDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。

    • MongoDefaultPartitioner (デフォルト)

    • MongoSamplePartitioner (MongoDB 3.2 以降が必要です)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration」を参照してください。サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" をシンクとして、次の接続オプションを使用します。

  • "uri": (必須) 書き込み先の MongoDB ホスト (形式:mongodb://<host>:<port> )。

  • "database": (必須) 書き込み先の MongoDB データベース。

  • "collection": (必須) 書き込み先の MongoDB コレクション。

  • "username": (必須) MongoDB のユーザー名。

  • "password": (必須) MongoDB のパスワード。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "extendedBsonTypes": (オプション) true が設定されている場合、MongoDB にデータを書き込む際に拡張 BSON 型を使用することを許可します。デフォルト: true

  • "replaceDocument": (オプション) true の場合、_id フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト: true

  • "maxBatchSize": (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。

サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "orc"

Apache Hive Optimized Row Columnar (ORC) ファイル形式で、Amazon S3 に保存されるファイルへの接続を指定します。

"connectionType": "orc" では、次の接続オプションを使用します。

  • paths: (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • (その他のオプション名/値ペア): 書式設定オプションなどのその他のオプションはすべて SparkSQL DataSource に直接渡されます。詳細については、「Redshift data source for Spark」を参照してください。

"connectionType": "parquet"

Apache Parquet ファイル形式で、Amazon S3 に保存されるファイルへの接続を指定します。

"connectionType": "parquet" では、次の接続オプションを使用します。

  • paths: (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • (その他のオプション名/値ペア): 書式設定オプションなどのその他のオプションはすべて SparkSQL DataSource に直接渡されます。詳細については、GitHub ウェブサイトの「Amazon Redshift data source for Spark」を参照してください。

"connectionType": "s3"

Amazon S3 への接続を指定します。

"connectionType": "s3" では、次の接続オプションを使用します。

  • "paths": (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • "exclusions": (オプション) 除外する Unix スタイルの glob パターンの JSON リストを含む文字列。たとえば、"[\"**.pdf\"]" はすべての PDF ファイルを除外します。AWS Glue でサポートされる glob 構文の詳細については、「Include and Exclude Patterns」を参照してください。

  • "compressionType": または「compression」: (オプション) データの圧縮方法を指定します。Amazon S3 ソース用には "compressionType" を、Amazon S3 ターゲット用には "compression" を使用します。データに標準のファイル拡張子が付いている場合、このオプションは一般的に不要です。指定できる値は "gzip" および "bzip" です。

  • "groupFiles": (オプション) 入力ファイルが 50,000 個を超える場合、デフォルトでファイルのグループ化が有効化されます。入力ファイルが 50,000 個未満の場合にグループ化を有効化するには、このパラメータに "inPartition" を設定します。入力ファイルが 50,000 個を超える場合に、グループ化を無効にするには、このパラメータを "none" に設定します。

  • "groupSize": (オプション) ターゲットグループのサイズ (バイト単位)。デフォルトは、入力データのサイズとクラスターのサイズに基づいて計算されます。入力ファイルが 50,000 個未満の場合、このオプションを有効にするには、"groupFiles""inPartition" に設定する必要があります。

  • "recurse": (オプション) true に設定した場合は、指定したパスの下にあるすべてのサブディレクトリ内のファイルを再帰的に読み取ります。

  • "maxBand": (オプション、詳細設定) このオプションでは、s3 リストの一貫性が認められるまでの期間をミリ秒単位で指定します。Amazon S3 の結果整合性を担保するために、直前の maxBand ミリ秒以内の変更タイムスタンプが付いたファイルが、特に JobBookmarks の使用時に追跡されます。ほとんどのユーザーはこのオプションを設定する必要はありません。デフォルトは 900000 ミリ秒 (15 分) です。

  • "maxFilesInBand": (オプション、詳細設定) このオプションは、直前の maxBand 秒間に保存するファイルの最大数を指定します。この数を超えた場合、余分なファイルはスキップされ、次のジョブ実行時にのみ処理されます。ほとんどのユーザーはこのオプションを設定する必要はありません。

  • "isFailFast": (オプション) このオプションにより、AWS Glue ETL ジョブがリーダー解析の例外をスローするかどうかを決定します。true に設定すると、Spark タスクが 4 回の再試行のうちにデータを正しく解析できなかった場合、ジョブは速やかに失敗します。

JDBC connectionType の値

これには次のものが含まれます。

  • "connectionType": "sqlserver": Microsoft SQL Server データベースへの接続を指定します。

  • "connectionType": "mysql": MySQL データベースへの接続を指定します。

  • "connectionType": "oracle": Oracle データベースへの接続を指定します。

  • "connectionType": "postgresql": PostgreSQL データベースへの接続を指定します。

  • "connectionType": "redshift": Amazon Redshift データベースへの接続を指定します。

次の表に、AWS Glue がサポートする JDBC ドライバーのバージョンを示します。

製品 JDBC ドライバーのバージョン
Microsoft SQL Server 6.x
MySQL 5.1
Oracle Database 11.2
PostgreSQL 42.x
Amazon Redshift 4.1

JDBC 接続では、次の接続オプションを使用します。

  • "url": (必須) データベースの JDBC URL。

  • "dbtable": 読み取り元のデータベーステーブル。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。

  • "redshiftTmpDir": (Amazon Redshift の場合は必須、他の JDBC タイプの場合はオプション) データベースからコピーする際の一時データをステージングできる Amazon S3 のパス。

  • "user": (必須) 接続時に使用するユーザー名。

  • "password": (必須) 接続時に使用するパスワード。

  • (オプション) 次のオプションを使用すると、カスタム JDBC ドライバーを指定できます。AWS Glue がネイティブでサポートしていないドライバーを使用する必要がある場合は、これらのオプションを使用します。ソースおよびターゲットが同じデータベース製品の場合でも、ETL ジョブは、データソースおよびターゲットに対して、異なるバージョンの JDBC ドライバーを使用することができます。これにより、異なるバージョンのソースデータベースとターゲットデータベース間でデータを移行できます。これらのオプションを使用するには、最初に JDBC ドライバーの jar ファイルを Amazon S3 にアップロードする必要があります。

    • "customJdbcDriverS3Path": カスタム JDBC ドライバーの Amazon S3 パス。

    • "customJdbcDriverClassName": JDBC ドライバーのクラス名。

書式設定オプションを含め、JDBC 接続用に接続オプションに含まれるその他のオプション名/値ペアはすべて、基になる SparkSQL DataSource に直接渡されます。詳細については、「Redshift data source for Spark」を参照してください。

次のコード例は、カスタム JDBC ドライバーを使用して JDBC データベースから読み書きする方法を示しています。データベース製品の 1 つのバージョンから読み取り、同じ製品の新しいバージョンに書き込んでいます。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }

カスタムと AWS Marketplace での、connectionType の値

これには次のものが含まれます。

  • "connectionType": "marketplace.athena": Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.spark": Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.jdbc": JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "custom.athena": Amazon Athena データストアへの接続を指定します。この接続では、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.spark": Apache Spark データストアへの接続を指定します。接続では、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.jdbc": JDBC データストアへの接続を指定します。接続では、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

custom.jdbc または marketplace.jdbc 型での接続オプション

  • className – (必須) ドライバクラス名を示す文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • url – (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (${}) を示す文字列。プレースホルダー ${secretKey} は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。

  • secretId または user/password – (必須) URL の認証情報を取得するために使用される文字列。

  • dbTableまたはquery – (必須) データを取得するテーブルまたは SQL クエリを示す文字列。dbTable または query を指定できます。両方を指定することはできません。

  • partitionColumn – (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、lowerBoundupperBound、および numPartitions に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「JDBC To Other Databases」を参照してください。

    lowerBound および upperBound 値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。

    注記

    テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:

    • "SELECT col1 FROM table1" の形式のクエリでパーティション列を使用する場合、末尾に WHERE 句を追加してそのクエリをテストします。

    • クエリ形式が SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND とパーティション列を使用する式で拡張することで、そのクエリをテストします。

  • lowerBound – パーティションストライドを決定するために使用される partitionColumn の最小値を示す整数 (オプション)。

  • upperBound – パーティションストライドを決定するために使用される partitionColumn の最大値を示す整数 (オプション)。

  • numPartitions – パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる) lowerBound と (範囲に含まれない) upperBound とともに使用され、partitionColumn の分割で使用するために生成された WHERE 句の式のための、パーティションストライドを形成します。

    重要

    パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。

  • filterPredicate – ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例:

    BillingCity='Mountain View'

    table 名の代わりに query を使用した場合は、指定された filterPredicate でクエリが動作することを確認します。例:

    • クエリの形式が "SELECT col1 FROM table1" の場合は、フィルタ述語を使用するクエリの末尾に WHERE 句を追加して、そのクエリをテストします。

    • クエリ形式が "SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND およびフィルター述語を使用する式で拡張して、そのクエリをテストします。

  • dataTypeMapping – (ディクショナリ、オプション) JDBC データ型 から Glue データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、"dataTypeMapping":{"FLOAT":"STRING"} オプションはドライバーの ResultSet.getString() メソッドを呼び出すことで、JDBC タイプの FLOAT データフィールドを Java String 型にマップし、それを使用して Glue レコード を構築します。ResultSet オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。

  • AWS Glue で現在サポートされているデータ型は以下のとおりです。

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    JDBC データ型としては、Java8 java.sql.types がサポートされています。

    デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    カスタムのデータタイプマッピングで dataTypeMapping オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、dataTypeMapping オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWSGlueSTRING データ型に変換されます。

次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena または marketplace.athena タイプ用の接続オプション

  • className – (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例: "com.amazonaws.athena.connectors")。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。

  • tableName – (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前 all_log_streams を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。

  • schemaName – (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、 。/aws-glue/jobs/output

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

このコネクタの追加オプションについては、GitHub の Amazon Athena CloudWatch Connector README ファイルを参照してください。

次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark または marketplace.spark タイプ用の接続オプション

  • className – (必須) コネクタのクラス名を支援す文字列。

  • secretId – (オプション) コネクタ接続の認証情報を取得するために使用される文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • その他のオプションは、データストアによって異なります。例えば、Elasticsearch for Apache Hadoop ドキュメントの説明にあるように、OpenSearch の設定オプションは「es」でプレフィックスされます。Spark から Snowflake への接続では、Connecting to Snowflake ガイドの「Using the Spark Connector」で説明されているように、sfUser および sfPassword のオプションを使用します。

次の Python コード例に、marketplace.spark 接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()