Amazon SQS 临时队列 - Amazon Simple Queue Service

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon SQS 临时队列

使用 request-response 等常见消息模式时,临时队列可帮助您节省开发时间和部署成本。您可以使用临时队列客户端创建高吞吐量、经济实惠、由应用程序管理的临时队列。

客户端会自动将多个临时队列(按需为特定流程创建的应用程序管理队列)映射到单个 Amazon SQS 队列。当指向每个临时队列的流量较低时,这就使得您的应用程序发出较少的 API 调用,实现更高的吞吐量。当临时队列不再使用时,客户端自动清除临时队列,即使部分使用客户端的进程没有完全关闭也是如此。

以下是临时队列的优势:

  • 它们充当特定线程或进程的轻型通信渠道。

  • 创建和删除临时队列不会产生额外的成本。

  • 它们与静态(普通)Amazon SQS 队列 API 兼容。这意味着发送和接收消息的现有代码可以针对虚拟队列发送和接收消息。

主题

虚拟队列

虚拟队列是临时队列客户端创建的本地数据结构。使用虚拟队列让您可以将多个低流量目标合并成单个 Amazon SQS 队列。有关最佳实践,请参阅避免在虚拟队列中重复使用相同的消息组 ID

注意
  • 创建虚拟队列操作仅为接收消息的使用者创建临时数据结构。虚拟队列不对 Amazon SQS 发出 API 调用,因此不会产生任何成本。

  • TPS 配额适用于单个主机队列中的所有虚拟队列。有关更多信息,请参阅与消息相关的配额

AmazonSQSVirtualQueuesClient 包装程序类增加了对与虚拟队列相关属性的支持。要创建虚拟队列,您必须使用 HostQueueURL 属性调用 CreateQueue API 操作。此属性指定托管虚拟队列的现有队列。

虚拟队列 URL 采用以下格式。

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName

创建者在虚拟队列 URL 上调用 SendMessageSendMessageBatch API 操作时,临时队列客户端执行以下操作:

  1. 提取虚拟队列名称。

  2. 将虚拟队列名称作为额外的消息属性进行附加。

  3. 发送消息到主机队列。

当创建者发送消息时,后台线程轮询主机队列,并根据对应的消息属性将收到的消息发送到虚拟队列。

当使用者在虚拟队列 URL 上调用 ReceiveMessage API 操作时,临时队列客户端在本地阻止调用,直至后台线程将消息发送到虚拟队列中。(此过程类似于 Buffered Asynchronous Client 中的消息预取操作:单个 API 操作可以将消息提供到最多 10 个虚拟队列。) 删除虚拟队列操作可以在不调用 Amazon SQS 本身的情况下删除任何客户端资源。

AmazonSQSTemporaryQueuesClient 类自动将它创建的所有队列转入临时队列中。它还会自动使用相同的队列属性,按需创建主机队列。这些队列的名称共用共同的可配置前缀(默认情况下为 __RequesterClientQueues__),该前缀将队列标识为临时队列。这使得客户端充当了简易替代,可优化创建和删除队列的现有代码。客户端还包括 AmazonSQSRequesterAmazonSQSResponder 接口,以允许队列之间的双向通信。

请求-响应消息收发模式(虚拟队列)

临时队列的最常见使用案例为请求-响应消息收发模式,此时请求者创建临时队列来接收各个响应消息。为了避免为每个响应消息创建 Amazon SQS 队列,临时队列客户端允许您创建和删除多个临时队列,而无需进行任何 Amazon SQS API 调用。有关更多信息,请参阅实施请求-响应系统

下图显示了使用此模式时的常见配置。


                请求-响应模式用于 Amazon SQS 时的图表。

示例场景:处理登录请求

以下示例场景展示了如何使用 AmazonSQSRequesterAmazonSQSResponder 接口来处理用户的登录请求。

在客户端

public class LoginClient { // Specify the Amazon SQS queue to which to send requests. private final String requestQueueUrl; // Use the AmazonSQSRequester interface to create // a temporary queue for each response. private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient(); LoginClient(String requestQueueUrl) { this.requestQueueUrl = requestQueueUrl; } // Send a login request. public String login(String body) throws TimeoutException { SendMessageRequest request = new SendMessageRequest() .withMessageBody(body) .withQueueUrl(requestQueueUrl); // If no response is received, in 20 seconds, // trigger the TimeoutException. Message reply = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS); return reply.getBody(); } }

发送登录请求将执行以下操作:

  1. 创建一个临时队列。

  2. 将临时队列的 URL 作为属性附加到消息。

  3. 发送消息。

  4. 从临时队列接收响应。

  5. 删除临时队列。

  6. 返回响应。

在服务器端

以下示例假设在构造时创建了一个线程,它为每个消息轮询该队列并调用 handleLoginRequest() 方法。此外,假设使用了 doLogin() 方法。

public class LoginServer { // Specify the Amazon SQS queue to poll for login requests. private final String requestQueueUrl; // Use the AmazonSQSResponder interface to take care // of sending responses to the correct response destination. private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient(); LoginServer(String requestQueueUrl) { this.requestQueueUrl = requestQueueUrl; } // Process login requests from the client. public void handleLoginRequest(Message message) { // Process the login and return a serialized result. String response = doLogin(message.getBody()); // Extract the URL of the temporary queue from the message attribute // and send the response to the temporary queue. sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response)); } }

清除队列

为确保 Amazon SQS 回收虚拟队列使用的任何内存中资源,应用程序不再需要临时队列客户端时,它应调用 shutdown() 方法。您还可以使用 AmazonSQSRequester 接口的 shutdown() 方法。

临时队列客户端还提供了一种方法来消除孤立的主机队列。对于在一段时间内(默认情况下为 5 分钟)接收 API 调用的每个队列,客户端使用 TagQueue API 操作来标记仍在使用的队列。

注意

在队列上执行的任意 API 操作将标记为非空闲,包括不返回任何消息的 ReceiveMessage 操作。

后台线程使用 ListQueuesListTags API 操作,以检查具有已配置前缀的所有队列,并删除至少五分钟内没有被标记的所有队列。这样,如果一个客户端未完全关闭,则其他活动客户端在之后会进行清除。为了减少重复工作,具有相同前缀的所有客户端会通过共享的内部工作队列通信,该队列以前缀命名。