Menggunakan bookmark pekerjaan - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan bookmark pekerjaan

AWS Glueuntuk Spark menggunakan bookmark pekerjaan untuk melacak data yang telah diproses. Untuk ringkasan fitur bookmark pekerjaan dan apa yang didukungnya, lihatMelacak data yang diproses menggunakan bookmark pekerjaan. Saat memprogram AWS Glue pekerjaan dengan bookmark, Anda memiliki akses ke fleksibilitas yang tidak tersedia dalam pekerjaan visual.

  • Saat membaca dari JDBC, Anda dapat menentukan kolom yang akan digunakan sebagai tombol bookmark dalam skrip Anda. AWS Glue

  • Anda dapat memilih mana transformation_ctx yang akan diterapkan pada setiap panggilan metode.

Selalu panggil job.init di awal skrip dan job.commit di akhir skrip dengan parameter yang dikonfigurasi dengan tepat. Kedua fungsi ini menginisialisasi layanan bookmark dan memperbarui perubahan status ke layanan. Bookmark tidak akan berfungsi tanpa memanggilnya.

Tentukan tombol bookmark

Untuk alur kerja JDBC, bookmark melacak baris mana yang telah dibaca pekerjaan Anda dengan membandingkan nilai bidang kunci dengan nilai yang ditandai. Ini tidak perlu atau berlaku untuk alur kerja Amazon S3. Saat menulis AWS Glue skrip tanpa editor visual, Anda dapat menentukan kolom mana yang akan dilacak dengan bookmark. Anda juga dapat menentukan beberapa kolom. Kesenjangan dalam urutan nilai diizinkan saat menentukan kunci bookmark yang ditentukan pengguna.

Awas

Jika tombol bookmark yang ditentukan pengguna digunakan, masing-masing tombol tersebut harus benar-benar meningkat atau menurun secara monoton. Saat memilih bidang tambahan untuk kunci majemuk, bidang untuk konsep seperti “versi minor” atau “nomor revisi” tidak memenuhi kriteria ini, karena nilainya digunakan kembali di seluruh kumpulan data Anda.

Anda dapat menentukan jobBookmarkKeys dan dengan jobBookmarkKeysSortOrder cara berikut:

  • create_dynamic_frame.from_catalog— Gunakanadditional_options.

  • create_dynamic_frame.from_options— Gunakanconnection_options.

Konteks transformasi

Banyak metode frame AWS Glue PySpark dinamis menyertakan parameter opsional bernamatransformation_ctx, yang merupakan pengidentifikasi unik untuk instance operator ETL. Parameter transformation_ctx digunakan untuk mengidentifikasi informasi status dalam bookmark tugas untuk operator tertentu. Secara khusus, AWS Glue menggunakan transformation_ctx untuk mengindeks kunci ke status bookmark.

Awas

Ini transformation_ctx berfungsi sebagai kunci untuk mencari status bookmark untuk sumber tertentu dalam skrip Anda. Agar bookmark berfungsi dengan baik, Anda harus selalu menjaga sumber dan yang terkait transformation_ctx konsisten. Mengubah properti sumber atau mengganti nama transformation_ctx dapat membuat bookmark sebelumnya tidak valid dan pemfilteran berbasis stempel waktu mungkin tidak menghasilkan hasil yang benar.

Agar bookmark tugas bekerja dengan benar, aktifkan parameter bookmark tugas dan atur parameter transformation_ctx. Jika Anda tidak memberikan parameter transformation_ctx, maka bookmark tugas tidak akan diaktifkan untuk bingkai dinamis atau tabel yang digunakan dalam metode tersebut. Sebagai contoh, jika Anda memiliki tugas ETL yang membaca dan menggabungkan dua sumber Amazon S3, maka Anda dapat memilih untuk memberikan parameter transformation_ctx hanya untuk metode-metode yang Anda ingin aktifkan bookmark-nya. Jika Anda menyetel ulang bookmark tugas untuk sebuah tugas, hal itu akan me-reset semua transformasi yang dikaitkan dengan tugas tersebut terlepas dari transformation_ctx yang digunakan.

Untuk informasi lebih lanjut tentang kelas DynamicFrameReader, lihat DynamicFrameReader kelas. Untuk informasi selengkapnya tentang PySpark ekstensi, lihatAWS Glue PySpark referensi ekstensi.

Contoh-contoh

Berikut ini adalah contoh skrip yang dihasilkan untuk sumber data Amazon S3. Bagian dari skrip yang diperlukan untuk menggunakan bookmark pekerjaan ditampilkan dalam huruf miring. Untuk informasi selengkapnya tentang elemen-elemen ini lihat API GlueContext kelas, dan API DynamicFrameWriter kelas.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()

Berikut ini adalah contoh skrip yang dihasilkan untuk sumber JDBC. Tabel sumber adalah tabel karyawan dengan kolom empno sebagai kunci primer. Meskipun secara default tugas menggunakan kunci primer berurutan sebagai kunci bookmark jika tidak ada kunci bookmark yang ditentukan, karena empno belum tentu secara berurutan—mungkin ada celah dalam nilai—maka ia tidak memenuhi syarat sebagai kunci bookmark default. Oleh karena itu, skrip secara eksplisit menetapkan empno sebagai kunci bookmark. Bagian kode itu ditampilkan dalam huruf miring.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()