AWS Glue での ETL の接続タイプとオプション - AWS Glue

AWS Glue での ETL の接続タイプとオプション

AWS Glue では、PySpark と Scala のさまざまなメソッドと変換で connectionType パラメータを使用して接続タイプを指定します。connectionOptions または options パラメータを使用して接続オプションを指定します。

connectionType パラメータには、次の表に示す値を指定できます。各タイプの関連する connectionOptions (または options) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。

接続オプションの設定と使用方法を示すサンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

connectionType 接続先
custom.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」 を参照
documentdb Amazon DocumentDB (MongoDB 互換) データベース
dynamodb Amazon DynamoDB データベース
kafka Kafka または Amazon Managed Streaming for Apache Kafka
kinesis Amazon Kinesis Data Streams
marketplace.* Spark、Athena、または JDBC データストア (「カスタムと AWS Marketplace での、connectionType の値」を参照)
mongodb MongoDB データベース
mysql MySQL データベース (「JDBC connectionType の値」を参照)
oracle Oracle データベース (「JDBC connectionType の値」を参照)
orc Apache Hive Optimized Row Columnar (ORC) ファイル形式で Amazon Simple Storage Service (Amazon S3) に保存されたファイル
parquet Apache Parquet ファイル形式で Amazon S3 に保存されたファイル
postgresql PostgreSQL データベース (「JDBC connectionType の値」を参照)
Redshift Amazon Redshift データベース (「JDBC connectionType の値」を参照)
s3 Simple Storage Service (Amazon S3)
sqlserver Microsoft SQL Server データベース (「JDBC connectionType の値」を参照)

"connectionType": "Documentdb"

Amazon DocumentDB (MongoDB 互換) への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": "Documentdb" ソースとする

"connectionType": "documentdb" をソースとして、次の接続オプションを使用します。

  • "uri": (必須) 読み取り元の Amazon DocumentDB ホスト (mongodb://<host>:<port> 形式)。

  • "database": (必須) 読み取り元の Amazon DocumentDB データベース。

  • "collection": (必須) 読み取り元の Amazon DocumentDB コレクション。

  • "username": (必須) Amazon DocumentDB のユーザー名。

  • "password": (必須) Amazon DocumentDB のパスワード。

  • "ssl": (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を "true" に設定する必要があります。

  • "ssl.domain_match": (SSL を使用する場合は必須) SSL を使用して接続する場合は、このオプションの値を "false" に設定する必要があります。

  • "batchSize": (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。

  • "partitioner": (オプション) Amazon DocumentDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。

    • MongoDefaultPartitioner (デフォルト)

    • MongoSamplePartitioner

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey、partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey、partitionSizeMB

    これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration」を参照してください。サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "documentdb" as Sink

"connectionType": "documentdb" をシンクとして、次の接続オプションを使用します。

  • "uri": (必須) 書き込み先の Amazon DocumentDB ホスト (mongodb://<host>:<port> 形式)。

  • "database": (必須) 書き込み先の Amazon DocumentDB データベース。

  • "collection": (必須) 書き込み先の Amazon DocumentDB コレクション。

  • "username": (必須) Amazon DocumentDB のユーザー名。

  • "password": (必須) Amazon DocumentDB のパスワード。

  • "extendedBsonTypes": (オプション) true が指定されている場合、Amazon DocumentDB へのデータ書き込み時に拡張 BSON 型を使用できます。デフォルト: true

  • "replaceDocument": (オプション) true の場合、_id フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト: true

  • "maxBatchSize": (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。

サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "dynamodb"

Amazon DynamoDB への接続を指定します。

接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": ソースとして ETL コネクタを使用する "dynamodb"

AWS Glue DynamoDB ETL コネクタを使用している場合は、"connectionType": "dynamodb" をソースとして次の接続オプションを使用します。

  • "dynamodb.input.tableName": (必須) 読み取り元の DynamoDB テーブル。

  • "dynamodb.throughput.read.percent": (オプション) 使用する読み込みキャパシティーユニット (RCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • 0.5 ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの読み込み容量の半分を消費しようとすることを意味します。上記の値を 0.5 より上に設定すると、AWS Glue は読み取りのリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の読み込みレートは、DynamoDB テーブルに、統一ディストリビューションのキーがあるかどうかなどの要因によって変わります。)

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの読み取り容量を 40000 として処理します。大きなテーブルをエクスポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.splits": (オプション) 読み取り中にこの DynamoDB テーブルを分割するパーティションの数を定義します。デフォルトでは、"1" に設定されます。許容値は "1" から "1,000,000" (これらの値を含む) です。

    • 1 は並列処理がないことを表します。より良いパフォーマンスを得るためには、より大きな値を (以下の式を使用して) 指定することを強くお勧めします。

    • numSlots は、次の式を使用して計算し、dynamodb.splits として使用することをお勧めします。さらにパフォーマンスを向上させる必要がある場合は、DPU 数を増加してジョブをスケールアウトすることをお勧めします。

      • numExecutors =

        • WorkerTypeStandard の場合は (DPU - 1) * 2 - 1

        • WorkerTypeG.1X または G.2X の場合は (NumberOfWorkers - 1)

      • numSlotsPerExecutor =

        • WorkerTypeStandard の場合は 4

        • WorkerTypeG.1X の場合は 8

        • WorkerTypeG.2X の場合は 16

      • numSlots = numSlotsPerExecutor * numExecutors

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。このパラメータは、AWS Glue 1.0 以降で使用可能です。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。このパラメータは、AWS Glue 1.0 以降で使用可能です。

次のコード例では、DynamoDB テーブルからの読み取り (ETL コネクタ経由) と、DynamoDB のテーブルへの書き込み方法を示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }
注記

AWS Glue は、別のAWS アカウントの DynamoDB テーブルからの、データ読み取りをサポートしています。詳細については、「DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス」を参照してください。

注記

DynamoDB ETL リーダーでは、フィルタまたはプッシュダウン述語は使用できません。

"connectionType":AWS Glue DynamoDB エクスポートコネクタをソースとする "dynamodb"

AWS Glue DynamoDB ETL コネクターに加えて、AWS Glue は DynamoDB エクスポートコネクターを提供します。このコネクターは、DynamoDB ExportTableToPointInTime リクエストを呼び出し、これを指定した Amazon S3 の場所に DynamoDB JSON 形式で保存します。次に、AWS Glue は、Amazon S3 のエクスポート場所からデータを読み取ることによって DynamicFrame オブジェクトを作成します。

DynamoDB テーブルサイズが 80 GB を超える場合、エクスポートコネクタは ETL コネクタよりもパフォーマンスが向上します。さらに、エクスポートリクエストが AWS Glue ジョブで Spark プロセスの外部で実行される場合、AWS Glue ジョブの自動スケーリングを有効にして、エクスポートリクエスト中の DPU 使用量を節約できます。エクスポートコネクタでは、Spark エグゼキューターの並列処理のためのスプリット数や、DynamoDB スループットの読み取り率を設定する必要がありません。

AWS Glue バージョン 2.0 以降でのみ使用可能な AWS Glue DynamoDB エクスポートコネクタを使用する場合は、ソースとして "connectionType": "dynamodb" で次の接続オプションを使用します。

  • "dynamodb.export": (必須) 文字列値:

    • ddb に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になり、AWS Glue ジョブ中に新しい ExportTableToPointInTimeRequest が呼び出されます。dynamodb.s3.bucketdynamodb.s3.prefix から渡された場所で新しいエクスポートが生成されます。

    • s3 に設定すると、AWS Glue DynamoDB エクスポートコネクタが有効になりますが、新しい DynamoDB エクスポートの作成はスキップされ、代わりに dynamodb.s3.bucketdynamodb.s3.prefix がそのテーブルの過去のエクスポートの Amazon S3 ロケーションとして使用されます。

  • "dynamodb.tableArn": (必須) 読み取り元の DynamoDB テーブル。

  • "dynamodb.unnestDDBJson": (オプション) ブール値を取得します。true に設定すると、エクスポートに存在する DynamoDB JSON 構造体のネスト解除の変換が実行されます。デフォルト値は false に設定されています。

  • "dynamodb.s3.bucket": (オプション) DynamoDB ExportTableToPointInTime プロセスが実行される Simple Storage Service (Amazon S3) バケットの場所を示します。エクスポートファイル形式は DynamoDB JSON です。

    • "dynamodb.s3.prefix": (オプション) DynamoDB ExportTableToPointInTime の読み込みが保存される Simple Storage Service (Amazon S3) バケット内の Amazon S3 プレフィックスの場所を示します。dynamodb.s3.prefixdynamodb.s3.bucket のどちらも指定しなかった場合、これらの値は AWS Glue ジョブの設定で指定された一時ディレクトリの場所がデフォルトとなります。詳細については、「AWS Glue で使用される特別なパラメータ」を参照してください。

    • "dynamodb.s3.bucketOwner": クロスアカウント Simple Storage Service (Amazon S3) アクセスのために必要なバケット所有者を示します。

  • "dynamodb.sts.roleArn": (オプション) DynamoDB テーブルのクロスアカウントアクセスおよび/またはクロスリージョンアクセスで想定される IAM ロール のARN 。注: 同じ IAM ロール のARN を使用して、ExportTableToPointInTime リクエストに指定された Simple Storage Service (Amazon S3) の場所にアクセスします。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-read-sts-session」に設定されています。

注記

DynamoDBには、ExportTableToPointInTime リクエストを呼び出すための特定の要件があります。詳細については、「DynamoDB でテーブルのエクスポートをリクエストする」を参照してください。例えば、このコネクタを使用するには、テーブルでポイントインタイムリストア (PITR) を有効にする必要があります。DynamoDB コネクタは、Amazon S3 への DynamoDB エクスポートの KMS 暗号化もサポートします。AWS Glue ジョブの設定でセキュリティ設定を指定することで、DynamoDB エクスポートの KMS 暗号化が有効になります。KMS キーは、Amazon S3 バケットと同じリージョンにある必要があります。

DynamoDB エクスポートと Amazon S3 ストレージのコストに追加料金が適用されることに注意してください。Amazon S3 でエクスポートされたデータは、ジョブ実行の終了後も保持されるため、DynamoDB を追加でエクスポートしなくても再利用できます。このコネクタを使用するための要件として、テーブルに対してポイントインタイムリカバリ(PITR)が有効になっていることが必要です。

DynamoDB ETL コネクタまたはエクスポートコネクタは、DynamoDB ソースに適用されるフィルタまたはプッシュダウン述語をサポートしていません。

次のコード例は、(エクスポートコネクタを経由して)読み取り、パーティションの数を出力する方法を示しています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

これらの例は、(エクスポートコネクタを介して)読み取りを行い、dynamodb 分類を持つ AWS Glue Data Catalog テーブルからパーティションの数を出力する方法を示しています。      

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database="<catalog_database>", table_name="<catalog_table_name", additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": "<s3_bucket>", "dynamodb.s3.prefix": "<s3_bucket_prefix>" } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = "<catalog_database>", tableName = "<catalog_table_name", additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> "<s3_bucket>", "dynamodb.s3.prefix" -> "<s3_bucket_prefix>" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

DynamoDB JSON 構造体のトラバース

AWS Glue DynamoDB エクスポートコネクタを使用した DynamoDB エクスポートでは、特殊なネスト構造の JSON ファイルが生成される場合があります。  詳細については、データオブジェクトを参照してください。 AWS Glue は DynamicFrame 変換を提供します。これを使用して、このような構造をダウンストリームアプリケーションで使いやすい形式に変換できます。

トランスフォームは、2 つの方法のいずれかで呼び出すことができます。最初の方法は、AWS Glue DynamoDB エクスポートコネクターで渡されるブールフラグです。2 つ目は、変換関数自体を呼び出すことです。

次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、ネスト解除の変換を呼び出し、パーティションの数を出力する方法を示しています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.unnestDDBJson": True, "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.unnestDDBJson" -> true "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

他の変換の呼び出しは、別の DynamicFrame 関数呼び出しによるものです。詳細については、Pythonの場合はDynamicFrame クラス、Scala の場合は 「AWS Glue Scala DynamicFrame クラス」を参照してください。 

"connectionType": ETLコネクタをシンクとして使用する "dynamodb"

"connectionType": "dynamodb" をシンクとして、次の接続オプションを使用します。

  • "dynamodb.output.tableName": (必須) 書き込み先の DynamoDB テーブル。

  • "dynamodb.throughput.write.percent": (オプション) 使用する書き込みキャパシティーユニット (WCU) の割合。デフォルトでは、"0.5" に設定されます。許容値は "0.1" から "1.5" (これらの値を含む) です。

    • 0.5 ではデフォルトの読み込み速度を表し、AWS Glue はテーブルの書き込み容量の半分を消費しようとすることを意味します。上記の値を 0.5 より上に設定すると、AWS Glue は書き込みリクエストレートを増加させ、0.5 より低くした場合はそのリクエストレートを減少させます。(実際の書き込みレートは、統一されたキーのディストリビューションが、DynamoDB テーブルにあるかどうかなどの要因によって変わります)。

    • DynamoDB テーブルがオンデマンドモードの場合、AWS Glue はテーブルの書き込み容量を 40000 として処理します。大きなテーブルをインポートする場合は、DynamoDB テーブルをオンデマンドモードに切り替えることをお勧めします。

  • "dynamodb.output.numParallelTasks": (オプション) 同時に DynamoDB に書き込める並列タスクの数を定義します。Spark タスクごとに許容される WCU を計算するために使用されます。これらの詳細設定をコントロールしない場合は、このパラメータを指定する必要はありません。

    • permissiveWcuPerTask = TableWCU * dynamodb.throughput.write.percent / dynamodb.output.numParallelTasks

    • このパラメータを指定しない場合、Spark タスクごとに許可される WCU は、次の式により自動的に計算されます。

      • numPartitions = dynamicframe.getNumPartitions()

      • numExecutors =

        • WorkerTypeStandard の場合は (DPU - 1) * 2 - 1

        • WorkerTypeG.1X または G.2X の場合は (NumberOfWorkers - 1)

      • numSlotsPerExecutor =

        • WorkerTypeStandard の場合は 4

        • WorkerTypeG.1X の場合は 8

        • WorkerTypeG.2X の場合は 16

      • numSlots = numSlotsPerExecutor * numExecutors

      • numParallelTasks = min(numPartitions, numSlots)

    • 例 1. DPU=10、WorkerType=Standard。入力 DynamicFrame には、100 個の RDD パーティションがあります。

      • numPartitions = 100

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(100, 68) = 68

    • 例 2. DPU=10、WorkerType=Standard。入力 DynamicFrame には、20 個の RDD パーティションがあります。

      • numPartitions = 20

      • numExecutors = (10 - 1) * 2 - 1 = 17

      • numSlots = 4 * 17 = 68

      • numParallelTasks = min(20, 68) = 20

  • "dynamodb.output.retry": (オプション) DynamoDB から ProvisionedThroughputExceededException が送られている場合の、再試行の実行回数を定義します。デフォルトでは、"10" に設定されています。

  • "dynamodb.sts.roleArn": (オプション) クロスアカウントアクセスのために引き受ける IAM ロール の ARN。

  • "dynamodb.sts.roleSessionName": (任意) STS セッション名。デフォルトでは、「glue-dynamodb-write-sts-session」に設定されています。

注記

DynamoDB のライターは、AWS Glue バージョン 1.0 以降でサポートされます。

注記

AWS Glue は、別の AWS アカウントの DynamoDB テーブルに対するデータの書き込みをサポートしています。詳細については、「DynamoDB テーブルへのクロスアカウントおよびクロスリージョンでのアクセス」を参照してください。

DynamoDB テーブルとの間で、読み取りおよび書き込みを行う方法を、次のコード例により示します。ここでは、あるテーブルから読み取りを行い、別のテーブルに対して書き込みを行っています。

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={ "dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

"connectionType": "kafka"

Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka への接続を指定します。

GlueContext オブジェクトにある以下のメソッドを使用することで、Kafka ストリーミングソースからのレコードを利用できます。

  • getCatalogSource

  • getSource

  • getSourceWithFormat

  • createDataFrameFromOptions

getCatalogSource を使用する場合、ジョブには Data Catalog のデータベースとテーブル名の情報が提供されます。それらを使用することで、Apache Kafka ストリームからの読み取りを実行するための、基本的なパラメータを取得できます。getSourcegetSourceWithFormat、あるいは createDataFrameFromOptions を使用する場合は、これらのパラメータを明示的に指定する必要があります。

getSource あるいは createDataFrameFromOptions では connectionOptions を、getSourceWithFormat では options を、getCatalogSource では additionalOptions を使用することで、これらのオプションを指定できます。

"connectionType": "kafka" では、次の接続オプションを使用します。

  • bootstrap.servers (必須) ブートストラップサーバーの URL のリスト (例: b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094)。このオプションは API 呼び出しで指定するか、 Data Catalog 内のテーブルメタデータで定義する必要があります。

  • security.protocol (必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、"SSL" または "PLAINTEXT" です。

  • topicName (必須) サブスクライブするトピックのカンマ区切りリスト。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

  • "assign": (必須) 消費する特定の TopicPartitions を指定する JSON 文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

    例: '{"topicA":[0,1],"topicB":[2,4]}'

  • "subscribePattern": (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

    例: 'topic.*'

  • classification (オプション)

  • delimiter (オプション)

  • "startingOffsets": (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest" または "latest" です。デフォルト値は "latest" です。

  • "endingOffsets": (オプション) バッチクエリの終了位置。設定が可能な値は、"latest" または、各 TopicPartition の終了オフセットを指定する JSON 文字列のいずれかです。

    JSON 文字列の場合、{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} の形式を使用します。オフセットとして値 -1 を設定する場合、"latest" の意味になります。

  • "pollTimeoutMs": (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は 512 です。

  • "numRetries": (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は 10 です。

  • "maxOffsetsPerTrigger": (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームの topicPartitions 間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。

  • "minPartitions": (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。

  • "includeHeaders": (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue_streaming_kafka_headers」という名前で Array[Struct(key: String, value: String)] 型の列が追加されます。デフォルト値は「false」です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。

  • "schema": (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が avro 以外の場合、提供されるスキーマには DDL スキーマ形式を使用する必要があります。

    以下に、スキーマの例を示します。

    Example in DDL schema format
    'column1' INT, 'column2' STRING , 'column3' FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

  • "avroSchema": (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

"connectionType": "kinesis"

Amazon Kinesis Data Streams への接続を指定します。

Amazon Kinesis Data Streams から読み取るには、データカタログテーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定します。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

getCatalogSource または create_data_frame_from_catalog を使用して Kinesis ストリーミングソースからレコードを消費する場合、ジョブは Data Catalog データベースとテーブル名の情報を持っており、それを使用して Kinesis ストリーミングソースから読み込むためのいくつかの基本パラメータを取得することができます。getSourcegetSourceWithFormatcreateDataFrameFromOptions、または create_data_frame_from_options を使用している場合、ここで説明する接続オプションを使用して、これらの基本パラメータを指定する必要があります。

Kinesis の接続オプションは、GlueContext クラス内の指定されたメソッドに対して以下の引数で指定することができます。

  • Scala

    • connectionOptions: getSourcecreateDataFrameFromOptions で使用

    • additionalOptions: getCatalogSource で使用する

    • options: getSourceWithFormat で使用する

  • Python

    • connection_options: create_data_frame_from_options で使用する

    • additional_options: create_data_frame_from_catalog で使用する

    • options: getSource で使用する

Kinesis ストリーミングデータソースには、次の接続オプションを使用します。

  • streamARN (必須) Kinesis データストリームの ARN。

  • classification (オプション)

  • delimiter (オプション)

  • "startingPosition": (オプション) Kinesis データストリーム内の、データの読み取り開始位置。使用できる値は "latest""trim_horizon"、または "earliest" です。デフォルト値は "latest" です。

  • "awsSTSRoleARN": (オプション) AWS Security Token Service(AWS STS) を使用して担うロールの Amazon リソースネーム (ARN)。このロールには、Kinesis データストリームのレコードの説明操作または読み取り操作の権限が必要です。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSSessionName" と組み合わせて使用します。

  • "awsSTSSessionName":(オプション)AWS STS を使用してロールを担うセッションの識別子。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSRoleARN" と組み合わせて使用します。

  • "maxFetchTimeInMs": (オプション) ジョブエグゼキュータが、シャードごとの Kinesis データストリームからレコードを取得するのに費やした最大時間。ミリ秒 (ms) 単位で表示します。デフォルト値は 1000 です。

  • "maxFetchRecordsPerShard": (オプション) Kinesis データストリームのシャードごとの、フェッチするレコードの最大数。デフォルト値は 100000 です。

  • "maxRecordPerRead": (オプション) getRecords オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は 10000 です。

  • "addIdleTimeBetweenReads": (オプション) 2 つの連続する getRecords オペレーション間の遅延時間を追加します。デフォルト値は "False" です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "idleTimeBetweenReadsInMs": (オプション) 2 つの連続する getRecords オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は 1000 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "describeShardInterval": (オプション) スクリプトが呼び出す 2 つの ListShards API 間での、再シャーディングを考慮すべき最小時間。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「リシャーディングのための戦略」を参照してください。デフォルト値は 1s です。

  • "numRetries": (オプション) Kinesis Data Streams API リクエストを再試行する最大の回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) Kinesis Data Streams API 呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は 1000 です。

  • "maxRetryIntervalMs": (オプション) 再試行で 2 つの Kinesis Data Streams API を呼び出す間の最大クールオフ期間 (ミリ秒単位で指定)。デフォルト値は 10000 です。

  • "avoidEmptyBatches": (オプション) バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は "False" です。

  • "schema": (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が avro 以外の場合、提供されるスキーマには DDL スキーマ形式を使用する必要があります。

    以下に、スキーマの例を示します。

    Example in DDL schema format
    'column1' INT, 'column2' STRING , 'column3' FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

  • "avroSchema": (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

"connectionType": "mongodb"

MongoDB への接続を指定します。接続オプションは、ソース接続とシンク接続とで異なります。

"connectionType": "mongodb" ソースとする

"connectionType": "mongodb" をソースとして、次の接続オプションを使用します。

  • "uri": (必須) 読み込み元の MongoDB ホスト (形式: mongodb://<host>:<port>)。

  • "database": (必須) 読み込み元の MongoDB データベース。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

  • "collection": (必須) 読み込み元の MongoDB コレクション。このオプションは、ジョブスクリプトで glue_context.create_dynamic_frame_from_catalog を呼び出す際に、additional_options を介して渡すことも可能です。

  • "username": (必須) MongoDB のユーザー名。

  • "password": (必須) MongoDB のパスワード。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "batchSize": (オプション): 内部バッチのカーソル内で使用される、バッチごとに返されるドキュメントの数。

  • "partitioner": (オプション): MongoDB から入力データを読み取るためのパーティショナーのクラス名。コネクタには、次のパーティショナーがあります。

    • MongoDefaultPartitioner (デフォルト)

    • MongoSamplePartitioner (MongoDB 3.2 以降が必要です)

    • MongoShardedPartitioner

    • MongoSplitVectorPartitioner

    • MongoPaginateByCountPartitioner

    • MongoPaginateBySizePartitioner

  • "partitionerOptions" (オプション): 指定されたパーティショナーのオプション。各パーティショナーでは、次のオプションがサポートされています。

    • MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

    • MongoShardedPartitioner: shardkey

    • MongoSplitVectorPartitioner: partitionKey, partitionSizeMB

    • MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

    • MongoPaginateBySizePartitioner: partitionKey, partitionSizeMB

    これらのオプションの詳細については、MongoDB のドキュメントの「Partitioner Configuration」を参照してください。サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "mongodb" as Sink

"connectionType": "mongodb" をシンクとして、次の接続オプションを使用します。

  • "uri": (必須) 書き込み先の MongoDB ホスト (形式: mongodb://<host>:<port>)。

  • "database": (必須) 書き込み先の MongoDB データベース。

  • "collection": (必須) 書き込み先の MongoDB コレクション。

  • "username": (必須) MongoDB のユーザー名。

  • "password": (必須) MongoDB のパスワード。

  • "ssl": (オプション) true の場合、SSL 接続を開始します。デフォルト: false

  • "ssl.domain_match": (任意) truessltrue の場合 、ドメイン一致チェックが実行されます。デフォルト: true

  • "extendedBsonTypes": (オプション) true が設定されている場合、MongoDB にデータを書き込む際に拡張 BSON 型を使用することを許可します。デフォルト: true

  • "replaceDocument": (オプション) true の場合、_id フィールドを含むデータセットを保存するときに、ドキュメント全体を置き換えます。false の場合、データセットのフィールドと一致するドキュメントのフィールドのみが更新されます。デフォルト: true

  • "maxBatchSize": (オプション): データを保存するときの一括オペレーションの最大バッチサイズ。デフォルトは 512 です。

サンプルコードについては、「例: 接続タイプとオプションの設定」を参照してください。

"connectionType": "orc"

Apache Hive Optimized Row Columnar (ORC) ファイル形式で、Amazon S3 に保存されるファイルへの接続を指定します。

"connectionType": "orc" では、次の接続オプションを使用します。

  • paths: (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • (その他のオプション名/値ペア): 書式設定オプションなどのその他のオプションはすべて SparkSQL DataSource に直接渡されます。詳細については、「Redshift data source for Spark」を参照してください。

"connectionType": "parquet"

Apache Parquet ファイル形式で、Amazon S3 に保存されるファイルへの接続を指定します。

"connectionType": "parquet" では、次の接続オプションを使用します。

  • paths: (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • (その他のオプション名/値ペア): 書式設定オプションなどのその他のオプションはすべて SparkSQL DataSource に直接渡されます。詳細については、GitHub ウェブサイトの「Amazon Redshift data source for Spark」を参照してください。

"connectionType": "s3"

Simple Storage Service (Amazon S3) への接続を指定します。

"connectionType": "s3" では、次の接続オプションを使用します。

  • "paths": (必須) 読み取りのソースとなる Amazon S3 パスのリスト。

  • "exclusions": (オプション) 除外する Unix スタイルの glob パターンの JSON リストを含む文字列。例えば、"[\"**.pdf\"]" はすべての PDF ファイルを除外します。AWS Glue がサポートする glob 構文の詳細については、「包含パターンと除外パターンを使用する」を参照してください。

  • "compressionType": または「compression」: (オプション) データの圧縮方法を指定します。Simple Storage Service (Amazon S3) ソース用には "compressionType" を、Simple Storage Service (Amazon S3) ターゲット用には "compression" を使用します。データに標準のファイル拡張子が付いている場合、このオプションは一般的に不要です。指定できる値は "gzip" および "bzip" です。

  • "groupFiles": (オプション) 入力ファイルが 50,000 個を超える場合、デフォルトでファイルのグループ化が有効化されます。入力ファイルが 50,000 個未満の場合にグループ化を有効化するには、このパラメータに "inPartition" を設定します。入力ファイルが 50,000 個を超える場合に、グループ化を無効にするには、このパラメータを "none" に設定します。

  • "groupSize": (オプション) ターゲットグループのサイズ (バイト単位)。デフォルトは、入力データのサイズとクラスターのサイズに基づいて計算されます。入力ファイルが 50,000 個未満の場合、このオプションを有効にするには、"groupFiles""inPartition" に設定する必要があります。

  • "recurse": (オプション) true に設定した場合は、指定したパスの下にあるすべてのサブディレクトリ内のファイルを再帰的に読み取ります。

  • "maxBand": (オプション、詳細設定) このオプションでは、s3 リストの一貫性が認められるまでの期間をミリ秒単位で指定します。Amazon S3 の結果整合性を担保するために、直前の maxBand ミリ秒以内の変更タイムスタンプが付いたファイルが、特に JobBookmarks の使用時に追跡されます。ほとんどのユーザーはこのオプションを設定する必要はありません。デフォルトは 900000 ミリ秒 (15 分) です。

  • "maxFilesInBand": (オプション、詳細設定) このオプションは、直前の maxBand 秒間に保存するファイルの最大数を指定します。この数を超えた場合、余分なファイルはスキップされ、次のジョブ実行時にのみ処理されます。ほとんどのユーザーはこのオプションを設定する必要はありません。

  • "isFailFast": (オプション) このオプションにより、AWS Glue ETL ジョブがリーダー解析の例外をスローするかどうかを決定します。true に設定すると、Spark タスクが 4 回の再試行のうちにデータを正しく解析できなかった場合、ジョブは速やかに失敗します。

JDBC connectionType の値

これには以下が含まれます。

  • "connectionType": "sqlserver": Microsoft SQL Server データベースへの接続を指定します。

  • "connectionType": "mysql": MySQL データベースへの接続を指定します。

  • "connectionType": "oracle": Oracle データベースへの接続を指定します。

  • "connectionType": "postgresql": PostgreSQL データベースへの接続を指定します。

  • "connectionType": "redshift": Amazon Redshift データベースへの接続を指定します。

次の表に、AWS Glue がサポートする JDBC ドライバーのバージョンを示します。

製品 Glue 0.9、1.0、2.0 の JDBC ドライバのバージョン Glue 3.0 の JDBC ドライバのバージョン
Microsoft SQL Server 6.x 7.x
MySQL 5.1 8.0.23
Oracle Database 11.2 21.1
PostgreSQL 42.1.x 42.2.18
MongoDB 2.0.0 4.0.0
Amazon Redshift 4.1 4.1

JDBC 接続では、次の接続オプションを使用します。

  • "url": (必須) データベースの JDBC URL。

  • "dbtable": 読み取り元のデータベーステーブル。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。

  • "redshiftTmpDir": (Amazon Redshift の場合は必須、他の JDBC タイプの場合はオプション) データベースからコピーする際の一時データをステージングできる Amazon S3 のパス。

  • "user": (必須) 接続時に使用するユーザー名。

  • "password": (必須) 接続時に使用するパスワード。

  • (オプション) 次のオプションを使用すると、カスタム JDBC ドライバーを指定できます。AWS Glue がネイティブでサポートしていないドライバーを使用する必要がある場合は、これらのオプションを使用します。ソースおよびターゲットが同じデータベース製品の場合でも、ETL ジョブは、データソースおよびターゲットに対して、異なるバージョンの JDBC ドライバーを使用することができます。これにより、異なるバージョンのソースデータベースとターゲットデータベース間でデータを移行できます。これらのオプションを使用するには、最初に JDBC ドライバーの jar ファイルを Amazon S3 にアップロードする必要があります。

    • "customJdbcDriverS3Path": カスタム JDBC ドライバーの Simple Storage Service (Amazon S3) パス。

    • "customJdbcDriverClassName": JDBC ドライバーのクラス名。

  • "bulksize":(オプション)JDBC ターゲットへのバルクロードを高速化するためのパラレル挿入を構成するために使用します。データの書き込みまたは挿入時に使用する並列度の整数値を指定します。このオプションは、Arch User Repository (AUR) などのデータベースへの書き込みのパフォーマンスを向上させるのに役立ちます。

  • "sampleQuery": (オプション) サンプリング用のカスタム SQL クエリステートメント。デフォルトでは、サンプルクエリは単一のエグゼキュータによって実行されます。大きなデータセットを読み取る場合は、JDBC パーティション化を有効にして、テーブルをパラレルにクエリする必要があります。詳細については、JDBC テーブルからのパラレル読み取りを参照してください。sampleQuery を JDBC パーティショニングで使用する場合は、enablePartitioningForSampleQuery も true に設定します。

  • "enablePartitioningForSampleQuery": (オプション) デフォルトでは、このオプションは false となっています。sampleQuery をパーティション化された JDBC テーブルで使用する場合は必要になります。true に設定すると、AWS Glueでパーティション化条件を追加するためには、sampleQuery が「where」または「and」で終わる必要があります。以下の例を参照してください。

  • "sampleSize": (オプション) サンプルクエリによって返される行数を制限します。enablePartitioningForSampleQuery が true の場合にのみ動作します。パーティショニングが有効になっていない場合は、代わりに sampleQuery で「limit x」を追加してサイズを制限します。

    例 パーティション化せずに sampleQuery を使用する

    次のコード例は、パーティション化せずに sampleQuery を使用する方法を示しています。

    //A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" → url, "dbtable" → table, "user" → user, "password" → password, "basePath" → basePath, "sampleQuery" → query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

    例 JDBC パーティション化で sampleQuery を使用する

    次のコード例は、JDBC パーティション化で sampleQuery を使用する方法を示しています。

    //note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" → url, "dbtable" → table, "user" → user, "password" → password, "basePath" → basePath, "hashfield" -> primaryKey, "sampleQuery" → query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

Redshift 接続タイプの場合は、書式設定オプションを含め、JDBC 接続用に接続オプションに含まれるその他のオプション名/値ペアはすべて、基になる SparkSQL DataSource に直接渡されます。詳細については、「Redshift data source for Spark」を参照してください。

次のコード例は、カスタム JDBC ドライバーを使用して JDBC データベースから読み書きする方法を示しています。データベース製品の 1 つのバージョンから読み取り、同じ製品の新しいバージョンに書き込んでいます。

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(uri: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(uri: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }

カスタムと AWS Marketplace での、connectionType の値

これには以下が含まれます。

  • "connectionType": "marketplace.athena": Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.spark": Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "marketplace.jdbc": JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。

  • "connectionType": "custom.athena": Amazon Athena データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.spark": Apache Spark データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

  • "connectionType": "custom.jdbc": JDBC データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

custom.jdbc または marketplace.jdbc 型での接続オプション

  • className – (必須) ドライバクラス名を示す文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • url – (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (${}) を示す文字列。プレースホルダー ${secretKey} は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。

  • secretId または user/password – (必須) URL の認証情報を取得するために使用される文字列。

  • dbTableまたはquery – (必須) データを取得するテーブルまたは SQL クエリを示す文字列。dbTable または query を指定できます。両方を指定することはできません。

  • partitionColumn – (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、lowerBoundupperBound、および numPartitions に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、Apache Spark SQL, DataFrames and Datasets Guide の「JDBC To Other Databases」を参照してください。

    lowerBound および upperBound 値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。

    注記

    テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:

    • "SELECT col1 FROM table1" の形式のクエリでパーティション列を使用する場合、末尾に WHERE 句を追加してそのクエリをテストします。

    • クエリ形式が SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND とパーティション列を使用する式で拡張することで、そのクエリをテストします。

  • lowerBound – パーティションストライドを決定するために使用される partitionColumn の最小値を示す整数 (オプション)。

  • upperBound – パーティションストライドを決定するために使用される partitionColumn の最大値を示す整数 (オプション)。

  • numPartitions – パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる) lowerBound と (範囲に含まれない) upperBound とともに使用され、partitionColumn の分割で使用するために生成された WHERE 句の式のための、パーティションストライドを形成します。

    重要

    パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。

  • filterPredicate – ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例:

    BillingCity='Mountain View'

    table 名の代わりに query を使用した場合は、指定された filterPredicate でクエリが動作することを確認します。例:

    • クエリの形式が "SELECT col1 FROM table1" の場合は、フィルタ述語を使用するクエリの末尾に WHERE 句を追加して、そのクエリをテストします。

    • クエリ形式が "SELECT col1 FROM table1 WHERE col2=val" の場合は、WHERE 句を AND およびフィルター述語を使用する式で拡張して、そのクエリをテストします。

  • dataTypeMapping – (ディクショナリ、オプション) JDBC データ型 から Glue データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、"dataTypeMapping":{"FLOAT":"STRING"} オプションは、ドライバーの ResultSet.getString() メソッドを呼び出すことによって JDBC FLOAT データ型のデータフィールドを Java String データ型にマップし、それを使用してグルーレコードを構築します。ResultSet オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。

  • AWS Glue で現在サポートされているデータ型は以下のとおりです。

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    JDBC データ型としては、Java8 java.sql.types がサポートされています。

    デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    カスタムのデータタイプマッピングで dataTypeMapping オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、dataTypeMapping オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWS GlueSTRING データ型に変換されます。

次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena または marketplace.athena タイプ用の接続オプション

  • className – (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例: "com.amazonaws.athena.connectors")。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。

  • tableName – (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前 all_log_streams を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。

  • schemaName – (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、/aws-glue/jobs/output です。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

このコネクタの追加オプションについては、GitHub の Amazon Athena CloudWatch Connector README ファイルを参照してください。

次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark または marketplace.spark タイプ用の接続オプション

  • className – (必須) コネクタのクラス名を支援す文字列。

  • secretId – (オプション) コネクタ接続の認証情報を取得するために使用される文字列。

  • connectionName – (必須) コネクタに関連付けられている接続の名前を示す文字列。

  • その他のオプションは、データストアによって異なります。例えば、Elasticsearch for Apache Hadoop ドキュメントの説明にあるように、OpenSearch の設定オプションは「es」でプレフィックスされます。Spark から Snowflake への接続では、Connecting to Snowflake ガイドの「Using the Spark Connector」で説明されているように、sfUser および sfPassword のオプションを使用します。

次の Python コード例に、marketplace.spark 接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()