步驟 4:實作生產者 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 4:實作生產者

本教學課程使用真實情境的股票市場交易監控。以下原則簡要說明此情境如何對應到生產者及其支援的程式碼結構。

請查看原始碼並對照檢閱以下資訊。

StockTrade 類別

單次股票交易是由 StockTrade 類別的執行個體表示。此執行個體包含若干屬性,如股票代號、價格、股份數、交易類型 (買進或賣出) 以及唯一識別該交易的 ID。程式碼已為您實作此類別。

串流記錄

串流是一連串的記錄。記錄則是 JSON 格式的序列化 StockTrade 執行個體。例如:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 類別

StockTradeGenerator 具有 getRandomTrade() 方法,每次叫用後將傳回隨機產生的新股票交易。程式碼已為您實作此類別。

StockTradesWriter 類別

生產者的 main 方法 StockTradesWriter 會持續擷取隨機交易,然後透過執行以下任務將該交易傳送至 Kinesis Data Streams:

  1. 讀取資料串流名稱和區域名稱做為輸入。

  2. 使用 KinesisAsyncClientBuilder 設定區域、登入資料和用戶端組態。

  3. 檢查串流是否存在且處於作用中狀態 (否則將結束此方法並顯示錯誤)。

  4. 在連續迴圈中依序呼叫 StockTradeGenerator.getRandomTrade() 方法和 sendStockTrade 方法,每隔 100 毫秒將交易傳送至串流。

sendStockTrade 類別的 StockTradesWriter 方法含有以下程式碼:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

請參閱以下的程式碼詳解:

  • PutRecord API 需要一個位元組陣列,而且您必須將交易轉換為 JSON 格式。這一行程式碼將執行該項操作:

    byte[] bytes = trade.toJsonAsBytes();
  • 傳送交易之前,您必須先建立新的 PutRecordRequest 執行個體 (本例中稱為要求)。每個 request 都需要串流名稱、分割區索引鍵和資料 Blob。

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    本範例使用股票代號做為分割區索引鍵,將記錄對應到特定碎片。實際上,每個碎片應該會有成千上百的分割區索引鍵,使記錄均勻地分佈於串流中。如需如何加入資料至串流的詳細資訊,請參閱將資料寫入 Amazon Kinesis Data Streams

    現在 request 已準備好傳送至用戶端 (put 操作):

    kinesisClient.putRecord(request).get();
  • 錯誤檢查和日誌記錄肯定是頗為實用的附加功能。此程式碼將記錄錯誤情況:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put 操作的周圍加入 try/catch 區塊:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    這麼做是因為 Kinesis Data Streams put 操作可能由於網路錯誤或是資料串流達到其傳輸量限制並受到調節而導致失敗。建議您仔細考量 put 操作的重試政策,例如令其單純重試以避免資料遺失。

  • 狀態記錄也很實用,但可有可無:

    LOG.info("Putting trade: " + trade.toString());

此處所示的生產者是使用 Kinesis Data Streams API 的單一記錄功能 PutRecord。實際上,如果個別的生產者將產生許多記錄,則使用 PutRecords 的多筆記錄功能並一次性傳送各個批次的記錄通常會更有效率。如需更多詳細資訊,請參閱 將資料寫入 Amazon Kinesis Data Streams

執行生產者
  1. 確認在步驟 2:建立 IAM 政策和使用者中擷取的存取金鑰和秘密金鑰對已儲存在檔案 ~/.aws/credentials 中。

  2. 使用以下引數執行 StockTradeWriter 類別:

    StockTradeStream us-west-2

    如果您是在 us-west-2 以外的區域建立串流,則此處必須改為指定該區域。

您應該會看到類似下列的輸出:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

您的股票交易現在正由 Kinesis Data Streams 擷取中。

後續步驟

步驟 5:實作消費者