Memperbarui skema, dan menambahkan partisi baru di Katalog Data menggunakan AWS Glue pekerjaan ETL - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memperbarui skema, dan menambahkan partisi baru di Katalog Data menggunakan AWS Glue pekerjaan ETL

Tugas extract, transform, and load (ETL) Anda mungkin membuat partisi tabel baru di penyimpanan data target. Skema set data Anda dapat berevolusi dan menyimpang dari skema Katalog Data AWS Glue dari waktu ke waktu. AWS Glue Tugas ETL sekarang menyediakan beberapa fitur yang dapat Anda gunakan dalam skrip ETL Anda untuk memperbarui skema dan partisi dalam Katalog Data. Fitur ini memungkinkan Anda untuk melihat hasil tugas ETL Anda di Katalog Data, tanpa harus menjalankan kembali crawler.

Partisi baru

Jika Anda ingin melihat partisi baru di AWS Glue Data Catalog, Anda dapat melakukan salah satu hal berikut:

  • Setelah tugas selesai, jalankan kembali crawler, dan lihat partisi baru di konsol tersebut saat crawler selesai.

  • Setelah tugas selesai, segera lihat partisi baru di konsol tersebut, tanpa harus menjalankan ulang crawler. Anda dapat mengaktifkan fitur ini dengan menambahkan beberapa baris kode pada skrip ETL Anda, seperti yang ditunjukkan dalam contoh berikut. Kode menggunakan argumen enableUpdateCatalog untuk menunjukkan bahwa Katalog Data akan diperbarui selama eksekusi tugas saat partisi baru dibuat.

Metode 1

Berikan enableUpdateCatalog dan partitionKeys dalam sebuah argumen pilihan.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Metode 2

Berikan enableUpdateCatalog dan partitionKeys dalam getSink(), dan panggil setCatalogInfo() di objek DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Sekarang, Anda dapat membuat tabel katalog baru, memperbarui tabel yang ada dengan skema yang sudah dimodifikasi, dan menambahkan partisi tabel baru dalam Katalog Data dengan menggunakan tugas ETL AWS Glue itu sendiri, tanpa perlu kembali menjalankan crawler.

Memperbarui skema tabel

Jika Anda ingin menimpa skema tabel Katalog Data Anda, Anda dapat melakukan salah satu hal berikut:

  • Setelah tugas selesai, jalankan kembali crawler dan pastikan crawler dikonfigurasi untuk memperbarui definisi tabel juga. Lihat partisi baru di konsol tersebut beserta pembaruan skema apa pun, saat crawler selesai. Untuk informasi selengkapnya, lihat Mengkonfigurasi Crawler Menggunakan API.

  • Setelah tugas selesai, segera lihat skema yang sudah dimodifikasi di konsol tersebut, tanpa harus menjalankan ulang crawler. Anda dapat mengaktifkan fitur ini dengan menambahkan beberapa baris kode pada skrip ETL Anda, seperti yang ditunjukkan dalam contoh berikut. Kode menggunakan enableUpdateCatalog yang diatur ke BETUL, dan juga updateBehavior yang diatur ke UPDATE_IN_DATABASE, yang menunjukkan untuk menimpa skema dan menambahkan partisi baru dalam Katalog Data selama eksekusi tugas.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

Anda juga dapat mengatur nilai updateBehavior ke LOG jika Anda ingin mencegah skema tabel Anda agar tidak ditimpa, tapi masih ingin menambahkan partisi baru. Nilai default dari updateBehavior adalah UPDATE_IN_DATABASE, jadi jika Anda tidak secara eksplisit mendefinisikannya, maka skema tabel akan ditimpa.

Jika enableUpdateCatalog tidak diatur ke BETUL, terlepas dari mana pilihan yang dipilih untuk updateBehavior, tugas ETL tidak akan memperbarui tabel di Katalog Data.

Membuat tabel baru

Anda juga dapat menggunakan opsi yang sama untuk membuat sebuah tabel baru di Katalog Data. Anda dapat menentukan basis data dan nama tabel baru dengan menggunakan setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Pembatasan

Perhatikan pembatasan-pembatasan berikut ini:

  • Hanya target Amazon Simple Storage Service (Amazon S3) saja yang didukung.

  • enableUpdateCatalogFitur ini tidak didukung untuk tabel yang diatur.

  • Hanya format berikut ini didukung: json, csv, avro, dan parquet.

  • Untuk membuat atau memperbarui tabel dengan parquet klasifikasi, Anda harus menggunakan penulis parket yang AWS Glue dioptimalkan untuk. DynamicFrames Ini dapat dicapai dengan salah satu dari yang berikut:

    • Jika Anda memperbarui tabel yang ada dalam katalog dengan parquet klasifikasi, tabel harus memiliki properti "useGlueParquetWriter" tabel yang disetel ke true sebelum Anda memperbaruinya. Anda dapat mengatur properti ini melalui AWS Glue APIS/SDK, melalui konsol atau melalui pernyataan Athena DDL.

      Bidang edit properti tabel katalog di AWS Glue konsol.

      Setelah properti tabel katalog diatur, Anda dapat menggunakan potongan kode berikut untuk memperbarui tabel katalog dengan data baru:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Jika tabel belum ada dalam katalog, Anda dapat menggunakan getSink() metode dalam skrip Anda connection_type="s3" untuk menambahkan tabel dan partisi ke katalog, bersama dengan menulis data ke Amazon S3. Berikan yang sesuai partitionKeys dan compression untuk alur kerja Anda.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • Nilai glueparquet format adalah metode warisan yang memungkinkan penulis AWS Glue parket.

  • Saat updateBehavior diatur ke LOG, partisi baru akan ditambahkan hanya jika skema DynamicFrame setara dengan atau berisi sebuah subset dari kolom yang didefinisikan dalam skema tabel Katalog Data.

  • Pembaruan skema tidak didukung untuk tabel non-partisi (tidak menggunakan opsi “PartitionKeys”).

  • PartitionKeys Anda harus setara, dan dalam urutan yang sama, antara parameter Anda yang diberikan dalam skrip ETL dan PartitionKeys dalam skema tabel Katalog Data Anda.

  • Fitur ini saat ini belum men-support pembaruan/pembuatan tabel di mana skema yang memperbarui bersarang (misalnya, array dalam struct).

Untuk informasi selengkapnya, lihat Pemrograman skrip Spark.