Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Langkah 5: Menerapkan konsumen
Aplikasi konsumen dalam tutorial ini terus memproses perdagangan saham dalam aliran data Anda. Ini kemudian menghasilkan saham paling populer yang dibeli dan dijual setiap menit. Aplikasi ini dibangun di atas Kinesis Client Library (KCL), yang melakukan banyak pekerjaan berat yang umum untuk aplikasi konsumen. Untuk informasi selengkapnya, lihat Gunakan Perpustakaan Klien Kinesis.
Lihat kode sumber dan tinjau informasi berikut.
- StockTradesProcessor kelas
-
Kelas utama konsumen, disediakan untuk Anda, yang melakukan tugas-tugas berikut:
-
Membaca aplikasi, aliran data, dan nama Wilayah, diteruskan sebagai argumen.
-
Membuat
KinesisAsyncClient
instance dengan nama Region. -
Menciptakan sebuah
StockTradeRecordProcessorFactory
instance yang melayani instance dariShardRecordProcessor
, diimplementasikan oleh sebuahStockTradeRecordProcessor
instance. -
Menciptakan sebuah
ConfigsBuilder
instance denganKinesisAsyncClient
,StreamName
,ApplicationName
, danStockTradeRecordProcessorFactory
instance. Ini berguna untuk membuat semua konfigurasi dengan nilai default. -
Membuat KCL penjadwal (sebelumnya, dalam KCL versi 1.x itu dikenal sebagai KCL pekerja) dengan instance.
ConfigsBuilder
-
Scheduler membuat thread baru untuk setiap shard (ditugaskan ke instance konsumen ini), yang terus-menerus melakukan loop untuk membaca catatan dari aliran data. Kemudian memanggil
StockTradeRecordProcessor
instance untuk memproses setiap batch catatan yang diterima.
-
- StockTradeRecordProcessor kelas
-
Implementasi
StockTradeRecordProcessor
instance, yang pada gilirannya mengimplementasikan lima metode yang diperlukan:initialize
,processRecords
,leaseLost
,shardEnded
, danshutdownRequested
.shutdownRequested
Metodeinitialize
dan digunakan oleh KCL untuk memberi tahu prosesor rekaman kapan harus siap untuk mulai menerima catatan dan kapan harus berhenti menerima catatan, masing-masing, sehingga dapat melakukan tugas pengaturan dan penghentian khusus aplikasi apa pun.leaseLost
danshardEnded
digunakan untuk menerapkan logika apa pun untuk apa yang harus dilakukan ketika sewa hilang atau pemrosesan telah mencapai akhir pecahan. Dalam contoh ini, kami cukup mencatat pesan yang menunjukkan peristiwa ini.Kode untuk metode ini disediakan untuk Anda. Pemrosesan utama terjadi dalam
processRecords
metode, yang pada gilirannya digunakanprocessRecord
untuk setiap catatan. Metode terakhir ini disediakan sebagai kode kerangka yang sebagian besar kosong untuk Anda terapkan pada langkah berikutnya, di mana dijelaskan secara lebih rinci.Yang juga perlu diperhatikan adalah penerapan metode dukungan untuk
processRecord
:reportStats
, danresetStats
, yang kosong dalam kode sumber asli.processRecords
Metode ini diterapkan untuk Anda, dan melakukan langkah-langkah berikut:-
Untuk setiap catatan yang dilewatkan, ia
processRecord
memanggilnya. -
Jika setidaknya 1 menit telah berlalu sejak laporan terakhir, panggilan
reportStats()
, yang mencetak statistik terbaru, dan kemudianresetStats()
yang menghapus statistik sehingga interval berikutnya hanya mencakup catatan baru. -
Menetapkan waktu pelaporan berikutnya.
-
Jika setidaknya 1 menit telah berlalu sejak pos pemeriksaan terakhir, hubungi.
checkpoint()
-
Menetapkan waktu checkpointing berikutnya.
Metode ini menggunakan interval 60 detik untuk tingkat pelaporan dan pos pemeriksaan. Untuk informasi selengkapnya tentang checkpointing, lihat Menggunakan Perpustakaan Klien Kinesis.
-
- StockStats kelas
-
Kelas ini menyediakan retensi data dan pelacakan statistik untuk saham paling populer dari waktu ke waktu. Kode ini disediakan untuk Anda dan berisi metode berikut:
-
addStockTrade(StockTrade)
: menyuntikkan yang diberikanStockTrade
ke dalam statistik yang sedang berjalan. -
toString()
: mengembalikan statistik dalam string diformat.
Kelas ini melacak saham paling populer dengan menjaga hitungan berjalan dari jumlah total perdagangan untuk setiap saham dan jumlah maksimum. Ini memperbarui jumlah ini setiap kali perdagangan saham tiba.
-
Tambahkan kode ke metode StockTradeRecordProcessor
kelas, seperti yang ditunjukkan pada langkah-langkah berikut.
Untuk mengimplementasikan konsumen
-
Terapkan
processRecord
metode dengan membuat instanceStockTrade
objek berukuran benar dan menambahkan data catatan ke dalamnya, mencatat peringatan jika ada masalah.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
Menerapkan
reportStats
metode. Ubah format output agar sesuai dengan preferensi Anda.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Menerapkan
resetStats
metode, yang menciptakanstockStats
instance baru.stockStats = new StockStats();
-
Menerapkan metode berikut yang diperlukan oleh
ShardRecordProcessor
antarmuka:@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Untuk menjalankan konsumen
-
Jalankan produser yang Anda tulis Langkah 4: Menerapkan produsen untuk menyuntikkan catatan perdagangan saham simulasi ke aliran Anda.
-
Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat IAM pengguna) disimpan dalam file
~/.aws/credentials
. -
Jalankan
StockTradesProcessor
kelas dengan argumen berikut:StockTradesProcessor StockTradeStream us-west-2
Perhatikan bahwa jika Anda membuat streaming di Wilayah selain
us-west-2
, Anda harus menentukan Wilayah tersebut di sini.
Setelah satu menit, Anda akan melihat output seperti berikut, disegarkan setiap menit setelahnya:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Langkah selanjutnya
Langkah 6: (Opsional) Memperluas konsumen