使用 .NET 的 AWS 訊息處理架構使用訊息 - AWS SDK for .NET

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 .NET 的 AWS 訊息處理架構使用訊息

這是預覽版中某項功能的搶鮮版說明文件。內容可能變動。

.NET 的訊 AWS 息處理架構可讓您使用已經使用架構或其中一個郵件服務所發佈的訊息。消息可以通過多種方式消耗,其中一些說明如下。

訊息處理器

若要使用訊息,請使用您想要處理的每個訊息類型的IMessageHandler介面來實作訊息處理常式。消息類型和消息處理程序之間的映射在項目啟動中配置。

await Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // Register the AWS Message Processing Framework for .NET services.AddAWSMessageBus(builder => { // Register an SQS Queue that the framework will poll for messages. // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd"); // Register all IMessageHandler implementations with the message type they should process. // Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler builder.AddMessageHandler<ChatMessageHandler, ChatMessage>(); }); }) .Build() .RunAsync();

下列程式碼顯示訊息的範例訊息處理常式。ChatMessage

public class ChatMessageHandler : IMessageHandler<ChatMessage> { public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default) { // Add business and validation logic here. if (messageEnvelope == null) { return Task.FromResult(MessageProcessStatus.Failed()); } if (messageEnvelope.Message == null) { return Task.FromResult(MessageProcessStatus.Failed()); } ChatMessage message = messageEnvelope.Message; Console.WriteLine($"Message Description: {message.MessageDescription}"); // Return success so the framework will delete the message from the queue. return Task.FromResult(MessageProcessStatus.Success()); } }

外部MessageEnvelope包含框架使用的元數據。它的message屬性是消息類型(在本例中ChatMessage)。

您可以返回MessageProcessStatus.Success()以指示訊息已成功處理,而架構將從 Amazon SQS 佇列中刪除訊息。傳回時MessageProcessStatus.Failed(),訊息會保留在佇列中,在佇列中可以再次處理,或移至無效字母佇列 (若已設定)。

在長時間執行的程序中處理郵件

您可以AddSQSPoller使用 SQS 佇列 URL 呼叫,以啟動長時間執行,BackgroundService以持續輪詢佇列並處理訊息。

await Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // Register the AWS Message Processing Framework for .NET services.AddAWSMessageBus(builder => { // Register an SQS Queue that the framework will poll for messages. // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => { // The maximum number of messages from this queue that the framework will process concurrently on this client. options.MaxNumberOfConcurrentMessages = 10; // The duration each call to SQS will wait for new messages. options.WaitTimeSeconds = 20; }); // Register all IMessageHandler implementations with the message type they should process. builder.AddMessageHandler<ChatMessageHandler, ChatMessage>(); }); }) .Build() .RunAsync();

設定 SQS 訊息輪詢程式

SQS 訊息輪詢程式可在呼叫SQSMessagePollerOptions時由設定。AddSQSPoller

  • MaxNumberOfConcurrentMessages-佇列中要同時處理的訊息數目上限。預設值為 10。

  • WaitTimeSeconds-ReceiveMessage SQS 呼叫在傳回前等待訊息到達佇列的持續時間 (以秒為單位)。如果有可用的訊息,呼叫會比傳回的時間早於WaitTimeSeconds。預設值為 20。

訊息可見性逾時處理

SQS 訊息具有可見性逾時期間。當一個消費者開始處理給定的消息時,它會保留在隊列中,但對其他消費者隱藏,以避免多次處理它。如果在再次顯示之前未處理和刪除郵件,則其他消費者可能會嘗試處理相同的消息。

該框架將跟踪並嘗試擴展當前正在處理的消息的可見性超時。您可以在呼叫SQSMessagePollerOptions時在上設定此行為AddSQSPoller

  • VisibilityTimeout-在後續擷取要求中隱藏接收訊息的持續時間 (以秒為單位)。預設值為 30。

  • VisibilityTimeoutExtensionThreshold-當消息的可見性超時在過期的這幾秒鐘內時,框架將擴展可見性超時(另一VisibilityTimeout秒鐘)。預設值為 5。

  • VisibilityTimeoutExtensionHeartbeatInterval-框架在幾秒鐘內檢查過期後幾VisibilityTimeoutExtensionThreshold秒鐘內的消息,然後延長其可見性超時的頻率。預設值為 1。

在下面的例子中,框架將每隔 1 秒檢查一次是否有仍在處理的消息。對於在再次顯示後 5 秒內的那些消息,框架將自動將每個消息的可見性超時延長 30 秒。

// NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => { options.VisibilityTimeout = 30; options.VisibilityTimeoutExtensionThreshold = 5; VisibilityTimeoutExtensionHeartbeatInterval = 1; });

處理 AWS Lambda 函數中的消息

您可以使用適用於 .NET 的 AWS 訊息處理架構,搭配 SQS 與 Lambda 的整合。這是由AWS.Messaging.Lambda軟件包提供。請參閱其自述文件以開始使用。