Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Lanjutan AWS Glue konsep streaming
Dalam aplikasi berbasis data kontemporer, signifikansi data berkurang dari waktu ke waktu dan transisi nilainya dari prediktif menjadi reaktif. Akibatnya, pelanggan ingin memproses data secara real-time untuk membuat keputusan yang lebih cepat. Saat berhadapan dengan umpan data real-time, seperti dari sensor IoT, data mungkin tiba tanpa urutan atau mengalami penundaan pemrosesan karena latensi jaringan dan kegagalan terkait sumber lainnya selama konsumsi. Sebagai bagian dari AWS Glue platform, AWS Glue Streaming dibangun di atas kemampuan ini untuk menyediakan ETL streaming tanpa server yang dapat diskalakan, didukung oleh streaming terstruktur Apache Spark, memberdayakan pengguna dengan pemrosesan data waktu nyata.
Dalam topik ini, kita akan mengeksplorasi konsep dan kemampuan streaming lanjutan AWS Glue Streaming.
Pertimbangan waktu saat memproses aliran
Ada empat pengertian waktu saat memproses aliran:

-
Event-time — Waktu di mana peristiwa itu terjadi. Dalam kebanyakan kasus, bidang ini disematkan ke dalam data-peristiwa itu sendiri, di sumbernya.
-
E vent-time-window - Kerangka waktu antara dua waktu peristiwa. Seperti yang ditunjukkan pada diagram di atas, W1 adalah event-time-window dari 17:00 hingga 17:10. Masing-masing event-time-window adalah pengelompokan beberapa peristiwa.
-
Trigger-time — Waktu pemicu mengontrol seberapa sering pemrosesan data dan pemutakhiran hasil terjadi. Ini adalah waktu ketika pemrosesan batch mikro dimulai.
-
Waktu konsumsi — Waktu ketika data-aliran dicerna ke dalam layanan streaming. Jika waktu acara tidak disematkan ke dalam acara itu sendiri, kali ini dapat digunakan untuk windowing dalam beberapa kasus.
Jendela
Windowing adalah teknik di mana Anda mengelompokkan dan menggabungkan beberapa peristiwa dengan. event-time-window Kami akan mengeksplorasi manfaat windowing dan kapan Anda akan menggunakannya dalam contoh berikut.
Tergantung pada kasus penggunaan bisnis, ada tiga jenis jendela waktu yang didukung oleh percikan.
-
Tumbling window — serangkaian ukuran event-time-windows tetap yang tidak tumpang tindih di mana Anda agregat.
-
Jendela geser — mirip dengan jendela yang jatuh dari titik “berukuran tetap”, tetapi jendela dapat tumpang tindih atau meluncur selama durasi slide lebih kecil dari durasi jendela itu sendiri.
-
Jendela sesi — dimulai dengan peristiwa data input dan terus memperluas dirinya sendiri selama menerima input dalam celah atau durasi tidak aktif. Jendela sesi dapat memiliki ukuran statis atau dinamis dari panjang jendela, tergantung pada input.
Jendela tumbling
Tumbling window adalah serangkaian ukuran event-time-windows tetap yang tidak tumpang tindih yang Anda agregat. Mari kita pahami ini dengan contoh dunia nyata.

Perusahaan ABC Auto ingin melakukan kampanye pemasaran untuk merek baru mobil sport. Mereka ingin memilih kota di mana mereka memiliki penggemar mobil sport terbesar. Untuk mencapai tujuan ini, mereka menampilkan iklan pendek 15 detik yang memperkenalkan mobil di situs web mereka. Semua “klik “dan” kota “yang sesuai direkam dan dialirkan ke. Amazon Kinesis Data Streams Kami ingin menghitung jumlah klik dalam jendela 10 menit dan mengelompokkannya berdasarkan kota untuk melihat kota mana yang memiliki permintaan tertinggi. Berikut ini adalah output dari agregasi.
window_start_time | window_end_time | kota | total_clicks |
---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | Dallas | 75 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | Chicago | 10 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | Dallas | 20 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | Chicago | 50 |
Seperti dijelaskan di atas, ini event-time-windows berbeda dari interval waktu pemicu. Misalnya, bahkan jika waktu pemicu Anda setiap menit, hasil output hanya akan menampilkan jendela agregasi 10 menit yang tidak tumpang tindih. Untuk pengoptimalan, lebih baik agar interval pemicu selaras dengan. event-time-window
Pada tabel di atas, Dallas melihat 75 klik di jendela 17:00-17:10, sementara Chicago memiliki 10 klik. Juga, tidak ada data untuk jendela 17:10 - 17:20 untuk kota mana pun, jadi jendela ini dihilangkan.
Sekarang Anda dapat menjalankan analisis lebih lanjut tentang data ini di aplikasi analitik hilir untuk menentukan kota paling eksklusif untuk menjalankan kampanye pemasaran.
Menggunakan jendela tumbling di AWS Glue
-
Buat Amazon Kinesis Data Streams DataFrame dan baca darinya. Contoh:
parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price')
-
Memproses data di jendela yang jatuh. Dalam contoh di bawah ini, data dikelompokkan berdasarkan bidang input “event_time” dalam jendela tumbling 10 menit dan menulis output ke danau data Amazon S3.
grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()
Jendela geser
Jendela geser mirip dengan jendela yang jatuh dari titik “berukuran tetap”, tetapi jendela dapat tumpang tindih atau meluncur selama durasi slide lebih kecil dari durasi jendela itu sendiri. Karena sifat geser, input dapat terikat ke beberapa jendela.

Untuk lebih memahami, mari kita perhatikan contoh bank yang ingin mendeteksi potensi penipuan kartu kredit. Aplikasi streaming dapat memantau aliran transaksi kartu kredit yang berkelanjutan. Transaksi ini dapat digabungkan ke dalam jendela dengan durasi 10 menit dan setiap 5 menit, jendela akan meluncur ke depan, menghilangkan data 5 menit tertua dan menambahkan 5 menit data baru terbaru. Dalam setiap jendela, transaksi dapat dikelompokkan berdasarkan negara yang memeriksa pola yang mencurigakan, seperti transaksi di AS segera diikuti oleh yang lain di Australia. Untuk mempermudah, mari kita mengkategorikan transaksi tersebut sebagai penipuan ketika jumlah total transaksi lebih besar dari $100. Jika pola seperti itu terdeteksi, itu menandakan potensi penipuan dan kartu bisa dibekukan.
Sistem pemrosesan kartu kredit mengirimkan uap peristiwa transaksi ke kinesis untuk setiap card-id bersama dengan negara. AWS Glue Pekerjaan menjalankan analisis dan menghasilkan output agregat berikut.
window_start_time | window_end_time | card_last_four | negeri | total_amount |
---|---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | AS | 85 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | Australia | 10 |
2023-07-10 17:05:45 | 2023-07-10 17:15:45 | 6544 | AS | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | AS | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | Australia | 150 |
Berdasarkan agregasi di atas, Anda dapat melihat jendela 10 menit meluncur setiap 5 menit, dijumlahkan dengan jumlah transaksi. Anomali terdeteksi di jendela 17:10 - 17:20 di mana ada outlier, yang merupakan transaksi seharga $150 di Australia. AWS Glue dapat mendeteksi anomali ini dan mendorong peristiwa alarm dengan kunci yang menyinggung ke topik SNS menggunakan boto3. Selanjutnya fungsi Lambda dapat berlangganan topik ini dan mengambil tindakan.
Memproses data di jendela geser
group-by
Klausa dan fungsi jendela digunakan untuk mengimplementasikan jendela geser seperti yang ditunjukkan di bawah ini.
grouped_df = parsed_df \ .groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \ .agg(sum("tx_amount").alias("total_amount"))
Jendela sesi
Berbeda dengan dua jendela di atas yang memiliki ukuran tetap, jendela sesi dapat memiliki ukuran statis atau dinamis dari panjang jendela, tergantung pada input. Jendela sesi dimulai dengan peristiwa data input dan terus memperluas dirinya sendiri selama menerima input dalam celah atau durasi tidak aktif.

Mari kita ambil contoh. Perusahaan ABC hotel ingin mencari tahu kapan waktu tersibuk dalam seminggu dan memberikan penawaran yang lebih baik untuk tamu mereka. Segera setelah tamu check-in, jendela sesi dimulai dan spark mempertahankan status dengan agregasi untuk itu. event-time-window Setiap kali tamu check-in, acara dibuat dan dikirim ke Amazon Kinesis Data Streams. Hotel membuat keputusan bahwa jika tidak ada check-in untuk jangka waktu 15 menit, event-time-window dapat ditutup. Selanjutnya event-time-window akan dimulai lagi ketika ada check-in baru. Outputnya terlihat sebagai berikut.
window_start_time | window_end_time | kota | total_checkins |
---|---|---|---|
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | Dallas | 50 |
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | Chicago | 25 |
2023-07-10 17:40:00 | 2023-07-10 18:20:00 | Dallas | 75 |
2023-07-10 18:50:45 | 2023-07-10 19:15:45 | Dallas | 20 |
Check-in pertama terjadi pada event_time= 17:02. Agregasi event-time-window akan dimulai pada 17:02. Agregasi ini akan berlanjut selama kami menerima acara dalam durasi 15 menit. Dalam contoh di atas, acara terakhir yang kami terima adalah pukul 17:15 dan kemudian selama 15 menit berikutnya tidak ada acara. Akibatnya, Spark menutupnya event-time-window pada 17:15 +15 menit = 17:30 dan mengaturnya sebagai 17:02 - 17:30. Ini memulai yang baru event-time-window pada 17:47 ketika menerima acara data check-in baru.
Memproses data di jendela sesi
group-by
Klausa dan fungsi jendela digunakan untuk mengimplementasikan jendela geser.
grouped_df = parsed_df \ .groupBy(session_window(col("event_time"), "10 minute"), "city") \ .agg(count("check_in").alias("total_checkins"))
Mode keluaran
Mode output adalah mode di mana hasil dari tabel tak terbatas ditulis ke wastafel eksternal. Ada tiga mode yang tersedia. Dalam contoh berikut, Anda menghitung kemunculan kata saat baris data sedang dialirkan dan diproses di setiap batch mikro.
-
Mode lengkap - Seluruh tabel hasil akan ditulis ke wastafel setelah setiap pemrosesan batch mikro meskipun jumlah kata tidak diperbarui saat ini event-time-window.
-
Mode Append — Ini adalah mode default, di mana hanya kata-kata dan atau baris baru yang ditambahkan ke tabel hasil karena pemicu terakhir akan ditulis ke wastafel. Mode ini bagus untuk streaming stateless untuk kueri seperti peta, FlatMap, filter, dll.
-
Mode pembaruan - Hanya kata dan atau baris di Tabel Hasil yang diperbarui atau ditambahkan sejak pemicu terakhir yang akan ditulis ke wastafel.
catatan
Mode keluaran = “pembaruan” tidak didukung untuk jendela sesi.
Menangani data dan tanda air yang terlambat
Saat bekerja dengan data real-time mungkin ada penundaan kedatangan data karena latensi jaringan dan kegagalan hulu dan kami memerlukan mekanisme untuk melakukan agregasi lagi pada yang terlewat. event-time-window Namun, untuk melakukan ini, negara perlu dipertahankan. Pada saat yang sama, data yang lebih lama perlu dibersihkan untuk membatasi ukuran negara. Spark versi 2.1 menambahkan dukungan untuk fitur yang disebut watermarking yang mempertahankan status dan memungkinkan pengguna untuk menentukan ambang batas untuk data terlambat.
Dengan mengacu pada contoh ticker saham kami di atas, mari pertimbangkan ambang batas yang diizinkan untuk data terlambat sebagai tidak lebih dari 10 menit. Untuk membuatnya tetap sederhana, kita akan menganggap jendela jatuh, ticker sebagai AMZ, diperdagangkan sebagai BELI.

Dalam diagram di atas, kita menghitung total volume di atas jendela 10 menit yang jatuh. Kami memiliki pemicu pada 17:00, 17:10 dan 17:20. Di atas panah timeline, kami memiliki aliran data input dan di bawah ini adalah tabel hasil tak terbatas.
Di jendela jatuh 10 menit pertama kami agregasi berdasarkan event_time dan total_volume dihitung sebagai 30. Yang kedua event-time-window, spark mendapatkan peristiwa data pertama dengan event_time= 17:02. Karena ini adalah event_time maks yang terlihat sejauh ini oleh percikan, ambang batas tanda air disetel 10 menit yang lalu (yaitu, watermark_event_time= 16:52). Setiap peristiwa data dengan event_time setelah 16:52 akan dipertimbangkan untuk agregasi terikat waktu dan peristiwa data apa pun sebelum itu akan dihapus. Hal ini memungkinkan percikan untuk mempertahankan status perantara selama 10 menit tambahan untuk mengakomodasi data yang terlambat. Sekitar waktu jam dinding 17:08 Spark menerima acara dengan event_time= 16:54 yang berada dalam ambang batas. Oleh karena itu spark menghitung ulang “16:50 - 17:00 “ event-time-windowdan total volume diperbarui dari 30 menjadi 60.
Namun, pada waktu pemicu 17:20, ketika spark menerima acara dengan event_time= 17:15 itu mengatur watermark_event_time= 17:05. Oleh karena itu peristiwa data terlambat dengan event_time= 17:03 dianggap “terlambat” dan diabaikan.
Watermark Boundary = Max(Event Time) - Watermark Threshold
Menggunakan tanda air di AWS Glue
Spark tidak akan memancarkan atau menulis data ke wastafel eksternal sampai batas watermark dilewati. Untuk menerapkan tanda air di AWS Glue, lihat contoh di bawah ini.
grouped_df = parsed_df \ .withWatermark("event_time", "10 minutes") \ .groupBy(window("event_time", "5 minutes"), "ticker") \ .agg(sum("volume").alias("total_volume"))