Langkah 4: Menerapkan produsen - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Langkah 4: Menerapkan produsen

Tutorial ini menggunakan skenario dunia nyata pemantauan perdagangan pasar saham. Prinsip-prinsip berikut menjelaskan secara singkat bagaimana skenario ini memetakan ke produsen dan struktur kode pendukungnya.

Lihat kode sumber dan tinjau informasi berikut.

StockTrade kelas

Perdagangan saham individu diwakili oleh contoh StockTrade kelas. Contoh ini berisi atribut seperti simbol ticker, harga, jumlah saham, jenis perdagangan (beli atau jual), dan ID yang secara unik mengidentifikasi perdagangan. Kelas ini diterapkan untuk Anda.

Rekam aliran

Aliran adalah urutan catatan. Rekaman adalah serialisasi StockTrade instance dalam JSON format. Sebagai contoh:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator kelas

StockTradeGenerator memiliki metode getRandomTrade() yang disebut yang mengembalikan perdagangan saham baru yang dihasilkan secara acak setiap kali dipanggil. Kelas ini diterapkan untuk Anda.

StockTradesWriter kelas

mainMetode produsen, StockTradesWriter terus mengambil perdagangan acak dan kemudian mengirimkannya ke Kinesis Data Streams dengan melakukan tugas-tugas berikut:

  1. Membaca nama aliran data dan nama Wilayah sebagai masukan.

  2. Menggunakan KinesisAsyncClientBuilder untuk mengatur Region, kredensial, dan konfigurasi klien.

  3. Memeriksa apakah aliran ada dan aktif (jika tidak, ia keluar dengan kesalahan).

  4. Dalam loop kontinu, panggil StockTradeGenerator.getRandomTrade() metode dan kemudian sendStockTrade metode untuk mengirim perdagangan ke aliran setiap 100 milidetik.

sendStockTradeMetode StockTradesWriter kelas memiliki kode berikut:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Lihat rincian kode berikut:

  • PutRecordAPIMengharapkan array byte, dan Anda harus mengonversi perdagangan ke JSON format. Baris kode tunggal ini melakukan operasi itu:

    byte[] bytes = trade.toJsonAsBytes();
  • Sebelum Anda dapat mengirim perdagangan, Anda membuat PutRecordRequest instance baru (disebut permintaan dalam kasus ini). Masing-masing request membutuhkan nama aliran, kunci partisi, dan gumpalan data.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    Contoh menggunakan ticker saham sebagai kunci partisi, yang memetakan catatan ke pecahan tertentu. Dalam praktiknya, Anda harus memiliki ratusan atau ribuan kunci partisi per pecahan sehingga catatan tersebar merata di seluruh aliran Anda. Untuk informasi selengkapnya tentang cara menambahkan data ke aliran, lihatMenulis data ke Amazon Kinesis Data Streams.

    Sekarang request siap untuk mengirim ke klien (operasi put):

    kinesisClient.putRecord(request).get();
  • Pemeriksaan kesalahan dan pencatatan selalu merupakan tambahan yang berguna. Kode ini mencatat kondisi kesalahan:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Tambahkan blok coba/tangkap di sekitar operasi: put

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Ini karena operasi put Kinesis Data Streams dapat gagal karena kesalahan jaringan, atau karena aliran data mencapai batas throughputnya dan terhambat. Disarankan agar Anda mempertimbangkan dengan cermat kebijakan coba ulang Anda untuk put operasi untuk menghindari kehilangan data, seperti menggunakan coba lagi.

  • Pencatatan status sangat membantu tetapi opsional:

    LOG.info("Putting trade: " + trade.toString());

Produser yang ditampilkan di sini menggunakan fungsi rekaman tunggal Kinesis API Data Streams,. PutRecord Dalam prakteknya, jika produsen individu menghasilkan banyak catatan, seringkali lebih efisien untuk menggunakan fungsi beberapa catatan PutRecords dan mengirim batch catatan pada suatu waktu. Untuk informasi selengkapnya, lihat Menulis data ke Amazon Kinesis Data Streams.

Untuk menjalankan produser
  1. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil Langkah 2: Buat IAM kebijakan dan pengguna disimpan dalam file~/.aws/credentials.

  2. Jalankan StockTradeWriter kelas dengan argumen berikut:

    StockTradeStream us-west-2

    Jika Anda membuat streaming di wilayah selainus-west-2, Anda harus menentukan wilayah tersebut di sini.

Anda akan melihat output yang serupa dengan yang berikut:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Perdagangan saham Anda sekarang sedang dicerna oleh Kinesis Data Streams.

Langkah selanjutnya

Langkah 5: Menerapkan konsumen