テーブルの作成、スキーマの更新およびからのデータカタログへの新規パーティションの追加AWS GlueETLジョブズ - AWS接着語

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

テーブルの作成、スキーマの更新およびからのデータカタログへの新規パーティションの追加AWS GlueETLジョブズ

抽出、変換、およびロード (ETL) ジョブによって、ターゲットデータストアに新しいテーブルパーティションが作成される場合があります。データセットスキーマは、AWS Glue経時的なデータカタログスキーマAWS Glue ETL ジョブには、ETL スクリプト内で使用できるいくつかの機能が提供され、データカタログ内のスキーマおよびパーティションを更新できるようになりました。これらの機能を使用すると、クローラを再実行することなく、データカタログでのETL作業の結果を確認できます。

新しいパーティション

AWS Glue Data Catalog で新しいパーティションを表示するには、次のいずれかを実行します。

  • ジョブが終了したら、クローラを再実行し、クローラの完了時にコンソールで新しいパーティションを表示します。

  • ジョブが終了すると、クローラを再実行することなく、コンソールで新しいパーティションがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードでは、enableUpdateCatalog引数を使用して、新しいパーティションの作成時のジョブ実行中にデータカタログを更新することを示します。

方法 1

オプション引数に enableUpdateCatalogpartitionKeys を渡します。

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方法 2

getSink()enableUpdateCatalogpartitionKeys を渡して、DataSink オブジェクトで setCatalogInfo() を呼び出します。

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"] sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

新しいカタログテーブルの作成、変更済みスキーマによる既存のテーブルの更新、およびデータカタログへの新しいテーブルパーティションの追加が可能になりました。AWS Glueクローラを再実行することなく、ETLジョブ自体。

テーブルスキーマの更新

Data Catalog テーブルのスキーマを上書きする場合は、次のいずれかを実行できます。

  • ジョブが終了したら、クローラを再実行し、テーブル定義も更新するようにクローラが設定されていることを確認します。クローラが終了したら、新しいパーティションをスキーマの更新とともにコンソールに表示します。詳細については、「API を使用したクローラの設定」を参照してください。

  • ジョブが終了すると、クローラを再実行することなく、コンソールで変更済みスキーマがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードはenableUpdateCatalogがtrueに設定され、updateBehaviorがに設定されている場合UPDATE_IN_DATABASEこれは、ジョブ実行中にスキーマを上書きし、データカタログに新しいパーティションを追加することを示します。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

テーブルスキーマが上書きされないようにして、新しいパーティションを追加する場合は、updateBehavior 値を LOG に設定することもできます。updateBehavior のデフォルト値は UPDATE_IN_DATABASE です。そのため、明示的に定義しない場合、テーブルスキーマは上書きされます。

もしenableUpdateCatalogが true に設定されていない場合、で選択したオプションに関係なくupdateBehaviorETL ジョブは、データカタログ内のテーブルを更新しません。

新しいテーブルの作成

同じオプションを使用して、データカタログに新しいテーブルを作成することもできます。setCatalogInfo を使用して、データベースと新しいテーブル名を指定できます。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restrictions

次の制限事項に注意してください。

  • Amazon Simple Storage Service (Amazon S3) のターゲットのみがサポートされます。

  • jsoncsvavro、および parquet の形式のみがサポートされます。

  • テーブルを作成または更新するにはparquet分類を使用するには、AWS GlueDynamicFramesのための最適化された寄木細工のライター。これは、次の 3 つの方法のいずれかで実現できます。

    • 電話write_dynamic_frame_from_catalog()を設定してから、useGlueParquetWriterテーブルプロパティを true に設定します。

    • 電話getSink()あなたのスクリプトでconnection_type=」s3「を選択し、フォーマットをglueparquet

    • 電話getSink()あなたのスクリプトでconnection_type=」s3「を選択し、フォーマットをparquetを渡し、useGlueParquetWriterプロパティにtrueとしてformat_options、これは特に新しい寄木細工のテーブルを作成するのに便利です。

  • がに設定されている場合updateBehaviorは、 に設定されます。LOGに設定されている場合、新しいパーティションはDynamicFrameスキーマは、Data Catalog テーブルのスキーマに定義されている列のサブセットと同等であるか、またはそのサブセットが含まれています。

  • partitionKeys は、ETL スクリプトで渡されるパラメータとデータカタログテーブルスキーマの partitionKeys で同等で、同じ順序でなければなりません。

  • この機能は、現在、更新スキーマがネストされているテーブルの更新/作成をサポートしていません (たとえば、構造体の内部の配列)。

詳細については、「ETL スクリプトのプログラミング」を参照してください