GlueContext クラス - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

GlueContext クラス

Apache Spark の SparkContext オブジェクトをラップすることにより、Apache Spark プラットフォームとやり取りするためのメカニズムが提供されます。

__init__

__init__(sparkContext)
  • sparkContext - 使用する Apache Spark のコンテキスト。

[Creating] (作成中)

getSource

getSource(connection_type, transformation_ctx = "", **options)

外部ソースから DynamicFrames を読み取るために使用できる DataSource オブジェクトを作成します。

  • connection_type – 使用する接続タイプ (Amazon Simple Storage Service (Amazon S3)、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserveroracle および dynamodb があります。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • options - オプションの名前と値のペアのコレクション。詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

以下は、getSource の使用例です。

>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()

create_dynamic_frame_from_rdd

create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")

Apache Spark Resilient Distributed Dataset (RDD) から作成された DynamicFrame を返します。

  • data - 使用するデータソース。

  • name - 使用するデータの名前。

  • schema - 使用するスキーマ (オプション)。

  • sample_ratio - 使用するサンプル比率 (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

create_dynamic_frame_from_catalog

create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)

データカタログデータベースとテーブル名を使用して作成された DynamicFrame を返します。このメソッドを使用するときは、指定した AWS Glue データカタログテーブルのテーブルプロパティを使用して format_options を指定し、additional_options 引数を使用して他のオプションを指定します。

  • Database - 読み取り元のデータベース。

  • table_name - 読み取り元のテーブルの名前。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • push_down_predicate – データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳しくは、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。

  • additional_options - オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification、および delimiter を除く)。別のオプションとして、catalogPartitionPredicate もサポートされています。

    catalogPartitionPredicate – カタログ式を渡して、インデックス列に基づいたフィルタリングができます。これにより、フィルタリングをサーバー側で処理できます。詳細については、「AWS Glue パーティションインデックス」を参照してください。push_down_predicatecatalogPartitionPredicate では、異なる構文が使用されることに注意してください。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

create_dynamic_frame_from_options

create_dynamic_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

指定された接続と形式で作成された DynamicFrame を返します。

  • connection_type – 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserveroracle および dynamodb があります。

  • connection_options – パスやデータベーステーブルなど接続オプション (オプション)。s3connection_type に関しては、Amazon S3 パスのリストが定義されています。

    connection_options = {"paths": ["s3://aws-glue-target/temp"]}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    警告

    スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、boto3 を使用することを検討してください。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。

    詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • push_down_predicate – データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。

create_sample_dynamic_frame_from_catalog

create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)

データカタログデータベースとテーブル名を使用して作成されたサンプル DynamicFrame を返します。DynamicFrame にはデータソースからの最初の num レコードのみが含まれます。

  • database - 読み取り元のデータベース。

  • table_name - 読み取り元のテーブルの名前。

  • num — 返されるサンプル動的フレーム内のレコードの最大数。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • push_down_predicate – データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳しくは、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。

  • additional_options - オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification、および delimiter を除く)。

  • sample_options— サンプリング動作を制御するパラメータ (オプション)。Simple Storage Service (Amazon S3) ソースで現在使用可能なパラメータ:

    • maxSamplePartitions — サンプリングが読み取るパーティションの最大数。デフォルト値は 10 です

    • maxSampleFilesPerPartition — サンプリングが 1 つのパーティションで読み取るファイルの最大数。デフォルト値は 10 です。

      これらのパラメータは、ファイル一覧で消費される時間を短縮するのに役立ちます。例えば、データセットに 1000 個のパーティションがあり、各パーティションには 10 個のファイルがあるとします。10,000 個のファイルをすべてリスト表示する代わりに、maxSamplePartitions = 10 および maxSampleFilesPerPartition = 10 と設定した場合、サンプリングでは、最初の 10 個のパーティションの最初の 10 個のファイルのみがリスト表示されて読み込まれ、合計で 10*10 = 100 個のファイルとなります。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None に設定されています。None のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。

create_sample_dynamic_frame_from_options

create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")

指定された接続と形式で作成されたサンプル DynamicFrame を返します。DynamicFrame にはデータソースからの最初の num レコードのみが含まれます。

  • connection_type – 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserveroracle および dynamodb があります。

  • connection_options – パスやデータベーステーブルなど接続オプション (オプション)。詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • num — 返されるサンプル動的フレーム内のレコードの最大数。

  • sample_options— サンプリング動作を制御するパラメータ (オプション)。Simple Storage Service (Amazon S3) ソースで現在使用可能なパラメータ:

    • maxSamplePartitions — サンプリングが読み取るパーティションの最大数。デフォルト値は 10 です

    • maxSampleFilesPerPartition — サンプリングが 1 つのパーティションで読み取るファイルの最大数。デフォルト値は 10 です。

      これらのパラメータは、ファイル一覧で消費される時間を短縮するのに役立ちます。例えば、データセットに 1000 個のパーティションがあり、各パーティションには 10 個のファイルがあるとします。10,000 個のファイルをすべてリスト表示する代わりに、maxSamplePartitions = 10 および maxSampleFilesPerPartition = 10 と設定した場合、サンプリングでは、最初の 10 個のパーティションの最初の 10 個のファイルのみがリスト表示されて読み込まれ、合計で 10*10 = 100 個のファイルとなります。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • push_down_predicate – データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳しくは、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。

add_ingestion_time_columns

add_ingestion_time_columns(dataFrame, timeGranularity = "")

入力 DataFrame への取り込み時間列 (ingest_yearingest_monthingest_dayingest_houringest_minute) を追加します。Amazon S3 のデータカタログテーブルをターゲットとして指定する場合、この関数は、AWS Glue により生成されたスクリプト内で自動的に生成されます。この関数は、出力テーブル上で、取り込み時間列があるパーティションを自動的に更新します。これにより、入力データに明示的な取り込み時間列を指定しなくても、取り込み時間において出力データの自動的なパーティション化が行えます。

  • dataFrame – 取り込み時間列の追加先である dataFrame

  • timeGranularity – 時間列の詳細度。有効な値は「day」、「hour」、および「minute」です。例えば、関数に対し「hour」が渡された場合、元の dataFrame は「ingest_year」、「ingest_month」、「ingest_day」に加え「ingest_hour」の時間列を持つことになります。

時間の詳細度列を追加した後、そのデータフレームを返します。

例:

dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

データカタログテーブルからの情報を使用して作成された DataFrame を返します。

  • database – 読み取り元のデータカタログデータベース。

  • table_name – 読み取り元のデータカタログテーブルの名前。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • additional_options - オプションの名前と値のペアのコレクション。可能なオプションには、startingPositionmaxFetchTimeInMs、および startingOffsets など、ストリーミングソース用として AWS Glue for Spark での ETL の接続タイプとオプション にリストされているものが含まれます。

    • useSparkDataSource – true に設定すると、AWS Glue はネイティブ Spark データソース API を使用してテーブルを読み取るようになります。Spark データソース API でサポートされている形式は、AVRO、バイナリ、CSV、JSON、ORC、Parquet、およびテキストです。データカタログテーブルでは、classification プロパティを使用して形式を指定します。Spark データソース API の詳細については、公式の Apache Spark ドキュメントを参照してください。

      create_data_frame_from_cataloguseSparkDataSource と併用すると次のようなメリットがあります。

      • DataFrame を直接返し、create_dynamic_frame.from_catalog().toDF() の代替手段になります。

      • ネイティブ形式の AWS Lake Formation テーブルレベルのアクセス許可制御をサポートします。

      • AWS Lake Formation テーブルレベルのアクセス許可制御なしでデータレイク形式の読み取りをサポートします。詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。

      useSparkDataSource を有効にすると、必要に応じて任意の Spark データソースオプションadditional_options に追加することもできます。AWSGlue はこれらのオプションを Spark リーダーに直接渡します。

    • useCatalogSchema – true に設定すると、AWS Glue は結果として得られた DataFrame にデータカタログスキーマを適用します。true に設定しなければ、リーダーはデータからスキーマを推測します。useCatalogSchema を有効にする場合は、useSparkDataSource も true に設定する必要もあります。

機能制限

useSparkDataSource オプションを使用する際には、次の制限事項を考慮してください。

  • useSparkDataSource を使用すると、AWS Glue は元の Spark セッションとは異なる別の Spark セッションで新しい DataFrame を作成します。

  • Spark DataFrame パーティションフィルタリングは、次の AWS Glue 機能では機能しません。

    これらの機能でパーティションフィルタリングを使用するには、AWS Glue プッシュダウン述語を使用します。詳細については、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。分割されていない列のフィルタリングは影響を受けません。

    次のスクリプト例は、excludeStorageClasses オプションを使用してパーティションフィルタリングを実行する誤った方法を示しています。

    // Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")

    次のスクリプト例は、excludeStorageClasses オプションを使用してパーティションフィルタリングを実行するためにプッシュダウン述語を使用する正しい方法を示しています。

    // Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")

例: Spark データソースリーダーを使用した CSV テーブルの作成

// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )

create_data_frame_from_options

create_data_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

この API は廃止されました。代わりに、getSource() API を使用してください。指定された接続と形式で作成された DataFrame を返します。この関数は、AWS Glue ストリーミングソースのみで使用してください。

  • connection_type – ストリーミング接続タイプ。有効な値は、kinesis および kafka です。

  • connection_options – 接続オプション。Kinesis と Kafka では異なります。各ストリーミングデータソースのすべての接続オプションの一覧は、AWS Glue for Spark での ETL の接続タイプとオプション で確認いただけます。ストリーミング接続オプションについては、以下の違いに注意してください。

    • Kinesis ストリーミングのソースには streamARNstartingPositioninferSchema、および classification が必要です。

    • Kafka ストリーミングのソースには connectionNametopicNamestartingOffsetsinferSchema、および classification が必要です。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式オプションについては、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

Amazon Kinesis ストリーミングソースの例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kafka ストリーミングソースの例:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

forEachBatch

forEachBatch(frame, batch_function, options)

ストリーミングソースから読み取られるすべてのマイクロバッチに渡される、batch_function を適用します。

  • frame – 現在のマイクロバッチを含む DataFrame。

  • batch_function – すべてのマイクロバッチに適用される関数。

  • options – マイクロバッチの処理方法に関する情報を保持している、キーと値のペアの集合。以下のような必須オプションがあります。

    • windowSize – 各バッチの処理にかかる時間。

    • checkpointLocation – ストリーミング ETL ジョブ用に、チェックポイントが格納される場所。

    • batchMaxRetries – 失敗した場合にこのバッチを再試行する最大回数。デフォルト値は 3 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

例:

glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )

Simple Storage Service (Amazon S3) でのデータセットの操作

purge_table

purge_table(catalog_id=None, database="", table_name="", options={}, transformation_ctx="")

指定したカタログのデータベースとテーブルのファイルを Simple Storage Service (Amazon S3) から削除します。パーティション内のすべてのファイルが削除されると、そのパーティションもカタログから削除されます。

削除したオブジェクトを回復できるようにするには、Amazon S3 バケットでオブジェクトのバージョニングを有効にします。オブジェクトバージョニングが有効になっていないバケットからオブジェクトが削除された場合、そのオブジェクトは復元できません。バージョニングが有効にされているバケットで削除されたオブジェクトを復元する方法の詳細については、AWS Support ナレッジセンターで「バージョニングが有効なバケットで削除された Simple Storage Service (Amazon S3) オブジェクトを取得するにはどうすればよいですか?」を参照してください。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None に設定されています。None のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。

  • database – 使用するデータベース。

  • table_name – 使用するテーブルの名前。

  • options – 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。

    • retentionPeriod – ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。

    • partitionPredicate – この述語を満たすパーティションは削除されます。これらのパーティションの保存期間内のファイルは削除されません。"" を設定 – デフォルトでは空です。

    • excludeStorageClassesexcludeStorageClasses セット内のストレージクラスを持つファイルは削除されません。デフォルトは Set() – 空のセットです。

    • manifestFilePath – マニフェストファイルを生成するためのオプションのパス。正常にパージされたすべてのファイルが Success.csv に記録され、失敗したファイルは Failed.csv に記録されます。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。

glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

purge_s3_path

purge_s3_path(s3_path, options={}, transformation_ctx="")

指定された Amazon S3 パスからファイルを再帰的に削除します。

削除したオブジェクトを回復できるようにするには、Amazon S3 バケットでオブジェクトのバージョニングを有効にします。オブジェクトバージョニングが有効になっていないバケットからオブジェクトが削除された場合、そのオブジェクトは復元できません。バージョニングされたバケットで削除されたオブジェクトを復元する方法の詳細については、AWS Support ナレッジセンターで「バージョニングが有効なバケットで削除された Simple Storage Service (Amazon S3) オブジェクトを取得するにはどうすればよいですか?」を参照してください。

  • s3_path – 削除するファイルを指す Simple Storage Service (Amazon S3) のパス (s3://<bucket>/<prefix>/ 形式)

  • options – 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。

    • retentionPeriod – ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。

    • excludeStorageClassesexcludeStorageClasses セット内のストレージクラスを持つファイルは削除されません。デフォルトは Set() – 空のセットです。

    • manifestFilePath – マニフェストファイルを生成するためのオプションのパス。正常にパージされたすべてのファイルが Success.csv に記録され、失敗したファイルは Failed.csv に記録されます。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。

glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

transition_table

transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)

指定されたカタログのデータベースとテーブルのために、Simple Storage Service (Amazon S3) に格納されているファイルの、ストレージクラスを移行します。

任意の 2 つのストレージクラス間で移行できます。GLACIERDEEP_ARCHIVE のストレージクラスでは、これらのクラスに移行できます。ただし、GLACIERDEEP_ARCHIVE のストレージクラスからの移行には S3 RESTORE を使用します。

Amazon S3 からファイルまたはパーティションを読み取る AWS Glue ETL ジョブを実行している場合は、一部の Amazon S3 ストレージクラスタイプを除外できます。詳細については、「Excluding Amazon S3 Storage Classes」を参照してください。

  • database – 使用するデータベース。

  • table_name – 使用するテーブルの名前。

  • transition_to – 移行する先の Amazon S3 ストレージクラス

  • options – 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。

    • retentionPeriod – ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。

    • partitionPredicate – この述語を満たすパーティションは移行されます。これらのパーティションの保存期間内のファイルは移行されません。"" を設定 – デフォルトでは空です。

    • excludeStorageClassesexcludeStorageClasses セット内のストレージクラスを持つファイルは移行されません。デフォルトは Set() – 空のセットです。

    • manifestFilePath – マニフェストファイルを生成するためのオプションのパス。正常に移行されたすべてのファイルが Success.csv に記録され、失敗したファイルは Failed.csv に記録されます。

    • accountId – 移行変換を実行する Amazon Web Services アカウント ID。この変換には必須です。

    • roleArn – 移行変換を実行する AWS ロール。この変換には必須です。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (データカタログのアカウント ID)。デフォルトでは、None に設定されています。None のデフォルト値は、サービス内の呼び出し元アカウントのカタログ ID になります。

glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

transition_s3_path

transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")

指定された Simple Storage Service (Amazon S3) パス内のファイルのストレージクラスを再帰的に移行します。

任意の 2 つのストレージクラス間で移行できます。GLACIERDEEP_ARCHIVE のストレージクラスでは、これらのクラスに移行できます。ただし、GLACIERDEEP_ARCHIVE のストレージクラスからの移行には S3 RESTORE を使用します。

Amazon S3 からファイルまたはパーティションを読み取る AWS Glue ETL ジョブを実行している場合は、一部の Amazon S3 ストレージクラスタイプを除外できます。詳細については、「Excluding Amazon S3 Storage Classes」を参照してください。

  • s3_path – 移行するファイルの Simple Storage Service (Amazon S3) のパス (s3://<bucket>/<prefix>/ 形式)

  • transition_to – 移行する先の Amazon S3 ストレージクラス

  • options – 削除するファイルのフィルタリングと、マニフェストファイルの生成のためオプション。

    • retentionPeriod – ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。

    • partitionPredicate – この述語を満たすパーティションは移行されます。これらのパーティションの保存期間内のファイルは移行されません。"" を設定 – デフォルトでは空です。

    • excludeStorageClassesexcludeStorageClasses セット内のストレージクラスを持つファイルは移行されません。デフォルトは Set() – 空のセットです。

    • manifestFilePath – マニフェストファイルを生成するためのオプションのパス。正常に移行されたすべてのファイルが Success.csv に記録され、失敗したファイルは Failed.csv に記録されます。

    • accountId – 移行変換を実行する Amazon Web Services アカウント ID。この変換には必須です。

    • roleArn – 移行変換を実行する AWS ロール。この変換には必須です。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。マニフェストファイルパスで使用されます。

glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

抽出

extract_jdbc_conf

extract_jdbc_conf(connection_name, catalog_id = None)

データカタログの AWS Glue 接続オブジェクトから、設定プロパティを使用するキーを持つ dict を返します。

  • user - データベースのユーザー名です。

  • password - データベースのパスワードです。

  • vendor - ベンダーを指定します (mysqlpostgresqloraclesqlserver など)。

  • enforceSSL - 安全な接続が必要かどうかを示すブール文字列です。

  • customJDBCCert - 提示された Amazon S3 パスからの特定のクライアント証明書を使用します。

  • skipCustomJDBCCertValidation - customJDBCCert が CA によって検証される必要があるかどうかを示すブール文字列です。

  • customJDBCCertString - ドライバーの種類に固有のカスタム証明書に関する追加情報です。

  • url - (廃止) プロトコル、サーバー、ポートのみを含む JDBC URL です。

  • fullUrl - 接続の作成時に入力された JDBC URL です (AWS Glue バージョン 3.0 以降で使用可能)。

JDBC 設定の取得例:

jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}

トランザクション

start_transaction

start_transaction(read_only)

新しいトランザクションの開始。Lake Formation startTransaction API を内部的に呼び出します。

  • read_only — (Boolean) このトランザクションを読み取り専用にするか、または読み取りおよび書き込みを行うかを示します。読み取り専用のトランザクション ID を使用した書き込みは拒否されます。読み取り専用トランザクションはコミットする必要はありません。

トランザクション ID を返します。

commit_transaction

commit_transaction(transaction_id, wait_for_commit = True)

指定されたトランザクションをコミットしようとします。commit_transaction でトランザクションのコミットが完了する前に戻ることがあります。Lake Formation startTransaction API を内部的に呼び出します。

  • transaction_id — (文字列) コミットするトランザクション。

  • wait_for_commit — (ブール値) commit_transaction がすぐに戻るかどうか指定します。デフォルト値は True です。false の場合、commit_transaction はトランザクションがコミットされるまでポーリングし待機します。最大で 6 回の再試行でエクスポネンシャルバックオフを使用すると、待機時間は 1 分に制限されます。

コミットが完了したかどうかを示すブール値を返します。

cancel_transaction

cancel_transaction(transaction_id)

指定されたトランザクションをキャンセルしようとします。戻り値は、トランザクションが以前にコミットされた場合は TransactionCommittedException 例外です。Lake Formation startTransaction API を内部的に呼び出します。

  • transaction_id — (文字列) キャンセルするトランザクション。

書き込み

getSink

getSink(connection_type, format = None, transformation_ctx = "", **options)

外部ソースに DynamicFrames を書き込むために使用できる DataSink オブジェクトを取得します。期待しているシンクを確実に取得するために、SparkSQL format を最初に確認します。

  • connection_type – 使用する接続タイプ (Simple Storage Service (Amazon S3)、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • format – 使用する SparkSQL 形式 (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • options — 接続オプションの指定に使用される名前と値のペアの集合。指定できる値は以下のとおりです。

    • userpassword: 認可用

    • url: データストアのエンドポイント

    • dbtable: ターゲットテーブルの名前。

    • bulkSize: 挿入操作の並列度の度合い

指定できるオプションは、接続タイプによって異なります。追加の値と例については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

例:

>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)

write_dynamic_frame_from_options

write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

指定された接続と形式を使用して DynamicFrame を書き込み、返します。

  • frame - 書き込む DynamicFrame

  • connection_type – 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options - 接続オプション (パスやデータベーステーブルなど) (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    警告

    スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、boto3 を使用することを検討してください。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。

    詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

write_from_options

write_from_options(frame_or_dfc, connection_type, connection_options={}, format={}, format_options={}, transformation_ctx = "")

指定された接続および形式情報で作成された DynamicFrame または DynamicFrameCollection を書き込み、返します。

  • frame_or_dfc - 書き込む DynamicFrame または DynamicFrameCollection

  • connection_type – 接続タイプ (Amazon S3、Amazon Redshift、JDBC など)。有効な値には、s3mysqlpostgresqlredshiftsqlserver、および oracle があります。

  • connection_options - 接続オプション (パスやデータベーステーブルなど) (オプション)。s3connection_type では、Amazon S3 パスが定義されています。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。

    警告

    スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、boto3 を使用することを検討してください。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable プロパティは JDBC テーブルの名前です。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。

    詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • format_options – 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

write_dynamic_frame_from_catalog

write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

データカタログデータベースとテーブルからの情報を使用して、記述した DynamicFrame を返します。

  • frame - 書き込む DynamicFrame

  • Database – テーブルを含むデータカタログデータベース。

  • table_name – ターゲットに関連付けられたデータカタログテーブルの名前。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • additional_options - オプションの名前と値のペアのコレクション。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

write_data_frame_from_catalog

write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

データカタログデータベースとテーブルからの情報を使用して、記述した DataFrame を返します。このメソッドは、データレイク形式 (Hudi、Iceberg、および Delta Lake) への書き込みをサポートします。詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。

  • frame - 書き込む DataFrame

  • Database – テーブルを含むデータカタログデータベース。

  • table_name – ターゲットに関連付けられたデータカタログテーブルの名前。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • additional_options - オプションの名前と値のペアのコレクション。

    • useSparkDataSink – true に設定すると、AWS Glue はネイティブ Spark データシンク API を使用してテーブルに書き込むようになります。このオプションを有効にすると、必要に応じて任意の Spark データソースオプションadditional_options に追加できます。AWSGlue はこれらのオプションを Spark ライターに直接渡します。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。値を指定しない場合、呼び出し元のアカウント ID が使用されます。

機能制限

useSparkDataSink オプションを使用する際には、次の制限事項を考慮してください。

  • useSparkDataSink オプションを使用する場合、enableUpdateCatalog オプションはサポートされません。

例: Spark データソースライターを使用して Hudi テーブルに書き込む

hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name': <table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': <table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': <database_name>, 'hoodie.datasource.hive_sync.table': <table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame = <df_product_inserts>, database = <database_name>, table_name = <table_name>, additional_options = hudi_options )

write_dynamic_frame_from_jdbc_conf

write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

指定された JDBC 接続情報を使用して DynamicFrame を書き込み、返します。

  • frame - 書き込む DynamicFrame

  • catalog_connection - 使用するカタログ接続。

  • connection_options - 接続オプション (パスやデータベーステーブルなど) (オプション)。詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

write_from_jdbc_conf

write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

指定された JDBC 接続情報を使用して DynamicFrame または DynamicFrameCollection を書き込み、返します。

  • frame_or_dfc - 書き込む DynamicFrame または DynamicFrameCollection

  • catalog_connection - 使用するカタログ接続。

  • connection_options - 接続オプション (パスやデータベーステーブルなど) (オプション)。詳しくは、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • redshift_tmp_dir - 使用する Amazon Redshift の一時ディレクトリ (オプション)。

  • transformation_ctx - 使用する変換コンテキスト (オプション)。

  • catalog_id – 現在アクセスされているデータカタログのカタログ ID (アカウント ID)。None の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。