實施消費者 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

實施消費者

本教學課程中的消費者應用程式會持續處理您的資料串流中的股票交易。隨後,其將輸出每分鐘買進和賣出最多的熱門股票。該應用程式建置在 Kinesis 用戶端程式庫 (KCL) 之上,該程式庫可執行消費者應用程式常見的許多繁重工作。如需詳細資訊,請參閱使用 Kinesis 用戶端程式庫

請查看原始碼並對照檢閱以下資訊。

StockTradesProcessor 類

為您提供的消費者的主要類別,它會執行下列工作:

  • 讀取作為引數傳入的應用程式、資料串流和區域名稱。

  • 使用「區域」名稱建立KinesisAsyncClient執行個體。

  • 建立 StockTradeRecordProcessorFactory 執行個體以提供由 ShardRecordProcessor 執行個體實作的 StockTradeRecordProcessor 執行個體。

  • 使用KinesisAsyncClientStreamName、和ConfigsBuilder執行個體建立執行個StockTradeRecordProcessorFactory體。ApplicationName這對於使用預設值建立所有組態非常有用。

  • 使用ConfigsBuilder執行個體建立KCL排程器 (先前,在KCL版本 1.x 中稱為 KCL Worker)。

  • 排程器會為每個碎片 (已指派給此消費者執行個體) 建立新的執行緒,以持續循環從資料串流讀取記錄。接著,其將叫用 StockTradeRecordProcessor 執行個體以處理收到的各個批次記錄。

StockTradeRecordProcessor 類

StockTradeRecordProcessor 執行個體的實作,而此執行個體將實作五個必要的方法:initializeprocessRecordsleaseLostshardEndedshutdownRequested

initialize和方shutdownRequested法可用KCL來讓記錄處理器知道何時應該準備好開始接收記錄,以及何時應該分別停止接收記錄,以便它可以執行任何應用程式特定的安裝和終止工作。 leaseLostshardEnded且用於在租用遺失或處理到達碎片結尾時執行的任何邏輯。在此範例中,我們只記錄指出這些事件的訊息。

我們會提供這些方法的程式碼。主要處理任務在 processRecords 方法中進行,而此方法將使用 processRecord 處理每筆記錄。後一種方法以幾乎全空的架構程式碼提供,讓您於下一個步驟進行實作,屆時將會有更詳細的說明。

另請注意 processRecord 支援方法的實作:reportStatsresetStats,其最初的原始碼為全空。

程式碼已為您實作 processRecords 方法,並將執行以下步驟:

  • 對每一筆傳入的記錄,它會呼叫其上的 processRecord

  • 若自從上次報告後已歷時至少 1 分鐘,請先呼叫 reportStats() 列印出最新統計資料,接著呼叫 resetStats() 清除統計資料以使下一個間隔僅包含新記錄。

  • 設定下一次報告時間。

  • 若自從最後一個檢查點過後已歷時至少 1 分鐘,請呼叫 checkpoint()

  • 設定下一次檢查點作業時間。

此方法使用 60 秒的間隔做為報告及檢查點作業率。如需有關檢查點的詳細資訊,請參閱使用 Kinesis Client Library

StockStats 類

此類別針對一段時間內最熱門的股票提供資料保留與統計資料追蹤。其程式碼已為您提供且包含下列方法:

  • addStockTrade(StockTrade):將給定的 StockTrade 注入目前統計資料。

  • toString():以格式化字串的形式傳回統計資料。

該類別通過保持每個股票的交易總數和最大數量的運行計數來跟踪最受歡迎的股票。每當股票交易達成時,其將更新這些計數。

StockTradeRecordProcessor 類別的各個方法加入程式碼,如以下步驟所示。

實作消費者
  1. 實作 processRecord 方法,藉此執行個體化正確大小的 StockTrade 物件並將記錄資料加入該物件,且於發生問題時記錄警告。

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. 實現一個reportStats方法。修改輸出格式以適合您的喜好。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 實作 resetStats 方法以便建立新的 stockStats 執行個體。

    stockStats = new StockStats();
  4. 實現接ShardRecordProcessor口所需的以下方法:

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
執行消費者
  1. 執行您在實施生產者時撰寫的生產者,將模擬的股票交易記錄注入您的串流。

  2. 確認先前擷取的存取金鑰和秘密 key pair (建立IAM使用者時) 已儲存在檔案中~/.aws/credentials

  3. 使用以下引數執行 StockTradesProcessor 類別:

    StockTradesProcessor StockTradeStream us-west-2

    請注意,如果您是在 us-west-2 以外的區域建立串流,則此處必須改為指定該區域。

一分鐘後,您應會看到類似以下內容的輸出,而且此後每分鐘將重新整理一次輸出:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

後續步驟

(可選)擴展消費者