Langkah 5: Menerapkan konsumen - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Langkah 5: Menerapkan konsumen

Aplikasi konsumen dalam tutorial ini terus memproses perdagangan saham dalam aliran data Anda. Ini kemudian menghasilkan saham paling populer yang dibeli dan dijual setiap menit. Aplikasi ini dibangun di atas Kinesis Client Library (KCL), yang melakukan banyak pekerjaan berat yang umum untuk aplikasi konsumen. Untuk informasi selengkapnya, lihat Gunakan Perpustakaan Klien Kinesis.

Lihat kode sumber dan tinjau informasi berikut.

StockTradesProcessor kelas

Kelas utama konsumen, disediakan untuk Anda, yang melakukan tugas-tugas berikut:

  • Membaca aplikasi, aliran data, dan nama Wilayah, diteruskan sebagai argumen.

  • Membuat KinesisAsyncClient instance dengan nama Region.

  • Menciptakan sebuah StockTradeRecordProcessorFactory instance yang melayani instance dariShardRecordProcessor, diimplementasikan oleh sebuah StockTradeRecordProcessor instance.

  • Menciptakan sebuah ConfigsBuilder instance denganKinesisAsyncClient,StreamName,ApplicationName, dan StockTradeRecordProcessorFactory instance. Ini berguna untuk membuat semua konfigurasi dengan nilai default.

  • Membuat KCL penjadwal (sebelumnya, dalam KCL versi 1.x itu dikenal sebagai KCL pekerja) dengan instance. ConfigsBuilder

  • Scheduler membuat thread baru untuk setiap shard (ditugaskan ke instance konsumen ini), yang terus-menerus melakukan loop untuk membaca catatan dari aliran data. Kemudian memanggil StockTradeRecordProcessor instance untuk memproses setiap batch catatan yang diterima.

StockTradeRecordProcessor kelas

Implementasi StockTradeRecordProcessor instance, yang pada gilirannya mengimplementasikan lima metode yang diperlukan:initialize,processRecords,leaseLost,shardEnded, danshutdownRequested.

shutdownRequestedMetode initialize dan digunakan oleh KCL untuk memberi tahu prosesor rekaman kapan harus siap untuk mulai menerima catatan dan kapan harus berhenti menerima catatan, masing-masing, sehingga dapat melakukan tugas pengaturan dan penghentian khusus aplikasi apa pun. leaseLostdan shardEnded digunakan untuk menerapkan logika apa pun untuk apa yang harus dilakukan ketika sewa hilang atau pemrosesan telah mencapai akhir pecahan. Dalam contoh ini, kami cukup mencatat pesan yang menunjukkan peristiwa ini.

Kode untuk metode ini disediakan untuk Anda. Pemrosesan utama terjadi dalam processRecords metode, yang pada gilirannya digunakan processRecord untuk setiap catatan. Metode terakhir ini disediakan sebagai kode kerangka yang sebagian besar kosong untuk Anda terapkan pada langkah berikutnya, di mana dijelaskan secara lebih rinci.

Yang juga perlu diperhatikan adalah penerapan metode dukungan untukprocessRecord:reportStats, danresetStats, yang kosong dalam kode sumber asli.

processRecordsMetode ini diterapkan untuk Anda, dan melakukan langkah-langkah berikut:

  • Untuk setiap catatan yang dilewatkan, ia processRecord memanggilnya.

  • Jika setidaknya 1 menit telah berlalu sejak laporan terakhir, panggilanreportStats(), yang mencetak statistik terbaru, dan kemudian resetStats() yang menghapus statistik sehingga interval berikutnya hanya mencakup catatan baru.

  • Menetapkan waktu pelaporan berikutnya.

  • Jika setidaknya 1 menit telah berlalu sejak pos pemeriksaan terakhir, hubungi. checkpoint()

  • Menetapkan waktu checkpointing berikutnya.

Metode ini menggunakan interval 60 detik untuk tingkat pelaporan dan pos pemeriksaan. Untuk informasi selengkapnya tentang checkpointing, lihat Menggunakan Perpustakaan Klien Kinesis.

StockStats kelas

Kelas ini menyediakan retensi data dan pelacakan statistik untuk saham paling populer dari waktu ke waktu. Kode ini disediakan untuk Anda dan berisi metode berikut:

  • addStockTrade(StockTrade): menyuntikkan yang diberikan StockTrade ke dalam statistik yang sedang berjalan.

  • toString(): mengembalikan statistik dalam string diformat.

Kelas ini melacak saham paling populer dengan menjaga hitungan berjalan dari jumlah total perdagangan untuk setiap saham dan jumlah maksimum. Ini memperbarui jumlah ini setiap kali perdagangan saham tiba.

Tambahkan kode ke metode StockTradeRecordProcessor kelas, seperti yang ditunjukkan pada langkah-langkah berikut.

Untuk mengimplementasikan konsumen
  1. Terapkan processRecord metode dengan membuat instance StockTrade objek berukuran benar dan menambahkan data catatan ke dalamnya, mencatat peringatan jika ada masalah.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Menerapkan reportStats metode. Ubah format output agar sesuai dengan preferensi Anda.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Menerapkan resetStats metode, yang menciptakan stockStats instance baru.

    stockStats = new StockStats();
  4. Menerapkan metode berikut yang diperlukan oleh ShardRecordProcessor antarmuka:

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Untuk menjalankan konsumen
  1. Jalankan produser yang Anda tulis Langkah 4: Menerapkan produsen untuk menyuntikkan catatan perdagangan saham simulasi ke aliran Anda.

  2. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat IAM pengguna) disimpan dalam file~/.aws/credentials.

  3. Jalankan StockTradesProcessor kelas dengan argumen berikut:

    StockTradesProcessor StockTradeStream us-west-2

    Perhatikan bahwa jika Anda membuat streaming di Wilayah selainus-west-2, Anda harus menentukan Wilayah tersebut di sini.

Setelah satu menit, Anda akan melihat output seperti berikut, disegarkan setiap menit setelahnya:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Langkah selanjutnya

Langkah 6: (Opsional) Memperluas konsumen