SQS 搭配 使用 Amazon 的自動請求批次處理 AWS SDK for Java 2.x - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

SQS 搭配 使用 Amazon 的自動請求批次處理 AWS SDK for Java 2.x

Amazon API的 Automatic Request Batching SQS 是高階程式庫,可提供高效的方式批次處理和緩衝SQS操作的請求。透過使用批次處理 API,您可以減少對 的請求數量SQS,從而提高輸送量並將成本降至最低。

因為批次API方法符合SqsAsyncClient方法 —sendMessagechangeMessageVisibilitydeleteMessagereceiveMessage— 您可以使用批次API做為插入式取代,且變更最少。

本主題概述如何設定和使用 Amazon API的自動請求批次SQS。

檢查先決條件

您需要使用SDK適用於 Java 2.x 的 2.28.0 版或更新版本,才能存取批次處理 。 API您的 Maven pom.xml 應至少包含下列元素。

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 最新版本

建立批次管理工具

自動請求批次API由 SqsAsyncBatchManager 介面實作。您可以用幾種方式建立管理員的執行個體。

使用 的預設組態 SqsAsyncClient

建立批次管理程式最簡單的方法是在現有SqsAsyncClient執行個體上呼叫batchManager原廠方法。簡單方法如下列程式碼片段所示。

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

當您使用此方法時,SqsAsyncBatchManager執行個體會使用 覆寫 的組態設定 SqsAsyncBatchManager區段中資料表中顯示的預設值。此外,SqsAsyncBatchManager執行個體會使用其建立來源ExecutorServiceSqsAsyncClient執行個體的 。

使用 自訂組態 SqsAsyncBatchManager.Builder

對於更進階的使用案例,您可以使用 自訂批次管理工具SqsAsyncBatchManager.Builder。透過使用此方法來建立SqsAsyncBatchManager執行個體,您可以微調批次處理行為。下列程式碼片段示範如何使用建置器來自訂批次處理行為。

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

使用此方法時,您可以調整 覆寫 的組態設定 SqsAsyncBatchManager區段中資料表中顯示的BatchOverrideConfiguration物件設定。您也可以使用此方法ScheduledExecutorService為批次管理員提供自訂。

傳送訊息

若要使用批次管理工具傳送訊息,請使用 SqsAsyncBatchManager#sendMessage方法。SDK 緩衝區會請求,並在達到 maxBatchSizesendRequestFrequency值時以批次傳送它們。

下列範例顯示緊接另一個sendMessage請求的請求。在此情況下, 會以單一批次SDK傳送兩個訊息。

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

變更訊息可見性逾時

您可以使用 SqsAsyncBatchManager#changeMessageVisibility方法變更批次中訊息的可見性逾時。SDK 緩衝區會在達到 maxBatchSizesendRequestFrequency值時,請求並以批次方式傳送。

下列範例示範如何呼叫 changeMessageVisibility方法。

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

刪除訊息

您可以使用 SqsAsyncBatchManager#deleteMessage方法刪除批次中的訊息。SDK 緩衝區會在達到 maxBatchSizesendRequestFrequency值時,請求並以批次方式傳送。

下列範例示範如何呼叫 deleteMessage方法。

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

接收訊息

使用預設設定

當您輪詢應用程式中SqsAsyncBatchManager#receiveMessage的方法時,批次管理工具會從其內部緩衝區擷取訊息,而 會在背景中SDK自動更新。

下列範例示範如何呼叫 receiveMessage方法。

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

使用自訂設定

如果您想要進一步自訂請求,例如設定自訂等待時間並指定要擷取的訊息數量,您可以自訂請求,如下列範例所示。

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意

如果您receiveMessage使用ReceiveMessageRequest包含下列任何參數的 呼叫 , 會SDK略過批次管理員並傳送一般非同步receiveMessage請求:

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

覆寫 的組態設定 SqsAsyncBatchManager

您可以在建立SqsAsyncBatchManager執行個體時調整下列設定。下列設定清單可在 上取得BatchOverrideConfiguration.Builder

設定 描述 預設值
maxBatchSize 每個 SendMessageBatchRequestChangeMessageVisibilityBatchRequest或 每個批次的請求數量上限DeleteMessageBatchRequest。最大值為 10。 10
sendRequestFrequency

傳送批次之前的時間,除非提早maxBatchSize到達。較高的值可能會減少請求,但會增加延遲。

200 毫秒
receiveMessageVisibilityTimeout 訊息的可見性逾時。如果取消設定,則會使用佇列的預設值。 佇列的預設
receiveMessageMinWaitDuration receiveMessage 請求的最短等待時間。避免將 設定為 0 以防止CPU浪費。 50ms
receiveMessageSystemAttributeNames 要請求receiveMessage呼叫的系統屬性名稱清單。
receiveMessageAttributeNames 要請求receiveMessage呼叫的屬性名稱清單。