AWS Glue ETL ジョブからの Data Catalog でのテーブルの作成、スキーマの更新、および新規パーティションの追加 - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue ETL ジョブからの Data Catalog でのテーブルの作成、スキーマの更新、および新規パーティションの追加

抽出、変換、ロード (ETL) ジョブによって、ターゲットデータストアに新しいテーブルパーティションが作成される場合があります。データセットスキーマは、時間の経過とともに AWS Glue Data Catalog スキーマから進化し、拡散する可能性があります。AWS GlueETL ジョブには、ETL スクリプト内で使用できるいくつかの機能が用意され、Data Catalog でスキーマおよびパーティションを更新できるようになりました。これらの機能を使用すると、クローラを再実行することなく、Data Catalog での ETL 作業の結果を確認できます。

新しいパーティション

AWS Glue Data Catalog で新しいパーティションを表示するには、次のいずれかを実行します。

  • ジョブが終了したら、クローラを再実行し、クローラの完了時にコンソールで新しいパーティションを表示します。

  • ジョブが終了すると、クローラを再実行することなく、コンソールで新しいパーティションがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードは enableUpdateCatalog 引数を使用して、新しいパーティションの作成時のジョブ実行中に Data Catalog を更新する必要があることを示します。

方式 1

オプション引数に enableUpdateCatalogpartitionKeys を渡します。

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方式 2

enableUpdateCatalogpartitionKeysgetSink() を渡して、DataSink オブジェクトで setCatalogInfo() を呼び出します。

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

クローラを再実行することなく、AWS Glue ETLジョブ自体を使用して、Data Catalog での新しいカタログテーブルの作成、変更されたスキーマによる既存のテーブルの更新、および新しいテーブルパーティションの追加が可能になりました。

テーブルスキーマの更新

Data Catalog テーブルのスキーマを上書きする場合は、次のいずれかを実行します。

  • ジョブが終了したら、クローラを再実行し、テーブル定義も更新するようにクローラが設定されていることを確認します。クローラが終了したら、新しいパーティションをスキーマの更新とともにコンソールに表示します。詳細については、「API を使用したクローラの設定」を参照してください。

  • ジョブが終了すると、クローラを再実行することなく、コンソールで変更済みスキーマがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードでは enableUpdateCatalog を true に設定し、updateBehaviorUPDATE_IN_DATABASE に設定しています。これは、ジョブ実行中にスキーマを上書きし、Data Catalog に新しいパーティションを追加することを示します。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

テーブルスキーマが上書きされないようにして、新しいパーティションを追加する場合は、updateBehavior 値を LOG に設定することもできます。updateBehavior のデフォルト値は UPDATE_IN_DATABASE です。そのため、明示的に定義しない場合、テーブルスキーマは上書きされます。

enableUpdateCatalog が true に設定されていない場合、updateBehavior で選択したオプションに関係なく、ETL ジョブは Data Catalog 内のテーブルを更新しません。

新しいテーブルの作成

同じオプションを使用して、Data Catalog で新しいテーブルを作成することもできます。setCatalogInfo を使用して、データベースと新しいテーブル名を指定できます。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

制限事項

次の制限事項に注意してください。

  • Amazon Simple Storage Service (Amazon S3) のターゲットのみがサポートされています。

  • enableUpdateCatalog 機能は、管理対象テーブルでは、サポートされていません。

  • jsoncsvavro、および parquet の形式のみがサポートされます。

  • parquet 分類でテーブルを作成または更新するには、AWS Glue に最適化された DynamicFrames 用 parquet ライターを使用する必要があります。これは、次のいずれかの方法で実現できます。

    • parquet 分類を使用してカタログ内の既存のテーブルを更新する場合は、更新前にテーブルでは "useGlueParquetWriter" テーブルプロパティを true に設定する必要があります。このプロパティは、AWS Glue API/SDK、コンソール、または Athena DDL ステートメントから設定できます。

      
                            AWS Glue コンソールのカタログテーブルプロパティの編集フィールド。

      カタログテーブルプロパティを設定したら、次のコードスニペットを使用して新しいデータでカタログテーブルを更新します。

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • テーブルがカタログ内にまだ存在しない場合は、connection_type="s3" を用いた getSink() メソッドをスクリプトで使用して、データを Amazon S3 に書き込むとともに、テーブルとそのパーティションをカタログに追加します。ワークフローに適切な partitionKeyscompression を指定します。

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • glueparquet フォーマットの値は、AWS Glue parquet ライターを有効にする従来のメソッドです。

  • updateBehaviorLOG に設定した場合、DynamicFrame スキーマが Data Catalog テーブルのスキーマに定義されている列のサブセットと同等であるか、またはそのサブセットを含んでいる場合にのみ、新しいパーティションが追加されます。

  • スキーマの更新は、パーティション化されていないテーブル (「partitionKeys」オプションを使用していない) ではサポートされていません。

  • partitionKeys は、ETL スクリプトで渡されるパラメータと Data Catalog テーブルスキーマの partitionKeys で同等で、同じ順序でなければなりません。

  • この機能は、現在、更新スキーマがネストされているテーブルの更新/作成をサポートしていません (例えば、構造体の内部の配列)。

詳細については、Spark スクリプトのプログラミング を参照してください。