Bekerja dengan operasi streaming di AWS Glue sesi interaktif - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bekerja dengan operasi streaming di AWS Glue sesi interaktif

Mengalihkan jenis sesi streaming

Menggunakan AWS Glue sihir konfigurasi sesi interaktif,%streaming, untuk menentukan pekerjaan yang Anda jalankan dan menginisialisasi sesi interaktif streaming.

Sampling input stream untuk pengembangan interaktif

Salah satu alat yang kami dapatkan untuk membantu meningkatkan pengalaman interaktif di AWS Glue sesi interaktif adalah penambahan metode baru di bawah GlueContext untuk mendapatkan snapshot dari aliran dalam DynamicFrame statis. GlueContextmemungkinkan Anda untuk memeriksa, berinteraksi, dan mengimplementasikan alur kerja Anda.

Dengan instance GlueContext kelas, Anda akan dapat menemukan metodegetSampleStreamingDynamicFrame. Argumen yang diperlukan untuk metode ini adalah:

  • dataFrame: Streaming Spark DataFrame

  • options: Lihat opsi yang tersedia di bawah ini

Opsi yang tersedia meliputi:

  • windowSizeIni juga disebut Durasi Microbatch. Parameter ini akan menentukan berapa lama kueri streaming akan menunggu setelah batch sebelumnya dipicu. Nilai parameter ini harus lebih kecil daripollingTimeInMs.

  • pollingTimeInMs: Total lamanya waktu dari metode yang akan berjalan Ini akan menembakkan setidaknya satu batch mikro untuk mendapatkan catatan sampel dari aliran input.

  • recordPollingLimit: Parameter ini membantu Anda membatasi jumlah total catatan yang akan Anda polling dari aliran.

  • (Opsional) Anda juga dapat menggunakan writeStreamFunction untuk menerapkan fungsi kustom ini ke setiap fungsi pengambilan sampel rekaman. Lihat di bawah untuk contoh di Scala dan Python.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
catatan

Ketika sampel DynFrame kosong, itu bisa disebabkan oleh beberapa alasan:

  • Sumber Streaming diatur ke “Terbaru” dan tidak ada data baru yang tertelan selama periode pengambilan sampel.

  • Waktu pemungutan suara tidak cukup untuk memproses catatan yang dicerna. Data tidak akan muncul kecuali seluruh batch telah diproses.

Menjalankan aplikasi streaming dalam sesi interaktif

Masuk AWS Glue sesi interaktif, Anda dapat menjalankan AWS Glue aplikasi streaming seperti bagaimana Anda akan membuat aplikasi streaming di AWS Glue Konsol Karena sesi interaktif berbasis sesi, menghadapi pengecualian di runtime tidak menyebabkan sesi berhenti. Kami sekarang memiliki manfaat tambahan untuk mengembangkan fungsi batch Anda secara iteratif. Sebagai contoh:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

Dalam contoh di atas, kami menyertakan penggunaan metode yang tidak valid dan tidak seperti biasa AWS Glue pekerjaan yang akan keluar dari seluruh aplikasi, konteks dan definisi pengkodean pengguna sepenuhnya dipertahankan dan sesi masih beroperasi. Tidak perlu mem-bootstrap cluster baru dan menjalankan kembali semua transformasi sebelumnya. Ini memungkinkan Anda untuk fokus pada iterasi implementasi fungsi batch Anda dengan cepat untuk mendapatkan hasil yang diinginkan.

Penting untuk dicatat bahwa Sesi Interaktif mengevaluasi setiap pernyataan dengan cara pemblokiran sehingga sesi hanya akan mengeksekusi satu pernyataan pada satu waktu. Karena kueri streaming berkelanjutan dan tidak pernah berakhir, sesi dengan kueri streaming aktif tidak akan dapat menangani pernyataan tindak lanjut apa pun kecuali jika terputus. Anda dapat mengeluarkan perintah interupsi langsung dari Jupyter Notebook dan kernel kami akan menangani pembatalan untuk Anda.

Ambil urutan pernyataan berikut yang menunggu eksekusi sebagai contoh:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely