Langkah 4: Menerapkan Produser - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Langkah 4: Menerapkan Produser

Aplikasi dalam Tutorial: Memproses Data Stock Real-Time Menggunakan KPL dan KCL 1.x menggunakan skenario dunia nyata dari pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukung.

Lihat kode sumber dan tinjau informasi berikut.

StockTrade kelas

Perdagangan saham individu diwakili oleh instance StockTrade kelas. Instance ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diimplementasikan untuk Anda.

Rekaman aliran

Aliran adalah urutan catatan. Rekaman adalah serialisasi StockTrade instance dalam format JSON. Misalnya:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator kelas

StockTradeGeneratormemiliki metode getRandomTrade() yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diimplementasikan untuk Anda.

StockTradesWriter kelas

mainMetode produser, StockTradesWriter terus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:

  1. Membaca nama aliran dan nama Region sebagai masukan.

  2. Menciptakan sebuahAmazonKinesisClientBuilder.

  3. Menggunakan pembuat klien untuk mengatur Region, kredenial, dan konfigurasi klien.

  4. Membangun AmazonKinesis klien menggunakan pembangun klien.

  5. Memeriksa bahwa aliran ada dan aktif (jika tidak, itu keluar dengan kesalahan).

  6. Dalam loop kontinu, memanggil StockTradeGenerator.getRandomTrade() metode dan kemudian sendStockTrade metode untuk mengirim perdagangan ke aliran setiap 100 milidetik.

sendStockTradeMetode StockTradesWriter kelas memiliki kode berikut:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Lihat rincian kode berikut:

  • PutRecordAPI mengharapkan array byte, dan Anda perlu mengkonversi trade ke format JSON. Baris kode tunggal ini melakukan operasi itu:

    byte[] bytes = trade.toJsonAsBytes();
  • Sebelum Anda dapat mengirim perdagangan, Anda membuat PutRecordRequest instance baru (disebut putRecord dalam kasus ini):

    PutRecordRequest putRecord = new PutRecordRequest();

    Setiap PutRecord panggilan membutuhkan nama stream, kunci partisi, dan gumpalan data. Kode berikut mengisi bidang ini dalam putRecord objek menggunakan setXxxx() metodenya:

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    Contoh menggunakan tiket saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke stream, lihatMenambahkan Data ke Stream.

    Sekarang putRecord siap untuk mengirim ke klien (putoperasi):

    kinesisClient.putRecord(putRecord);
  • Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini log kondisi kesalahan:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Tambahkan blok try/catch di sekitar operasi: put

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Ini karena put operasi Kinesis Data Streams dapat gagal karena kesalahan jaringan, atau karena aliran mencapai batas throughputnya dan mendapatkan throttled. Kami merekomendasikan dengan cermat mempertimbangkan kebijakan coba ulang Anda untuk put operasi untuk menghindari kehilangan data, seperti menggunakan percobaan ulang sederhana.

  • Pencatatan status sangat membantu tetapi opsional:

    LOG.info("Putting trade: " + trade.toString());

Produser yang ditampilkan di sini menggunakan fungsionalitas rekaman tunggal API Kinesis Data Streams,. PutRecord Dalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan beberapa fungsi catatan PutRecords dan mengirim batch catatan pada satu waktu. Untuk informasi selengkapnya, lihat Menambahkan Data ke Stream.

Untuk menjalankan produser
  1. Verifikasi bahwa kunci akses dan pasangan kunci rahasia yang diambil sebelumnya (saat membuat pengguna IAM) disimpan dalam file. ~/.aws/credentials

  2. Jalankan StockTradeWriter kelas dengan argumen berikut:

    StockTradeStream us-west-2

    Jika Anda membuat streaming di Wilayah selainus-west-2, Anda harus menentukan Wilayah tersebut di sini.

Anda akan melihat output yang serupa dengan yang berikut:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Aliran perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.

Langkah Selanjutnya

Langkah 5: Menerapkan Konsumen