翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
GlueContext クラス
Apache Spark の SparkContext
__init__
__init__(sparkContext)
sparkContext
- 使用する Apache Spark のコンテキスト。
作成中
getSource
getSource(connection_type, transformation_ctx = "", **options)
外部ソースから DynamicFrames
を読み取るために使用できる DataSource
オブジェクトを作成します。
connection_type
– 使用する接続タイプ (Amazon Simple Storage Service (Amazon S3)、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
およびdynamodb
があります。transformation_ctx
- 使用する変換コンテキスト (オプション)。options
- オプションの名前と値のペアのコレクション。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
以下は、getSource
の使用例です。
>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()
create_dynamic_frame_from_rdd
create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")
Apache Spark Resilient Distributed Dataset (RDD) から作成された DynamicFrame
を返します。
data
- 使用するデータソース。name
- 使用するデータの名前。schema
- 使用するスキーマ (オプション)。sample_ratio
- 使用するサンプル比率 (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。
create_dynamic_frame_from_catalog
create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)
データカタログデータベースとテーブル名を使用して作成された DynamicFrame
を返します。このメソッドを使用するときは、指定した AWS Glue データカタログテーブルのテーブルプロパティを使用して format_options
を指定し、additional_options
引数を使用して他のオプションを指定します。
Database
- 読み取り元のデータベース。table_name
- 読み取り元のテーブルの名前。redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。push_down_predicate
– データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。サポートされているソースと制限については、「AWS Glue ETL の「プッシュダウンによる読み取りの最適化」を参照してください。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。additional_options
- オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
、およびdelimiter
を除く)。別のオプションとして、catalogPartitionPredicate
もサポートされています。catalogPartitionPredicate
– カタログ式を渡して、インデックス列に基づいたフィルタリングができます。これにより、フィルタリングをサーバー側で処理できます。詳細については、「AWS Glue パーティションインデックス」を参照してください。push_down_predicate
とcatalogPartitionPredicate
では、異なる構文が使用されることに注意してください。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。catalog_id
– 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
create_dynamic_frame_from_options
create_dynamic_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
指定された接続と形式で作成された DynamicFrame
を返します。
connection_type
– 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
およびdynamodb
があります。connection_options
– パスやデータベーステーブルなど接続オプション (オプション)。s3
のconnection_type
に関しては、Amazon S3 パスのリストが定義されています。connection_options = {"paths": ["
s3://aws-glue-target/temp
"]}JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。
警告
スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、
boto3
を使用することを検討してください。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name
を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
format
– フォーマット仕様 Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。format_options
– 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。transformation_ctx
- 使用する変換コンテキスト (オプション)。push_down_predicate
– データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。サポートされているソースと制限については、「AWS Glue ETL の「プッシュダウンによる読み取りの最適化」を参照してください。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。
create_sample_dynamic_frame_from_catalog
create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)
データカタログデータベースとテーブル名を使用して作成されたサンプル DynamicFrame
を返します。DynamicFrame
にはデータソースからの最初の num
レコードのみが含まれます。
-
database
- 読み取り元のデータベース。 -
table_name
- 読み取り元のテーブルの名前。 -
num
— 返されるサンプル動的フレーム内のレコードの最大数。 redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。-
transformation_ctx
- 使用する変換コンテキスト (オプション)。 push_down_predicate
– データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。-
additional_options
- オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
、およびdelimiter
を除く)。 -
sample_options
— サンプリング動作を制御するパラメータ (オプション)。Simple Storage Service (Amazon S3) ソースで現在使用可能なパラメータ:maxSamplePartitions
— サンプリングが読み取るパーティションの最大数。デフォルト値は 10 ですmaxSampleFilesPerPartition
— サンプリングが 1 つのパーティションで読み取るファイルの最大数。デフォルト値は 10 です。これらのパラメータは、ファイル一覧で消費される時間を短縮するのに役立ちます。例えば、データセットに 1000 個のパーティションがあり、各パーティションには 10 個のファイルがあるとします。10,000 個のファイルをすべてリスト表示する代わりに、
maxSamplePartitions
= 10 およびmaxSampleFilesPerPartition
= 10 と設定した場合、サンプリングでは、最初の 10 個のパーティションの最初の 10 個のファイルのみがリスト表示されて読み込まれ、合計で 10*10 = 100 個のファイルとなります。
-
catalog_id
– 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None
に設定されています。None
のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。
create_sample_dynamic_frame_from_options
create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")
指定された接続と形式で作成されたサンプル DynamicFrame
を返します。DynamicFrame
にはデータソースからの最初の num
レコードのみが含まれます。
connection_type
– 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
およびdynamodb
があります。connection_options
– パスやデータベーステーブルなど接続オプション (オプション)。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。-
num
— 返されるサンプル動的フレーム内のレコードの最大数。 -
sample_options
— サンプリング動作を制御するパラメータ (オプション)。Simple Storage Service (Amazon S3) ソースで現在使用可能なパラメータ:maxSamplePartitions
— サンプリングが読み取るパーティションの最大数。デフォルト値は 10 ですmaxSampleFilesPerPartition
— サンプリングが 1 つのパーティションで読み取るファイルの最大数。デフォルト値は 10 です。これらのパラメータは、ファイル一覧で消費される時間を短縮するのに役立ちます。例えば、データセットに 1000 個のパーティションがあり、各パーティションには 10 個のファイルがあるとします。10,000 個のファイルをすべてリスト表示する代わりに、
maxSamplePartitions
= 10 およびmaxSampleFilesPerPartition
= 10 と設定した場合、サンプリングでは、最初の 10 個のパーティションの最初の 10 個のファイルのみがリスト表示されて読み込まれ、合計で 10*10 = 100 個のファイルとなります。
format
– フォーマット仕様 Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。format_options
– 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。-
transformation_ctx
- 使用する変換コンテキスト (オプション)。 push_down_predicate
– データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。
add_ingestion_time_columns
add_ingestion_time_columns(dataFrame, timeGranularity = "")
入力 DataFrame
への取り込み時間列 (ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
など) を追加します。Amazon S3 のデータカタログテーブルをターゲットとして指定する場合、この関数は、AWS Glue により生成されたスクリプト内で自動的に生成されます。この関数は、出力テーブル上で、取り込み時間列があるパーティションを自動的に更新します。これにより、入力データに明示的な取り込み時間列を指定しなくても、取り込み時間において出力データの自動的なパーティション化が行えます。
-
dataFrame
– 取り込み時間列の追加先であるdataFrame
。 -
timeGranularity
– 時間列の詳細度。有効な値は「day
」、「hour
」、および「minute
」です。例えば、関数に対し「hour
」が渡された場合、元のdataFrame
は「ingest_year
」、「ingest_month
」、「ingest_day
」に加え「ingest_hour
」の時間列を持つことになります。
時間の詳細度列を追加した後、そのデータフレームを返します。
例:
dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))
create_data_frame_from_catalog
create_data_frame_from_catalog(database, table_name, transformation_ctx = "",
additional_options = {})
データカタログテーブルからの情報を使用して作成された DataFrame
を返します。
-
database
– 読み取り元のデータカタログデータベース。 -
table_name
– 読み取り元のデータカタログテーブルの名前。 -
transformation_ctx
- 使用する変換コンテキスト (オプション)。 -
additional_options
- オプションの名前と値のペアのコレクション。可能なオプションには、startingPosition
、maxFetchTimeInMs
、およびstartingOffsets
など、ストリーミングソース用として AWS Glue for Spark での ETL の接続タイプとオプション にリストされているものが含まれます。-
useSparkDataSource
– true に設定すると、AWS Glue はネイティブ Spark データソース API を使用してテーブルを読み取るようになります。Spark データソース API でサポートされている形式は、AVRO、バイナリ、CSV、JSON、ORC、Parquet、およびテキストです。データカタログテーブルでは、classification
プロパティを使用して形式を指定します。Spark データソース API の詳細については、公式の「Apache Spark ドキュメント」を参照してください。 create_data_frame_from_catalog
をuseSparkDataSource
と併用すると次のようなメリットがあります。-
DataFrame
を直接返し、create_dynamic_frame.from_catalog().toDF()
の代替手段になります。 -
ネイティブ形式の AWS Lake Formation テーブルレベルのアクセス許可制御をサポートします。
-
AWS Lake Formation テーブルレベルのアクセス許可制御なしでデータレイク形式の読み取りをサポートします。詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。
useSparkDataSource
を有効にすると、必要に応じて任意の Spark データソースオプションを additional_options
に追加することもできます。AWSGlue はこれらのオプションを Spark リーダーに直接渡します。 -
-
useCatalogSchema
– true に設定すると、AWS Glue は結果として得られたDataFrame
にデータカタログスキーマを適用します。true に設定しなければ、リーダーはデータからスキーマを推測します。useCatalogSchema
を有効にする場合は、useSparkDataSource
も true に設定する必要もあります。
-
制約事項
useSparkDataSource
オプションを使用する際には、次の制限事項を考慮してください。
-
useSparkDataSource
を使用すると、AWS Glue は元の Spark セッションとは異なる別の Spark セッションで新しいDataFrame
を作成します。 -
Spark DataFrame パーティションフィルタリングは、次の AWS Glue 機能では機能しません。
これらの機能でパーティションフィルタリングを使用するには、AWS Glue プッシュダウン述語を使用します。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。分割されていない列のフィルタリングは影響を受けません。
次のスクリプト例は、
excludeStorageClasses
オプションを使用してパーティションフィルタリングを実行する誤った方法を示しています。// Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")
次のスクリプト例は、
excludeStorageClasses
オプションを使用してパーティションフィルタリングを実行するためにプッシュダウン述語を使用する正しい方法を示しています。// Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")
例: Spark データソースリーダーを使用した CSV テーブルの作成
// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=
<database_name>
, table_name=<table_name>
, additional_options = {"useSparkDataSource": True, "sep": '\t'} )
create_data_frame_from_options
create_data_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
この API は廃止されました。代わりに、getSource()
API を使用してください。指定された接続と形式で作成された DataFrame
を返します。この関数は、AWS Glue ストリーミングソースのみで使用してください。
-
connection_type
– ストリーミング接続タイプ。有効な値は、kinesis
およびkafka
です。 -
connection_options
– 接続オプション。Kinesis と Kafka では異なります。各ストリーミングデータソースのすべての接続オプションの一覧は、AWS Glue for Spark での ETL の接続タイプとオプション で確認いただけます。ストリーミング接続オプションについては、以下の違いに注意してください。-
Kinesis ストリーミングのソースには
streamARN
、startingPosition
、inferSchema
、およびclassification
が必要です。 -
Kafka ストリーミングのソースには
connectionName
、topicName
、startingOffsets
、inferSchema
、およびclassification
が必要です。
-
-
format
– フォーマット仕様 Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。 -
format_options
– 指定された形式についてのオプション。サポートされる形式オプションについては、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。 -
transformation_ctx
- 使用する変換コンテキスト (オプション)。
Amazon Kinesis ストリーミングソースの例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kafka ストリーミングソースの例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
forEachBatch
forEachBatch(frame, batch_function, options)
ストリーミングソースから読み取られるすべてのマイクロバッチに渡される、batch_function
を適用します。
-
frame
– 現在のマイクロバッチを含む DataFrame。 -
batch_function
– すべてのマイクロバッチに適用される関数。 -
options
– マイクロバッチの処理方法に関する情報を保持している、キーと値のペアの集合。以下のような必須オプションがあります。-
windowSize
– 各バッチの処理にかかる時間。 -
checkpointLocation
– ストリーミング ETL ジョブ用に、チェックポイントが格納される場所。 -
batchMaxRetries
– 失敗した場合にこのバッチを再試行する最大回数。デフォルト値は 3 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。
-
例:
glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )
Simple Storage Service (Amazon S3) でのデータセットの操作
purge_table
purge_table(catalog_id=None, database="", table_name="", options={},
transformation_ctx="")
指定したカタログのデータベースとテーブルのファイルを Simple Storage Service (Amazon S3) から削除します。パーティション内のすべてのファイルが削除されると、そのパーティションもカタログから削除されます。
削除したオブジェクトを回復できるようにするには、Amazon S3 バケットでオブジェクトのバージョニングを有効にします。オブジェクトバージョニングが有効になっていないバケットからオブジェクトが削除された場合、そのオブジェクトは復元できません。バージョニングが有効にされているバケットで削除されたオブジェクトを復元する方法の詳細については、AWS Support ナレッジセンターで「バージョニングが有効なバケットで削除された Simple Storage Service (Amazon S3) オブジェクトを取得するにはどうすればよいですか?
-
catalog_id
– 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None
に設定されています。None
のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。 database
– 使用するデータベース。table_name
– 使用するテーブルの名前。options
– 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。retentionPeriod
– ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。partitionPredicate
– この述語を満たすパーティションは削除されます。これらのパーティションの保存期間内のファイルは削除されません。""
を設定 – デフォルトでは空です。excludeStorageClasses
–excludeStorageClasses
セット内のストレージクラスを持つファイルは削除されません。デフォルトはSet()
– 空のセットです。manifestFilePath
– マニフェストファイルを生成するためのオプションのパス。正常にパージされたすべてのファイルがSuccess.csv
に記録され、失敗したファイルはFailed.csv
に記録されます。
transformation_ctx
- 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。
glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
purge_s3_path
purge_s3_path(s3_path, options={}, transformation_ctx="")
指定された Amazon S3 パスからファイルを再帰的に削除します。
削除したオブジェクトを回復できるようにするには、Amazon S3 バケットでオブジェクトのバージョニングを有効にします。オブジェクトバージョニングが有効になっていないバケットからオブジェクトが削除された場合、そのオブジェクトは復元できません。バージョニングされたバケットで削除されたオブジェクトを復元する方法の詳細については、AWS Support ナレッジセンターで「バージョニングが有効なバケットで削除された Simple Storage Service (Amazon S3) オブジェクトを取得するにはどうすればよいですか?
s3_path
–s3://<
形式で削除するファイルの Amazon S3 でのパスbucket
>/<prefix
>/options
– 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。retentionPeriod
– ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。excludeStorageClasses
–excludeStorageClasses
セット内のストレージクラスを持つファイルは削除されません。デフォルトはSet()
– 空のセットです。manifestFilePath
– マニフェストファイルを生成するためのオプションのパス。正常にパージされたすべてのファイルがSuccess.csv
に記録され、失敗したファイルはFailed.csv
に記録されます。
transformation_ctx
- 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
transition_table
transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)
指定されたカタログのデータベースとテーブルのために、Simple Storage Service (Amazon S3) に格納されているファイルの、ストレージクラスを移行します。
任意の 2 つのストレージクラス間で移行できます。GLACIER
と DEEP_ARCHIVE
のストレージクラスでは、これらのクラスに移行できます。ただし、GLACIER
と DEEP_ARCHIVE
のストレージクラスからの移行には S3 RESTORE
を使用します。
Amazon S3 からファイルまたはパーティションを読み取る AWS Glue ETL ジョブを実行している場合は、一部の Amazon S3 ストレージクラスタイプを除外できます。詳細については、「Excluding Amazon S3 Storage Classes」を参照してください。
database
– 使用するデータベース。table_name
– 使用するテーブルの名前。transition_to
– 移行する先の Amazon S3 ストレージクラス。options
– 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。retentionPeriod
– ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。partitionPredicate
– この述語を満たすパーティションは移行されます。これらのパーティションの保存期間内のファイルは移行されません。""
を設定 – デフォルトでは空です。excludeStorageClasses
–excludeStorageClasses
セット内のストレージクラスを持つファイルは移行されません。デフォルトはSet()
– 空のセットです。manifestFilePath
– マニフェストファイルを生成するためのオプションのパス。正常に移行されたすべてのファイルがSuccess.csv
に記録され、失敗したファイルはFailed.csv
に記録されます。accountId
– 移行変換を実行する Amazon Web Services アカウント ID。この変換には必須です。roleArn
– 移行変換を実行する AWS ロール。この変換には必須です。
transformation_ctx
- 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。catalog_id
– 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None
に設定されています。None
のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。
glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
transition_s3_path
transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")
指定された Simple Storage Service (Amazon S3) パス内のファイルのストレージクラスを再帰的に移行します。
任意の 2 つのストレージクラス間で移行できます。GLACIER
と DEEP_ARCHIVE
のストレージクラスでは、これらのクラスに移行できます。ただし、GLACIER
と DEEP_ARCHIVE
のストレージクラスからの移行には S3 RESTORE
を使用します。
Amazon S3 からファイルまたはパーティションを読み取る AWS Glue ETL ジョブを実行している場合は、一部の Amazon S3 ストレージクラスタイプを除外できます。詳細については、「Excluding Amazon S3 Storage Classes」を参照してください。
s3_path
–s3://<
形式で移行するファイルの Amazon S3 でのパスbucket
>/<prefix
>/transition_to
– 移行する先の Amazon S3 ストレージクラス。options
– 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。retentionPeriod
– ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。partitionPredicate
– この述語を満たすパーティションは移行されます。これらのパーティションの保存期間内のファイルは移行されません。""
を設定 – デフォルトでは空です。excludeStorageClasses
–excludeStorageClasses
セット内のストレージクラスを持つファイルは移行されません。デフォルトはSet()
– 空のセットです。manifestFilePath
– マニフェストファイルを生成するためのオプションのパス。正常に移行されたすべてのファイルがSuccess.csv
に記録され、失敗したファイルはFailed.csv
に記録されます。accountId
– 移行変換を実行する Amazon Web Services アカウント ID。この変換には必須です。roleArn
– 移行変換を実行する AWS ロール。この変換には必須です。
transformation_ctx
- 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。
glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
抽出
extract_jdbc_conf
extract_jdbc_conf(connection_name, catalog_id = None)
データカタログの AWS Glue 接続オブジェクトから、設定プロパティを使用するキーを持つ dict
を返します。
user
- データベースのユーザー名です。password
- データベースのパスワードです。vendor
- ベンダーを指定します (mysql
、postgresql
、oracle
、sqlserver
など)。enforceSSL
- 安全な接続が必要かどうかを示すブール文字列です。customJDBCCert
- 提示された Amazon S3 パスからの特定のクライアント証明書を使用します。skipCustomJDBCCertValidation
-customJDBCCert
が CA によって検証される必要があるかどうかを示すブール文字列です。customJDBCCertString
- ドライバーの種類に固有のカスタム証明書に関する追加情報です。url
- (廃止) プロトコル、サーバー、ポートのみを含む JDBC URL です。fullUrl
- 接続の作成時に入力された JDBC URL です (AWS Glue バージョン 3.0 以降で使用可能)。
JDBC 設定の取得例:
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}
トランザクション
start_transaction
start_transaction(read_only)
新しいトランザクションの開始。Lake Formation startTransaction API を内部的に呼び出します。
read_only
— (ブール値) このトランザクションを読み取り専用にするか、または読み取りおよび書き込みを行うかを示します。読み取り専用のトランザクション ID を使用した書き込みは拒否されます。読み取り専用トランザクションはコミットする必要はありません。
トランザクション ID を返します。
commit_transaction
commit_transaction(transaction_id, wait_for_commit = True)
指定されたトランザクションをコミットしようとします。commit_transaction
でトランザクションのコミットが完了する前に戻ることがあります。Lake Formation startTransaction API を内部的に呼び出します。
transaction_id
— (文字列) コミットするトランザクション。wait_for_commit
— (ブール値)commit_transaction
がすぐに戻るかどうか指定します。デフォルト値は True です。false の場合、commit_transaction
はトランザクションがコミットされるまでポーリングし待機します。最大で 6 回の再試行でエクスポネンシャルバックオフを使用すると、待機時間は 1 分に制限されます。
コミットが完了したかどうかを示すブール値を返します。
cancel_transaction
cancel_transaction(transaction_id)
指定されたトランザクションをキャンセルしようとします。戻り値は、トランザクションが以前にコミットされた場合は TransactionCommittedException
例外です。Lake Formation startTransaction API を内部的に呼び出します。
-
transaction_id
— (文字列) キャンセルするトランザクション。
書き込み
getSink
getSink(connection_type, format = None, transformation_ctx = "", **options)
外部ソースに DynamicFrames
を書き込むために使用できる DataSink
オブジェクトを取得します。期待しているシンクを確実に取得するために、SparkSQL format
を最初に確認します。
connection_type
– 使用する接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
、kinesis
、kafka
が含まれています。format
– 使用する SparkSQL 形式 (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。options
— 接続オプションの指定に使用される名前と値のペアの集合。指定できる値は以下のとおりです。-
user
とpassword
: 認可用 -
url
: データストアのエンドポイント -
dbtable
: ターゲットテーブルの名前。 -
bulkSize
: 挿入操作の並列度の度合い
-
指定できるオプションは、接続タイプによって異なります。追加の値と例については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
例:
>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)
write_dynamic_frame_from_options
write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None,
format_options={}, transformation_ctx = "")
指定された接続と形式を使用して DynamicFrame
を書き込み、返します。
frame
- 書き込むDynamicFrame
。connection_type
– 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
、kinesis
、kafka
が含まれています。connection_options
- 接続オプション (パスやデータベーステーブルなど) (オプション)。s3
のconnection_type
では、Amazon S3 パスが定義されています。connection_options = {"path": "
s3://aws-glue-target/temp
"}JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。
警告
スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、
boto3
を使用することを検討してください。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name
を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
format
– フォーマット仕様 Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。format_options
– 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。transformation_ctx
- 使用する変換コンテキスト (オプション)。
write_from_options
write_from_options(frame_or_dfc, connection_type,
connection_options={}, format={}, format_options={}, transformation_ctx = "")
指定された接続および形式情報で作成された DynamicFrame
または DynamicFrameCollection
を書き込み、返します。
frame_or_dfc
- 書き込むDynamicFrame
またはDynamicFrameCollection
。connection_type
– 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、およびoracle
があります。connection_options
- 接続オプション (パスやデータベーステーブルなど) (オプション)。s3
のconnection_type
では、Amazon S3 パスが定義されています。connection_options = {"path": "
s3://aws-glue-target/temp
"}JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。
警告
スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、
boto3
を使用することを検討してください。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name
を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。
format
– フォーマット仕様 Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。format_options
– 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。transformation_ctx
- 使用する変換コンテキスト (オプション)。
write_dynamic_frame_from_catalog
write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)
データカタログデータベースとテーブルからの情報を使用して、記述した DynamicFrame
を返します。
frame
- 書き込むDynamicFrame
。Database
– テーブルを含むデータカタログデータベース。table_name
– ターゲットに関連付けられたデータカタログテーブルの名前。redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。-
additional_options
- オプションの名前と値のペアのコレクション。 catalog_id
– 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
write_data_frame_from_catalog
write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir,
transformation_ctx = "", additional_options = {}, catalog_id = None)
データカタログデータベースとテーブルからの情報を使用して、記述した DataFrame
を返します。このメソッドは、データレイク形式 (Hudi、Iceberg、および Delta Lake) への書き込みをサポートします。詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。
frame
- 書き込むDataFrame
。Database
– テーブルを含むデータカタログデータベース。table_name
– ターゲットに関連付けられたデータカタログテーブルの名前。redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。-
additional_options
- オプションの名前と値のペアのコレクション。-
useSparkDataSink
– true に設定すると、AWS Glue はネイティブ Spark データシンク API を使用してテーブルに書き込むようになります。このオプションを有効にすると、必要に応じて任意の Spark データソースオプションを additional_options
に追加できます。AWSGlue はこれらのオプションを Spark ライターに直接渡します。
-
catalog_id
– 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。値を指定しない場合、呼び出し元のアカウント ID が使用されます。
制約事項
useSparkDataSink
オプションを使用する際には、次の制限事項を考慮してください。
-
useSparkDataSink
オプションを使用する場合、enableUpdateCatalog オプションはサポートされません。
例: Spark データソースライターを使用して Hudi テーブルに書き込む
hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name':
<table_name>
, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name':<table_name>
, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':<database_name>
, 'hoodie.datasource.hive_sync.table':<table_name>
, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame =<df_product_inserts>
, database =<database_name>
, table_name =<table_name>
, additional_options = hudi_options )
write_dynamic_frame_from_jdbc_conf
write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={},
redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
指定された JDBC 接続情報を使用して DynamicFrame
を書き込み、返します。
frame
- 書き込むDynamicFrame
。catalog_connection
- 使用するカタログ接続。connection_options
- 接続オプション (パスやデータベーステーブルなど) (オプション)。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。catalog_id
– 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
write_from_jdbc_conf
write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
指定された JDBC 接続情報を使用して DynamicFrame
または DynamicFrameCollection
を書き込み、返します。
frame_or_dfc
- 書き込むDynamicFrame
またはDynamicFrameCollection
。catalog_connection
- 使用するカタログ接続。connection_options
- 接続オプション (パスやデータベーステーブルなど) (オプション)。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。redshift_tmp_dir
- 使用する Amazon Redshift の一時ディレクトリ (オプション)。transformation_ctx
- 使用する変換コンテキスト (オプション)。catalog_id
– 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。