Anatomi pekerjaan AWS Glue streaming - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Anatomi pekerjaan AWS Glue streaming

AWS Gluepekerjaan streaming beroperasi pada paradigma streaming Spark dan memanfaatkan streaming terstruktur dari kerangka Spark. Pekerjaan streaming terus-menerus melakukan polling pada sumber data streaming, pada interval waktu tertentu, untuk mengambil catatan sebagai batch mikro. Bagian berikut memeriksa bagian-bagian yang berbeda dari pekerjaan AWS Glue streaming.

Tangkapan layar menunjukkan log Amazon CloudWatch Pemantauan, AWS Glue untuk contoh yang diberikan di atas dan melihat jumlah pelaksana yang diperlukan (Garis Oranye) dan skala pelaksana (garis biru) untuk mencocokkannya tanpa perlu penyesuaian manual.

forEachBatch

forEachBatchMetode ini adalah titik masuk dari pekerjaan AWS Glue streaming. AWS Gluepekerjaan streaming menggunakan forEachBatch metode untuk polling data yang berfungsi seperti iterator yang tetap aktif selama siklus hidup pekerjaan streaming dan secara teratur melakukan polling sumber streaming untuk data baru dan memproses data terbaru dalam batch mikro.

glueContext.forEachBatch( frame=dataFrame_AmazonKinesis_node1696872487972, batch_function=processBatch, options={ "windowSize": "100 seconds", "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/", }, )

Konfigurasikan frame properti forEachBatch untuk menentukan sumber streaming. Dalam contoh ini, simpul sumber yang Anda buat di kanvas kosong selama pembuatan pekerjaan diisi dengan default DataFrame pekerjaan. Tetapkan batch_function properti sebagai function yang Anda putuskan untuk dipanggil untuk setiap operasi batch mikro. Anda harus menentukan fungsi untuk menangani transformasi batch pada data yang masuk.

Sumber

Pada langkah pertama processBatch fungsi, program memverifikasi jumlah catatan DataFrame yang Anda definisikan sebagai properti bingkai. forEachBatch Program ini menambahkan stempel waktu konsumsi ke yang tidak kosong. DataFrame data_frame.count()>0Klausul menentukan apakah batch mikro terbaru tidak kosong dan siap untuk diproses lebih lanjut.

def processBatch(data_frame, batchId): if data_frame.count() >0: AmazonKinesis_node1696872487972 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame", )

Pemetaan

Bagian selanjutnya dari program ini adalah menerapkan pemetaan. Mapping.applyMetode pada percikan DataFrame memungkinkan Anda untuk menentukan aturan transformasi di sekitar elemen data. Biasanya Anda dapat mengganti nama, mengubah tipe data, atau menerapkan fungsi kustom pada kolom data sumber dan memetakannya ke kolom target.

#Script generated for node ChangeSchema ChangeSchema_node16986872679326 = ApplyMapping.apply( frame = AmazonKinesis_node1696872487972, mappings = [ ("eventtime", "string", "eventtime", "string"), ("manufacturer", "string", "manufacturer", "string"), ("minutevolume", "long", "minutevolume", "int"), ("o2stats", "long", "OxygenSaturation", "int"), ("pressurecontrol", "long", "pressurecontrol", "int"), ("serialnumber", "string", "serialnumber", "string"), ("ventilatorid", "long", "ventilatorid", "long"), ("ingest_year", "string", "ingest_year", "string"), ("ingest_month", "string", "ingest_month", "string"), ("ingest_day", "string", "ingest_day", "string"), ("ingest_hour", "string", "ingest_hour", "string"), ], transformation_ctx="ChangeSchema_node16986872679326", ) )

Wastafel

Di bagian ini, kumpulan data yang masuk dari sumber streaming disimpan di lokasi target. Dalam contoh ini kita akan menulis data ke lokasi Amazon S3. Detail AmazonS3_node_path properti diisi sebelumnya sebagaimana ditentukan oleh pengaturan yang Anda gunakan selama pembuatan lapangan kerja dari kanvas. Anda dapat mengatur updateBehavior berdasarkan kasus penggunaan dan memutuskan untuk tidak memperbarui tabel katalog data, atau Membuat katalog data dan memperbarui skema katalog data pada proses berikutnya, atau membuat tabel katalog dan tidak memperbarui definisi skema pada proses berikutnya.

partitionKeysProperti mendefinisikan opsi partisi penyimpanan. Perilaku default adalah mempartisi data per data ingestion_time_columns yang dibuat tersedia di bagian sumber. compressionProperti ini memungkinkan Anda untuk mengatur algoritma kompresi yang akan diterapkan selama penulisan target. Anda memiliki opsi untuk mengatur Snappy, LZO, atau GZIP sebagai teknik kompresi. enableUpdateCatalogProperti mengontrol apakah tabel AWS Glue katalog perlu diperbarui. Pilihan yang tersedia untuk properti ini adalah True atauFalse.

#Script generated for node Amazon S3 AmazonS3_node1696872743449 = glueContext.getSink( path = AmazonS3_node1696872743449_path, connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"], compression = "snappy", enableUpdateCatalog = True, transformation_ctx = "AmazonS3_node1696872743449", )

AWS GlueWastafel katalog

Bagian pekerjaan ini mengontrol perilaku pembaruan tabel AWS Glue katalog. Set catalogDatabase dan catalogTableName properti per nama database AWS Glue Katalog Anda dan nama tabel yang terkait dengan AWS Glue pekerjaan yang Anda desain. Anda dapat menentukan format file dari data target melalui setFormat properti. Untuk contoh ini kita akan menyimpan data dalam format parket.

Setelah Anda mengatur dan menjalankan pekerjaan AWS Glue streaming merujuk tutorial ini, data streaming yang dihasilkan Amazon Kinesis Data Streams akan disimpan di lokasi Amazon S3 dalam format parket dengan kompresi tajam. Pada menjalankan pekerjaan streaming yang berhasil, Anda akan dapat menanyakan data melaluiAmazon Athena.

AmazonS3_node1696872743449 = setCatalogInfo( catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result" ) AmazonS3_node1696872743449.setFormat("glueparquet") AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326") )