使用 的 Amazon SQS範例 AWS SDK for .NET - AWS SDK 程式碼範例

文件範例儲存庫中有更多 AWS SDK可用的範例。 AWS SDK GitHub

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 的 Amazon SQS範例 AWS SDK for .NET

下列程式碼範例說明如何 AWS SDK for .NET 搭配 Amazon 使用 來執行動作和實作常見案例SQS。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會示範如何呼叫個別服務函數,但您可以在相關案例中查看內容中的動作。

案例是程式碼範例,示範如何透過呼叫服務內的多個函數或與其他 結合來完成特定任務 AWS 服務。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

開始使用

下列程式碼範例說明如何開始使用 Amazon SQS。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using Amazon.SQS; using Amazon.SQS.Model; namespace SQSActions; public static class HelloSQS { static async Task Main(string[] args) { var sqsClient = new AmazonSQSClient(); Console.WriteLine($"Hello Amazon SQS! Following are some of your queues:"); Console.WriteLine(); // You can use await and any of the async methods to get a response. // Let's get the first five queues. var response = await sqsClient.ListQueuesAsync( new ListQueuesRequest() { MaxResults = 5 }); foreach (var queue in response.QueueUrls) { Console.WriteLine($"\tQueue Url: {queue}"); Console.WriteLine(); } } }
  • 如需API詳細資訊,請參閱 參考 ListQueues中的 。 AWS SDK for .NET API

動作

下列程式碼範例示範如何使用 CreateQueue

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

建立具有特定名稱的佇列。

/// <summary> /// Create a queue with a specific name. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <param name="useFifoQueue">True to use a FIFO queue.</param> /// <returns>The url for the queue.</returns> public async Task<string> CreateQueueWithName(string queueName, bool useFifoQueue) { int maxMessage = 256 * 1024; var queueAttributes = new Dictionary<string, string> { { QueueAttributeName.MaximumMessageSize, maxMessage.ToString() } }; var createQueueRequest = new CreateQueueRequest() { QueueName = queueName, Attributes = queueAttributes }; if (useFifoQueue) { // Update the name if it is not correct for a FIFO queue. if (!queueName.EndsWith(".fifo")) { createQueueRequest.QueueName = queueName + ".fifo"; } // Add an attribute for a FIFO queue. createQueueRequest.Attributes.Add( QueueAttributeName.FifoQueue, "true"); } var createResponse = await _amazonSQSClient.CreateQueueAsync( new CreateQueueRequest() { QueueName = queueName }); return createResponse.QueueUrl; }

建立 Amazon SQS佇列並傳送訊息。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon; using Amazon.SQS; using Amazon.SQS.Model; public class CreateSendExample { // Specify your AWS Region (an example Region is shown). private static readonly string QueueName = "Example_Queue"; private static readonly RegionEndpoint ServiceRegion = RegionEndpoint.USWest2; private static IAmazonSQS client; public static async Task Main() { client = new AmazonSQSClient(ServiceRegion); var createQueueResponse = await CreateQueue(client, QueueName); string queueUrl = createQueueResponse.QueueUrl; Dictionary<string, MessageAttributeValue> messageAttributes = new Dictionary<string, MessageAttributeValue> { { "Title", new MessageAttributeValue { DataType = "String", StringValue = "The Whistler" } }, { "Author", new MessageAttributeValue { DataType = "String", StringValue = "John Grisham" } }, { "WeeksOn", new MessageAttributeValue { DataType = "Number", StringValue = "6" } }, }; string messageBody = "Information about current NY Times fiction bestseller for week of 12/11/2016."; var sendMsgResponse = await SendMessage(client, queueUrl, messageBody, messageAttributes); } /// <summary> /// Creates a new Amazon SQS queue using the queue name passed to it /// in queueName. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueName">A string representing the name of the queue /// to create.</param> /// <returns>A CreateQueueResponse that contains information about the /// newly created queue.</returns> public static async Task<CreateQueueResponse> CreateQueue(IAmazonSQS client, string queueName) { var request = new CreateQueueRequest { QueueName = queueName, Attributes = new Dictionary<string, string> { { "DelaySeconds", "60" }, { "MessageRetentionPeriod", "86400" }, }, }; var response = await client.CreateQueueAsync(request); Console.WriteLine($"Created a queue with URL : {response.QueueUrl}"); return response; } /// <summary> /// Sends a message to an SQS queue. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueUrl">The URL of the queue to which to send the /// message.</param> /// <param name="messageBody">A string representing the body of the /// message to be sent to the queue.</param> /// <param name="messageAttributes">Attributes for the message to be /// sent to the queue.</param> /// <returns>A SendMessageResponse object that contains information /// about the message that was sent.</returns> public static async Task<SendMessageResponse> SendMessage( IAmazonSQS client, string queueUrl, string messageBody, Dictionary<string, MessageAttributeValue> messageAttributes) { var sendMessageRequest = new SendMessageRequest { DelaySeconds = 10, MessageAttributes = messageAttributes, MessageBody = messageBody, QueueUrl = queueUrl, }; var response = await client.SendMessageAsync(sendMessageRequest); Console.WriteLine($"Sent a message with id : {response.MessageId}"); return response; } }
  • 如需API詳細資訊,請參閱 參考 CreateQueue中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 DeleteMessage

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

從 Amazon SQS佇列接收訊息,然後刪除訊息。

public static async Task Main() { // If the AWS Region you want to use is different from // the AWS Region defined for the default user, supply // the specify your AWS Region to the client constructor. var client = new AmazonSQSClient(); string queueName = "Example_Queue"; var queueUrl = await GetQueueUrl(client, queueName); Console.WriteLine($"The SQS queue's URL is {queueUrl}"); var response = await ReceiveAndDeleteMessage(client, queueUrl); Console.WriteLine($"Message: {response.Messages[0]}"); } /// <summary> /// Retrieve the queue URL for the queue named in the queueName /// property using the client object. /// </summary> /// <param name="client">The Amazon SQS client used to retrieve the /// queue URL.</param> /// <param name="queueName">A string representing name of the queue /// for which to retrieve the URL.</param> /// <returns>The URL of the queue.</returns> public static async Task<string> GetQueueUrl(IAmazonSQS client, string queueName) { var request = new GetQueueUrlRequest { QueueName = queueName, }; GetQueueUrlResponse response = await client.GetQueueUrlAsync(request); return response.QueueUrl; } /// <summary> /// Retrieves the message from the quque at the URL passed in the /// queueURL parameters using the client. /// </summary> /// <param name="client">The SQS client used to retrieve a message.</param> /// <param name="queueUrl">The URL of the queue from which to retrieve /// a message.</param> /// <returns>The response from the call to ReceiveMessageAsync.</returns> public static async Task<ReceiveMessageResponse> ReceiveAndDeleteMessage(IAmazonSQS client, string queueUrl) { // Receive a single message from the queue. var receiveMessageRequest = new ReceiveMessageRequest { AttributeNames = { "SentTimestamp" }, MaxNumberOfMessages = 1, MessageAttributeNames = { "All" }, QueueUrl = queueUrl, VisibilityTimeout = 0, WaitTimeSeconds = 0, }; var receiveMessageResponse = await client.ReceiveMessageAsync(receiveMessageRequest); // Delete the received message from the queue. var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = queueUrl, ReceiptHandle = receiveMessageResponse.Messages[0].ReceiptHandle, }; await client.DeleteMessageAsync(deleteMessageRequest); return receiveMessageResponse; } }
  • 如需API詳細資訊,請參閱 參考 DeleteMessage中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 DeleteMessageBatch

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/// <summary> /// Delete a batch of messages from a queue by its url. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteMessageBatchByUrl(string queueUrl, List<Message> messages) { var deleteRequest = new DeleteMessageBatchRequest() { QueueUrl = queueUrl, Entries = new List<DeleteMessageBatchRequestEntry>() }; foreach (var message in messages) { deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id = message.MessageId }); } var deleteResponse = await _amazonSQSClient.DeleteMessageBatchAsync(deleteRequest); return deleteResponse.Failed.Any(); }
  • 如需API詳細資訊,請參閱 參考 DeleteMessageBatch中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 DeleteQueue

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

使用佇列的 刪除佇列URL。

/// <summary> /// Delete a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteQueueByUrl(string queueUrl) { var deleteResponse = await _amazonSQSClient.DeleteQueueAsync( new DeleteQueueRequest() { QueueUrl = queueUrl }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; }
  • 如需API詳細資訊,請參閱 參考 DeleteQueue中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 GetQueueAttributes

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/// <summary> /// Get the ARN for a queue from its URL. /// </summary> /// <param name="queueUrl">The URL of the queue.</param> /// <returns>The ARN of the queue.</returns> public async Task<string> GetQueueArnByUrl(string queueUrl) { var getAttributesRequest = new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { QueueAttributeName.QueueArn } }; var getAttributesResponse = await _amazonSQSClient.GetQueueAttributesAsync( getAttributesRequest); return getAttributesResponse.QueueARN; }
  • 如需API詳細資訊,請參閱 參考 GetQueueAttributes中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 GetQueueUrl

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

using System; using System.Threading.Tasks; using Amazon.SQS; using Amazon.SQS.Model; public class GetQueueUrl { /// <summary> /// Initializes the Amazon SQS client object and then calls the /// GetQueueUrlAsync method to retrieve the URL of an Amazon SQS /// queue. /// </summary> public static async Task Main() { // If the Amazon SQS message queue is not in the same AWS Region as your // default user, you need to provide the AWS Region as a parameter to the // client constructor. var client = new AmazonSQSClient(); string queueName = "New-Example-Queue"; try { var response = await client.GetQueueUrlAsync(queueName); if (response.HttpStatusCode == System.Net.HttpStatusCode.OK) { Console.WriteLine($"The URL for {queueName} is: {response.QueueUrl}"); } } catch (QueueDoesNotExistException ex) { Console.WriteLine(ex.Message); Console.WriteLine($"The queue {queueName} was not found."); } } }
  • 如需API詳細資訊,請參閱 參考 GetQueueUrl中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 ReceiveMessage

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

使用佇列的 接收訊息URL。

/// <summary> /// Receive messages from a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>The list of messages.</returns> public async Task<List<Message>> ReceiveMessagesByUrl(string queueUrl, int maxMessages) { // Setting WaitTimeSeconds to non-zero enables long polling. // For information about long polling, see // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html var messageResponse = await _amazonSQSClient.ReceiveMessageAsync( new ReceiveMessageRequest() { QueueUrl = queueUrl, MaxNumberOfMessages = maxMessages, WaitTimeSeconds = 1 }); return messageResponse.Messages; }

從 Amazon SQS佇列接收訊息,然後刪除訊息。

public static async Task Main() { // If the AWS Region you want to use is different from // the AWS Region defined for the default user, supply // the specify your AWS Region to the client constructor. var client = new AmazonSQSClient(); string queueName = "Example_Queue"; var queueUrl = await GetQueueUrl(client, queueName); Console.WriteLine($"The SQS queue's URL is {queueUrl}"); var response = await ReceiveAndDeleteMessage(client, queueUrl); Console.WriteLine($"Message: {response.Messages[0]}"); } /// <summary> /// Retrieve the queue URL for the queue named in the queueName /// property using the client object. /// </summary> /// <param name="client">The Amazon SQS client used to retrieve the /// queue URL.</param> /// <param name="queueName">A string representing name of the queue /// for which to retrieve the URL.</param> /// <returns>The URL of the queue.</returns> public static async Task<string> GetQueueUrl(IAmazonSQS client, string queueName) { var request = new GetQueueUrlRequest { QueueName = queueName, }; GetQueueUrlResponse response = await client.GetQueueUrlAsync(request); return response.QueueUrl; } /// <summary> /// Retrieves the message from the quque at the URL passed in the /// queueURL parameters using the client. /// </summary> /// <param name="client">The SQS client used to retrieve a message.</param> /// <param name="queueUrl">The URL of the queue from which to retrieve /// a message.</param> /// <returns>The response from the call to ReceiveMessageAsync.</returns> public static async Task<ReceiveMessageResponse> ReceiveAndDeleteMessage(IAmazonSQS client, string queueUrl) { // Receive a single message from the queue. var receiveMessageRequest = new ReceiveMessageRequest { AttributeNames = { "SentTimestamp" }, MaxNumberOfMessages = 1, MessageAttributeNames = { "All" }, QueueUrl = queueUrl, VisibilityTimeout = 0, WaitTimeSeconds = 0, }; var receiveMessageResponse = await client.ReceiveMessageAsync(receiveMessageRequest); // Delete the received message from the queue. var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = queueUrl, ReceiptHandle = receiveMessageResponse.Messages[0].ReceiptHandle, }; await client.DeleteMessageAsync(deleteMessageRequest); return receiveMessageResponse; } }
  • 如需API詳細資訊,請參閱 參考 ReceiveMessage中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 SendMessage

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

建立 Amazon SQS佇列並傳送訊息。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon; using Amazon.SQS; using Amazon.SQS.Model; public class CreateSendExample { // Specify your AWS Region (an example Region is shown). private static readonly string QueueName = "Example_Queue"; private static readonly RegionEndpoint ServiceRegion = RegionEndpoint.USWest2; private static IAmazonSQS client; public static async Task Main() { client = new AmazonSQSClient(ServiceRegion); var createQueueResponse = await CreateQueue(client, QueueName); string queueUrl = createQueueResponse.QueueUrl; Dictionary<string, MessageAttributeValue> messageAttributes = new Dictionary<string, MessageAttributeValue> { { "Title", new MessageAttributeValue { DataType = "String", StringValue = "The Whistler" } }, { "Author", new MessageAttributeValue { DataType = "String", StringValue = "John Grisham" } }, { "WeeksOn", new MessageAttributeValue { DataType = "Number", StringValue = "6" } }, }; string messageBody = "Information about current NY Times fiction bestseller for week of 12/11/2016."; var sendMsgResponse = await SendMessage(client, queueUrl, messageBody, messageAttributes); } /// <summary> /// Creates a new Amazon SQS queue using the queue name passed to it /// in queueName. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueName">A string representing the name of the queue /// to create.</param> /// <returns>A CreateQueueResponse that contains information about the /// newly created queue.</returns> public static async Task<CreateQueueResponse> CreateQueue(IAmazonSQS client, string queueName) { var request = new CreateQueueRequest { QueueName = queueName, Attributes = new Dictionary<string, string> { { "DelaySeconds", "60" }, { "MessageRetentionPeriod", "86400" }, }, }; var response = await client.CreateQueueAsync(request); Console.WriteLine($"Created a queue with URL : {response.QueueUrl}"); return response; } /// <summary> /// Sends a message to an SQS queue. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueUrl">The URL of the queue to which to send the /// message.</param> /// <param name="messageBody">A string representing the body of the /// message to be sent to the queue.</param> /// <param name="messageAttributes">Attributes for the message to be /// sent to the queue.</param> /// <returns>A SendMessageResponse object that contains information /// about the message that was sent.</returns> public static async Task<SendMessageResponse> SendMessage( IAmazonSQS client, string queueUrl, string messageBody, Dictionary<string, MessageAttributeValue> messageAttributes) { var sendMessageRequest = new SendMessageRequest { DelaySeconds = 10, MessageAttributes = messageAttributes, MessageBody = messageBody, QueueUrl = queueUrl, }; var response = await client.SendMessageAsync(sendMessageRequest); Console.WriteLine($"Sent a message with id : {response.MessageId}"); return response; } }
  • 如需API詳細資訊,請參閱 參考 SendMessage中的 。 AWS SDK for .NET API

下列程式碼範例示範如何使用 SetQueueAttributes

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

設定主題佇列的政策屬性。

/// <summary> /// Set the policy attribute of a queue for a topic. /// </summary> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="queueUrl">The url for the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> SetQueuePolicyForTopic(string queueArn, string topicArn, string queueUrl) { var queuePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": " + "\"sns.amazonaws.com\"" + "}," + "\"Action\": \"sqs:SendMessage\"," + $"\"Resource\": \"{queueArn}\"," + "\"Condition\": {" + "\"ArnEquals\": {" + $"\"aws:SourceArn\": \"{topicArn}\"" + "}" + "}" + "}]" + "}"; var attributesResponse = await _amazonSQSClient.SetQueueAttributesAsync( new SetQueueAttributesRequest() { QueueUrl = queueUrl, Attributes = new Dictionary<string, string>() { { "Policy", queuePolicy } } }); return attributesResponse.HttpStatusCode == HttpStatusCode.OK; }
  • 如需API詳細資訊,請參閱 參考 SetQueueAttributes中的 。 AWS SDK for .NET API

案例

以下程式碼範例顯示做法:

  • 建立主題 (FIFO 或非 FIFO)。

  • 為主題訂閱多個佇列,並提供套用篩選條件的選擇。

  • 發佈訊息至主題。

  • 輪詢佇列以獲取收到的訊息。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

在命令提示中執行互動式案例。

/// <summary> /// Console application to run a workflow scenario for topics and queues. /// </summary> public static class TopicsAndQueues { private static bool _useFifoTopic = false; private static bool _useContentBasedDeduplication = false; private static string _topicName = null!; private static string _topicArn = null!; private static readonly int _queueCount = 2; private static readonly string[] _queueUrls = new string[_queueCount]; private static readonly string[] _subscriptionArns = new string[_queueCount]; private static readonly string[] _tones = { "cheerful", "funny", "serious", "sincere" }; public static SNSWrapper SnsWrapper { get; set; } = null!; public static SQSWrapper SqsWrapper { get; set; } = null!; public static bool UseConsole { get; set; } = true; static async Task Main(string[] args) { // Set up dependency injection for Amazon EventBridge. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonSQS>() .AddAWSService<IAmazonSimpleNotificationService>() .AddTransient<SNSWrapper>() .AddTransient<SQSWrapper>() ) .Build(); ServicesSetup(host); PrintDescription(); await RunScenario(); } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { SnsWrapper = host.Services.GetRequiredService<SNSWrapper>(); SqsWrapper = host.Services.GetRequiredService<SQSWrapper>(); } /// <summary> /// Run the scenario for working with topics and queues. /// </summary> /// <returns>True if successful.</returns> public static async Task<bool> RunScenario() { try { await SetupTopic(); await SetupQueues(); await PublishMessages(); foreach (var queueUrl in _queueUrls) { var messages = await PollForMessages(queueUrl); if (messages.Any()) { await DeleteMessages(queueUrl, messages); } } await CleanupResources(); Console.WriteLine("Messaging with topics and queues workflow is complete."); return true; } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(); Console.WriteLine(new string('-', 80)); return false; } } /// <summary> /// Print a description for the tasks in the workflow. /// </summary> /// <returns>Async task.</returns> private static void PrintDescription() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Welcome to messaging with topics and queues."); Console.WriteLine(new string('-', 80)); Console.WriteLine($"In this workflow, you will create an SNS topic and subscribe {_queueCount} SQS queues to the topic." + $"\r\nYou can select from several options for configuring the topic and the subscriptions for the 2 queues." + $"\r\nYou can then post to the topic and see the results in the queues.\r\n"); Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up the SNS topic to be used with the queues. /// </summary> /// <returns>Async task.</returns> private static async Task<string> SetupTopic() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"SNS topics can be configured as FIFO (First-In-First-Out)." + $"\r\nFIFO topics deliver messages in order and support deduplication and message filtering." + $"\r\nYou can then post to the topic and see the results in the queues.\r\n"); _useFifoTopic = GetYesNoResponse("Would you like to work with FIFO topics?"); if (_useFifoTopic) { Console.WriteLine(new string('-', 80)); _topicName = GetUserResponse("Enter a name for your SNS topic: ", "example-topic"); Console.WriteLine( "Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.\r\n"); Console.WriteLine(new string('-', 80)); Console.WriteLine($"Because you have chosen a FIFO topic, deduplication is supported." + $"\r\nDeduplication IDs are either set in the message or automatically generated " + $"\r\nfrom content using a hash function.\r\n" + $"\r\nIf a message is successfully published to an SNS FIFO topic, any message " + $"\r\npublished and determined to have the same deduplication ID, " + $"\r\nwithin the five-minute deduplication interval, is accepted but not delivered.\r\n" + $"\r\nFor more information about deduplication, " + $"\r\nsee https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html."); _useContentBasedDeduplication = GetYesNoResponse("Use content-based deduplication instead of entering a deduplication ID?"); Console.WriteLine(new string('-', 80)); } _topicArn = await SnsWrapper.CreateTopicWithName(_topicName, _useFifoTopic, _useContentBasedDeduplication); Console.WriteLine($"Your new topic with the name {_topicName}" + $"\r\nand Amazon Resource Name (ARN) {_topicArn}" + $"\r\nhas been created.\r\n"); Console.WriteLine(new string('-', 80)); return _topicArn; } /// <summary> /// Set up the queues. /// </summary> /// <returns>Async task.</returns> private static async Task SetupQueues() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Now you will create {_queueCount} Amazon Simple Queue Service (Amazon SQS) queues to subscribe to the topic."); // Repeat this section for each queue. for (int i = 0; i < _queueCount; i++) { var queueName = GetUserResponse("Enter a name for an Amazon SQS queue: ", $"example-queue-{i}"); if (_useFifoTopic) { // Only explain this once. if (i == 0) { Console.WriteLine( "Because you have selected a FIFO topic, '.fifo' must be appended to the queue name."); } var queueUrl = await SqsWrapper.CreateQueueWithName(queueName, _useFifoTopic); _queueUrls[i] = queueUrl; Console.WriteLine($"Your new queue with the name {queueName}" + $"\r\nand queue URL {queueUrl}" + $"\r\nhas been created.\r\n"); if (i == 0) { Console.WriteLine( $"The queue URL is used to retrieve the queue ARN,\r\n" + $"which is used to create a subscription."); Console.WriteLine(new string('-', 80)); } var queueArn = await SqsWrapper.GetQueueArnByUrl(queueUrl); if (i == 0) { Console.WriteLine( $"An AWS Identity and Access Management (IAM) policy must be attached to an SQS queue, enabling it to receive\r\n" + $"messages from an SNS topic"); } await SqsWrapper.SetQueuePolicyForTopic(queueArn, _topicArn, queueUrl); await SetupFilters(i, queueArn, queueName); } } Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up filters with user options for a queue. /// </summary> /// <param name="queueCount">The number of this queue.</param> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="queueName">The name of the queue.</param> /// <returns>Async Task.</returns> public static async Task SetupFilters(int queueCount, string queueArn, string queueName) { if (_useFifoTopic) { Console.WriteLine(new string('-', 80)); // Only explain this once. if (queueCount == 0) { Console.WriteLine( "Subscriptions to a FIFO topic can have filters." + "If you add a filter to this subscription, then only the filtered messages " + "will be received in the queue."); Console.WriteLine( "For information about message filtering, " + "see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html"); Console.WriteLine( "For this example, you can filter messages by a" + "TONE attribute."); } var useFilter = GetYesNoResponse($"Filter messages for {queueName}'s subscription to the topic?"); string? filterPolicy = null; if (useFilter) { filterPolicy = CreateFilterPolicy(); } var subscriptionArn = await SnsWrapper.SubscribeTopicWithFilter(_topicArn, filterPolicy, queueArn); _subscriptionArns[queueCount] = subscriptionArn; Console.WriteLine( $"The queue {queueName} has been subscribed to the topic {_topicName} " + $"with the subscription ARN {subscriptionArn}"); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Use user input to create a filter policy for a subscription. /// </summary> /// <returns>The serialized filter policy.</returns> public static string CreateFilterPolicy() { Console.WriteLine(new string('-', 80)); Console.WriteLine( $"You can filter messages by one or more of the following" + $"TONE attributes."); List<string> filterSelections = new List<string>(); var selectionNumber = 0; do { Console.WriteLine( $"Enter a number to add a TONE filter, or enter 0 to stop adding filters."); for (int i = 0; i < _tones.Length; i++) { Console.WriteLine($"\t{i + 1}. {_tones[i]}"); } var selection = GetUserResponse("", filterSelections.Any() ? "0" : "1"); int.TryParse(selection, out selectionNumber); if (selectionNumber > 0 && !filterSelections.Contains(_tones[selectionNumber - 1])) { filterSelections.Add(_tones[selectionNumber - 1]); } } while (selectionNumber != 0); var filters = new Dictionary<string, List<string>> { { "tone", filterSelections } }; string filterPolicy = JsonSerializer.Serialize(filters); return filterPolicy; } /// <summary> /// Publish messages using user settings. /// </summary> /// <returns>Async task.</returns> public static async Task PublishMessages() { Console.WriteLine("Now we can publish messages."); var keepSendingMessages = true; string? deduplicationId = null; string? toneAttribute = null; while (keepSendingMessages) { Console.WriteLine(); var message = GetUserResponse("Enter a message to publish.", "This is a sample message"); if (_useFifoTopic) { Console.WriteLine("Because you are using a FIFO topic, you must set a message group ID." + "\r\nAll messages within the same group will be received in the order " + "they were published."); Console.WriteLine(); var messageGroupId = GetUserResponse("Enter a message group ID for this message:", "1"); if (!_useContentBasedDeduplication) { Console.WriteLine("Because you are not using content-based deduplication, " + "you must enter a deduplication ID."); Console.WriteLine("Enter a deduplication ID for this message."); deduplicationId = GetUserResponse("Enter a deduplication ID for this message.", "1"); } if (GetYesNoResponse("Add an attribute to this message?")) { Console.WriteLine("Enter a number for an attribute."); for (int i = 0; i < _tones.Length; i++) { Console.WriteLine($"\t{i + 1}. {_tones[i]}"); } var selection = GetUserResponse("", "1"); int.TryParse(selection, out var selectionNumber); if (selectionNumber > 0 && selectionNumber < _tones.Length) { toneAttribute = _tones[selectionNumber - 1]; } } var messageID = await SnsWrapper.PublishToTopicWithAttribute( _topicArn, message, "tone", toneAttribute, deduplicationId, messageGroupId); Console.WriteLine($"Message published with id {messageID}."); } keepSendingMessages = GetYesNoResponse("Send another message?", false); } } /// <summary> /// Poll for the published messages to see the results of the user's choices. /// </summary> /// <returns>Async task.</returns> public static async Task<List<Message>> PollForMessages(string queueUrl) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Now the SQS queue at {queueUrl} will be polled to retrieve the messages." + "\r\nPress any key to continue."); if (UseConsole) { Console.ReadLine(); } var moreMessages = true; var messages = new List<Message>(); while (moreMessages) { var newMessages = await SqsWrapper.ReceiveMessagesByUrl(queueUrl, 10); moreMessages = newMessages.Any(); if (moreMessages) { messages.AddRange(newMessages); } } Console.WriteLine($"{messages.Count} message(s) were received by the queue at {queueUrl}."); foreach (var message in messages) { Console.WriteLine("\tMessage:" + $"\n\t{message.Body}"); } Console.WriteLine(new string('-', 80)); return messages; } /// <summary> /// Delete the message using handles in a batch. /// </summary> /// <returns>Async task.</returns> public static async Task DeleteMessages(string queueUrl, List<Message> messages) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can delete the messages in this queue in a batch."); await SqsWrapper.DeleteMessageBatchByUrl(queueUrl, messages); Console.WriteLine(new string('-', 80)); } /// <summary> /// Clean up the resources from the scenario. /// </summary> /// <returns>Async task.</returns> private static async Task CleanupResources() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Clean up resources."); try { foreach (var queueUrl in _queueUrls) { if (!string.IsNullOrEmpty(queueUrl)) { var deleteQueue = GetYesNoResponse($"Delete queue with url {queueUrl}?"); if (deleteQueue) { await SqsWrapper.DeleteQueueByUrl(queueUrl); } } } foreach (var subscriptionArn in _subscriptionArns) { if (!string.IsNullOrEmpty(subscriptionArn)) { await SnsWrapper.UnsubscribeByArn(subscriptionArn); } } var deleteTopic = GetYesNoResponse($"Delete topic {_topicName}?"); if (deleteTopic) { await SnsWrapper.DeleteTopicByArn(_topicArn); } } catch (Exception ex) { Console.WriteLine($"Unable to clean up resources. Here's why: {ex.Message}."); } Console.WriteLine(new string('-', 80)); } /// <summary> /// Helper method to get a yes or no response from the user. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <param name="defaultAnswer">Optional default answer to use.</param> /// <returns>True if the user responds with a yes.</returns> private static bool GetYesNoResponse(string question, bool defaultAnswer = true) { if (UseConsole) { Console.WriteLine(question); var ynResponse = Console.ReadLine(); var response = ynResponse != null && ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase); return response; } // If not using the console, use the default. return defaultAnswer; } /// <summary> /// Helper method to get a string response from the user through the console. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <param name="defaultAnswer">Optional default answer to use.</param> /// <returns>True if the user responds with a yes.</returns> private static string GetUserResponse(string question, string defaultAnswer) { if (UseConsole) { var response = ""; while (string.IsNullOrEmpty(response)) { Console.WriteLine(question); response = Console.ReadLine(); } return response; } // If not using the console, use the default. return defaultAnswer; } }

建立包裝 Amazon SQS操作的類別。

/// <summary> /// Wrapper for Amazon Simple Queue Service (SQS) operations. /// </summary> public class SQSWrapper { private readonly IAmazonSQS _amazonSQSClient; /// <summary> /// Constructor for the Amazon SQS wrapper. /// </summary> /// <param name="amazonSQS">The injected Amazon SQS client.</param> public SQSWrapper(IAmazonSQS amazonSQS) { _amazonSQSClient = amazonSQS; } /// <summary> /// Create a queue with a specific name. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <param name="useFifoQueue">True to use a FIFO queue.</param> /// <returns>The url for the queue.</returns> public async Task<string> CreateQueueWithName(string queueName, bool useFifoQueue) { int maxMessage = 256 * 1024; var queueAttributes = new Dictionary<string, string> { { QueueAttributeName.MaximumMessageSize, maxMessage.ToString() } }; var createQueueRequest = new CreateQueueRequest() { QueueName = queueName, Attributes = queueAttributes }; if (useFifoQueue) { // Update the name if it is not correct for a FIFO queue. if (!queueName.EndsWith(".fifo")) { createQueueRequest.QueueName = queueName + ".fifo"; } // Add an attribute for a FIFO queue. createQueueRequest.Attributes.Add( QueueAttributeName.FifoQueue, "true"); } var createResponse = await _amazonSQSClient.CreateQueueAsync( new CreateQueueRequest() { QueueName = queueName }); return createResponse.QueueUrl; } /// <summary> /// Get the ARN for a queue from its URL. /// </summary> /// <param name="queueUrl">The URL of the queue.</param> /// <returns>The ARN of the queue.</returns> public async Task<string> GetQueueArnByUrl(string queueUrl) { var getAttributesRequest = new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { QueueAttributeName.QueueArn } }; var getAttributesResponse = await _amazonSQSClient.GetQueueAttributesAsync( getAttributesRequest); return getAttributesResponse.QueueARN; } /// <summary> /// Set the policy attribute of a queue for a topic. /// </summary> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="queueUrl">The url for the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> SetQueuePolicyForTopic(string queueArn, string topicArn, string queueUrl) { var queuePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": " + "\"sns.amazonaws.com\"" + "}," + "\"Action\": \"sqs:SendMessage\"," + $"\"Resource\": \"{queueArn}\"," + "\"Condition\": {" + "\"ArnEquals\": {" + $"\"aws:SourceArn\": \"{topicArn}\"" + "}" + "}" + "}]" + "}"; var attributesResponse = await _amazonSQSClient.SetQueueAttributesAsync( new SetQueueAttributesRequest() { QueueUrl = queueUrl, Attributes = new Dictionary<string, string>() { { "Policy", queuePolicy } } }); return attributesResponse.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Receive messages from a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>The list of messages.</returns> public async Task<List<Message>> ReceiveMessagesByUrl(string queueUrl, int maxMessages) { // Setting WaitTimeSeconds to non-zero enables long polling. // For information about long polling, see // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html var messageResponse = await _amazonSQSClient.ReceiveMessageAsync( new ReceiveMessageRequest() { QueueUrl = queueUrl, MaxNumberOfMessages = maxMessages, WaitTimeSeconds = 1 }); return messageResponse.Messages; } /// <summary> /// Delete a batch of messages from a queue by its url. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteMessageBatchByUrl(string queueUrl, List<Message> messages) { var deleteRequest = new DeleteMessageBatchRequest() { QueueUrl = queueUrl, Entries = new List<DeleteMessageBatchRequestEntry>() }; foreach (var message in messages) { deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id = message.MessageId }); } var deleteResponse = await _amazonSQSClient.DeleteMessageBatchAsync(deleteRequest); return deleteResponse.Failed.Any(); } /// <summary> /// Delete a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteQueueByUrl(string queueUrl) { var deleteResponse = await _amazonSQSClient.DeleteQueueAsync( new DeleteQueueRequest() { QueueUrl = queueUrl }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; } }

建立包裝 Amazon SNS操作的類別。

/// <summary> /// Wrapper for Amazon Simple Notification Service (SNS) operations. /// </summary> public class SNSWrapper { private readonly IAmazonSimpleNotificationService _amazonSNSClient; /// <summary> /// Constructor for the Amazon SNS wrapper. /// </summary> /// <param name="amazonSQS">The injected Amazon SNS client.</param> public SNSWrapper(IAmazonSimpleNotificationService amazonSNS) { _amazonSNSClient = amazonSNS; } /// <summary> /// Create a new topic with a name and specific FIFO and de-duplication attributes. /// </summary> /// <param name="topicName">The name for the topic.</param> /// <param name="useFifoTopic">True to use a FIFO topic.</param> /// <param name="useContentBasedDeduplication">True to use content-based de-duplication.</param> /// <returns>The ARN of the new topic.</returns> public async Task<string> CreateTopicWithName(string topicName, bool useFifoTopic, bool useContentBasedDeduplication) { var createTopicRequest = new CreateTopicRequest() { Name = topicName, }; if (useFifoTopic) { // Update the name if it is not correct for a FIFO topic. if (!topicName.EndsWith(".fifo")) { createTopicRequest.Name = topicName + ".fifo"; } // Add the attributes from the method parameters. createTopicRequest.Attributes = new Dictionary<string, string> { { "FifoTopic", "true" } }; if (useContentBasedDeduplication) { createTopicRequest.Attributes.Add("ContentBasedDeduplication", "true"); } } var createResponse = await _amazonSNSClient.CreateTopicAsync(createTopicRequest); return createResponse.TopicArn; } /// <summary> /// Subscribe a queue to a topic with optional filters. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="useFifoTopic">The optional filtering policy for the subscription.</param> /// <param name="queueArn">The ARN of the queue.</param> /// <returns>The ARN of the new subscription.</returns> public async Task<string> SubscribeTopicWithFilter(string topicArn, string? filterPolicy, string queueArn) { var subscribeRequest = new SubscribeRequest() { TopicArn = topicArn, Protocol = "sqs", Endpoint = queueArn }; if (!string.IsNullOrEmpty(filterPolicy)) { subscribeRequest.Attributes = new Dictionary<string, string> { { "FilterPolicy", filterPolicy } }; } var subscribeResponse = await _amazonSNSClient.SubscribeAsync(subscribeRequest); return subscribeResponse.SubscriptionArn; } /// <summary> /// Publish a message to a topic with an attribute and optional deduplication and group IDs. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="message">The message to publish.</param> /// <param name="attributeName">The optional attribute for the message.</param> /// <param name="attributeValue">The optional attribute value for the message.</param> /// <param name="deduplicationId">The optional deduplication ID for the message.</param> /// <param name="groupId">The optional group ID for the message.</param> /// <returns>The ID of the message published.</returns> public async Task<string> PublishToTopicWithAttribute( string topicArn, string message, string? attributeName = null, string? attributeValue = null, string? deduplicationId = null, string? groupId = null) { var publishRequest = new PublishRequest() { TopicArn = topicArn, Message = message, MessageDeduplicationId = deduplicationId, MessageGroupId = groupId }; if (attributeValue != null) { // Add the string attribute if it exists. publishRequest.MessageAttributes = new Dictionary<string, MessageAttributeValue> { { attributeName!, new MessageAttributeValue() { StringValue = attributeValue, DataType = "String"} } }; } var publishResponse = await _amazonSNSClient.PublishAsync(publishRequest); return publishResponse.MessageId; } /// <summary> /// Unsubscribe from a topic by a subscription ARN. /// </summary> /// <param name="subscriptionArn">The ARN of the subscription.</param> /// <returns>True if successful.</returns> public async Task<bool> UnsubscribeByArn(string subscriptionArn) { var unsubscribeResponse = await _amazonSNSClient.UnsubscribeAsync( new UnsubscribeRequest() { SubscriptionArn = subscriptionArn }); return unsubscribeResponse.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete a topic by its topic ARN. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteTopicByArn(string topicArn) { var deleteResponse = await _amazonSNSClient.DeleteTopicAsync( new DeleteTopicRequest() { TopicArn = topicArn }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; } }

下列程式碼範例示範如何使用適用於 SQS的訊息處理架構來建立發佈和接收 Amazon AWS 訊息的應用程式NET。

AWS SDK for .NET

提供適用於 AWS 的訊息處理架構教學NET課程。教學課程會建立 Web 應用程式,允許使用者發佈 Amazon SQS 訊息和接收訊息的命令列應用程式。

如需設定和執行的完整原始程式碼和指示,請參閱 AWS SDK for .NET 開發人員指南中的完整教學課程和 上的範例GitHub

此範例中使用的服務
  • Amazon SQS

無伺服器範例

下列程式碼範例示範如何實作 Lambda 函數,該函數接收從SQS佇列接收訊息所觸發的事件。函數會從事件參數擷取訊息,並記錄每一則訊息的內容。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 使用 Lambda 取用SQS事件NET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SqsIntegrationSampleCode { public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context) { foreach (var message in evnt.Records) { await ProcessMessageAsync(message, context); } context.Logger.LogInformation("done"); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { try { context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } catch (Exception e) { //You can use Dead Letter Queue to handle failures. By configuring a Lambda DLQ. context.Logger.LogError($"An error occurred"); throw; } } }

下列程式碼範例示範如何針對從SQS佇列接收事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

AWS SDK for .NET
注意

還有更多 。 GitHub尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 使用 Lambda 報告SQS批次項目失敗NET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }