本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
SQS 搭配 使用 Amazon 的自動請求批次處理 AWS SDK for Java 2.x
Amazon API的 Automatic Request Batching SQS 是高階程式庫,可提供高效的方式批次處理和緩衝SQS操作的請求。透過使用批次處理 API,您可以減少對 的請求數量SQS,從而提高輸送量並將成本降至最低。
因為批次API方法符合SqsAsyncClient
方法 —sendMessage
、changeMessageVisibility
deleteMessage
、 receiveMessage
— 您可以使用批次API做為插入式取代,且變更最少。
本主題概述如何設定和使用 Amazon API的自動請求批次SQS。
檢查先決條件
您需要使用SDK適用於 Java 2.x 的 2.28.0 版或更新版本,才能存取批次處理 。 API您的 Maven pom.xml
應至少包含下列元素。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231
</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 最新版本
建立批次管理工具
自動請求批次API由 SqsAsyncBatchManager
使用 的預設組態 SqsAsyncClient
建立批次管理程式最簡單的方法是在現有SqsAsyncClientbatchManager
原廠方法。簡單方法如下列程式碼片段所示。
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
當您使用此方法時,SqsAsyncBatchManager
執行個體會使用 覆寫 的組態設定 SqsAsyncBatchManager區段中資料表中顯示的預設值。此外,SqsAsyncBatchManager
執行個體會使用其建立來源ExecutorService
SqsAsyncClient
執行個體的 。
使用 自訂組態 SqsAsyncBatchManager.Builder
對於更進階的使用案例,您可以使用 自訂批次管理工具SqsAsyncBatchManager.Builder
SqsAsyncBatchManager
執行個體,您可以微調批次處理行為。下列程式碼片段示範如何使用建置器來自訂批次處理行為。
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
使用此方法時,您可以調整 覆寫 的組態設定 SqsAsyncBatchManager區段中資料表中顯示的BatchOverrideConfiguration
物件設定。您也可以使用此方法ScheduledExecutorService
傳送訊息
若要使用批次管理工具傳送訊息,請使用 SqsAsyncBatchManager#sendMessage
方法。SDK 緩衝區會請求,並在達到 maxBatchSize
或 sendRequestFrequency
值時以批次傳送它們。
下列範例顯示緊接另一個sendMessage
請求的請求。在此情況下, 會以單一批次SDK傳送兩個訊息。
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
變更訊息可見性逾時
您可以使用 SqsAsyncBatchManager#changeMessageVisibility
maxBatchSize
或 sendRequestFrequency
值時,請求並以批次方式傳送。
下列範例示範如何呼叫 changeMessageVisibility
方法。
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
刪除訊息
您可以使用 SqsAsyncBatchManager#deleteMessage
maxBatchSize
或 sendRequestFrequency
值時,請求並以批次方式傳送。
下列範例示範如何呼叫 deleteMessage
方法。
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
接收訊息
使用預設設定
當您輪詢應用程式中SqsAsyncBatchManager#receiveMessage
的方法時,批次管理工具會從其內部緩衝區擷取訊息,而 會在背景中SDK自動更新。
下列範例示範如何呼叫 receiveMessage
方法。
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
使用自訂設定
如果您想要進一步自訂請求,例如設定自訂等待時間並指定要擷取的訊息數量,您可以自訂請求,如下列範例所示。
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意
如果您receiveMessage
使用ReceiveMessageRequest
包含下列任何參數的 呼叫 , 會SDK略過批次管理員並傳送一般非同步receiveMessage
請求:
-
messageAttributeNames
-
messageSystemAttributeNames
-
messageSystemAttributeNamesWithStrings
-
overrideConfiguration
覆寫 的組態設定 SqsAsyncBatchManager
您可以在建立SqsAsyncBatchManager
執行個體時調整下列設定。下列設定清單可在 上取得BatchOverrideConfiguration.Builder
設定 | 描述 | 預設值 |
---|---|---|
maxBatchSize |
每個 SendMessageBatchRequest 、 ChangeMessageVisibilityBatchRequest 或 每個批次的請求數量上限DeleteMessageBatchRequest 。最大值為 10。 |
10 |
sendRequestFrequency |
傳送批次之前的時間,除非提早 |
200 毫秒 |
receiveMessageVisibilityTimeout |
訊息的可見性逾時。如果取消設定,則會使用佇列的預設值。 | 佇列的預設 |
receiveMessageMinWaitDuration |
receiveMessage 請求的最短等待時間。避免將 設定為 0 以防止CPU浪費。 |
50ms |
receiveMessageSystemAttributeNames |
要請求receiveMessage 呼叫的系統屬性名稱 |
無 |
receiveMessageAttributeNames |
要請求receiveMessage 呼叫的屬性名稱清單。 |
無 |