Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Bekerja dengan operasi streaming dalam sesi AWS Glue interaktif
Mengalihkan jenis sesi streaming
Gunakan sihir konfigurasi sesi AWS Glue interaktif,%streaming
, untuk menentukan pekerjaan yang Anda jalankan dan menginisialisasi sesi interaktif streaming.
Sampling input stream untuk pengembangan interaktif
Salah satu alat yang kami peroleh untuk membantu meningkatkan pengalaman AWS Glue interaktif dalam sesi interaktif adalah penambahan metode baru GlueContext
untuk mendapatkan snapshot aliran dalam DynamicFrame statis. GlueContext
memungkinkan Anda untuk memeriksa, berinteraksi, dan mengimplementasikan alur kerja Anda.
Dengan instance GlueContext
kelas, Anda akan dapat menemukan metodegetSampleStreamingDynamicFrame
. Argumen yang diperlukan untuk metode ini adalah:
-
dataFrame
: Streaming Spark DataFrame -
options
: Lihat opsi yang tersedia di bawah
Pilihan yang tersedia meliputi:
-
WindowSize: Ini juga disebut Durasi Microbatch. Parameter ini akan menentukan berapa lama kueri streaming akan menunggu setelah batch sebelumnya dipicu. Nilai parameter ini harus lebih kecil dari
pollingTimeInMs
. -
pollingTimeInMs: Total panjang waktu metode akan berjalan. Ini akan menembakkan setidaknya satu batch mikro untuk mendapatkan catatan sampel dari aliran input.
-
recordPollingLimit: Parameter ini membantu Anda membatasi jumlah total catatan yang akan Anda polling dari aliran.
-
(Opsional) Anda juga dapat menggunakan
writeStreamFunction
untuk menerapkan fungsi kustom ini ke setiap fungsi pengambilan sampel rekaman. Lihat di bawah untuk contoh di Scala dan Python.
catatan
Ketika sampel DynFrame
kosong, itu bisa disebabkan oleh beberapa alasan:
-
Sumber Streaming diatur ke “Terbaru” dan tidak ada data baru yang tertelan selama periode pengambilan sampel.
-
Waktu pemungutan suara tidak cukup untuk memproses catatan yang dicerna. Data tidak akan muncul kecuali seluruh batch telah diproses.
Menjalankan aplikasi streaming dalam sesi interaktif
Dalam sesi AWS Glue interaktif, Anda dapat menjalankan aplikasi AWS Glue streaming seperti bagaimana Anda akan membuat aplikasi streaming di AWS Glue Konsol. Karena sesi interaktif berbasis sesi, menghadapi pengecualian di runtime tidak menyebabkan sesi berhenti. Kami sekarang memiliki manfaat tambahan untuk mengembangkan fungsi batch Anda secara iteratif. Misalnya:
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
Dalam contoh di atas, kami menyertakan penggunaan metode yang tidak valid dan tidak seperti AWS Glue pekerjaan biasa yang akan keluar dari seluruh aplikasi, konteks dan definisi pengkodean pengguna sepenuhnya dipertahankan dan sesi masih beroperasi. Tidak perlu mem-bootstrap cluster baru dan menjalankan kembali semua transformasi sebelumnya. Ini memungkinkan Anda untuk fokus pada iterasi implementasi fungsi batch Anda dengan cepat untuk mendapatkan hasil yang diinginkan.
Penting untuk dicatat bahwa Sesi Interaktif mengevaluasi setiap pernyataan dengan cara pemblokiran sehingga sesi hanya akan mengeksekusi satu pernyataan pada satu waktu. Karena kueri streaming terus menerus dan tidak pernah berakhir, sesi dengan kueri streaming aktif tidak akan dapat menangani pernyataan tindak lanjut apa pun kecuali jika terputus. Anda dapat mengeluarkan perintah interupsi langsung dari Jupyter Notebook dan kernel kami akan menangani pembatalan untuk Anda.
Ambil urutan pernyataan berikut yang menunggu eksekusi sebagai contoh:
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely