使用 SDK for JavaScript (v3) 的 Amazon SQS 示例 - AWS SDK 代码示例

AWS 文档 SDK 示例 GitHub 存储库中还有更多 AWS SDK 示例。

使用 SDK for JavaScript (v3) 的 Amazon SQS 示例

以下代码示例演示如何通过将 AWS SDK for JavaScript (v3) 与 Amazon SQS 结合使用,来执行操作和实现常见场景。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

场景是向您演示如何通过在一个服务中调用多个函数或与其他 AWS 服务 结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。

开始使用

以下代码示例展示了如何开始使用 Amazon SQS。

SDK for JavaScript (v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

初始化 Amazon SQS 客户端并列出队列。

import { SQSClient, paginateListQueues } from "@aws-sdk/client-sqs"; export const helloSqs = async () => { // The configuration object (`{}`) is required. If the region and credentials // are omitted, the SDK uses your local configuration if it exists. const client = new SQSClient({}); // You can also use `ListQueuesCommand`, but to use that command you must // handle the pagination yourself. You can do that by sending the `ListQueuesCommand` // with the `NextToken` parameter from the previous request. const paginatedQueues = paginateListQueues({ client }, {}); const queues = []; for await (const page of paginatedQueues) { if (page.QueueUrls?.length) { queues.push(...page.QueueUrls); } } const suffix = queues.length === 1 ? "" : "s"; console.log( `Hello, Amazon SQS! You have ${queues.length} queue${suffix} in your account.`, ); console.log(queues.map((t) => ` * ${t}`).join("\n")); };
  • 有关 API 详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 ListQueues

操作

以下代码示例演示如何使用 ChangeMessageVisibility

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收 Amazon SQS 消息并更改其超时可见性。

import { ReceiveMessageCommand, ChangeMessageVisibilityCommand, SQSClient, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 1, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); const response = await client.send( new ChangeMessageVisibilityCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, VisibilityTimeout: 20, }), ); console.log(response); return response; };

以下代码示例演示如何使用 CreateQueue

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

创建 Amazon SQS 标准队列。

import { CreateQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "test-queue"; export const main = async (sqsQueueName = SQS_QUEUE_NAME) => { const command = new CreateQueueCommand({ QueueName: sqsQueueName, Attributes: { DelaySeconds: "60", MessageRetentionPeriod: "86400", }, }); const response = await client.send(command); console.log(response); return response; };

使用长轮询创建 Amazon SQS 队列。

import { CreateQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "queue_name"; export const main = async (queueName = SQS_QUEUE_NAME) => { const response = await client.send( new CreateQueueCommand({ QueueName: queueName, Attributes: { // When the wait time for the ReceiveMessage API action is greater than 0, // long polling is in effect. The maximum long polling wait time is 20 // seconds. Long polling helps reduce the cost of using Amazon SQS by, // eliminating the number of empty responses and false empty responses. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html ReceiveMessageWaitTimeSeconds: "20", }, }), ); console.log(response); return response; };

以下代码示例演示如何使用 DeleteMessage

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收和删除 Amazon SQS 消息。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };
  • 有关 API 详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 DeleteMessage

以下代码示例演示如何使用 DeleteMessageBatch

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };
  • 有关 API 的详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 DeleteMessageBatch

以下代码示例演示如何使用 DeleteQueue

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

删除 Amazon SQS 队列。

import { DeleteQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "test-queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new DeleteQueueCommand({ QueueUrl: queueUrl }); const response = await client.send(command); console.log(response); return response; };

以下代码示例演示如何使用 GetQueueAttributes

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { GetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const getQueueAttributes = async (queueUrl = SQS_QUEUE_URL) => { const command = new GetQueueAttributesCommand({ QueueUrl: queueUrl, AttributeNames: ["DelaySeconds"], }); const response = await client.send(command); console.log(response); // { // '$metadata': { // httpStatusCode: 200, // requestId: '747a1192-c334-5682-a508-4cd5e8dc4e79', // extendedRequestId: undefined, // cfId: undefined, // attempts: 1, // totalRetryDelay: 0 // }, // Attributes: { DelaySeconds: '1' } // } return response; };
  • 有关 API 的详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 GetTopicAttributes

以下代码示例演示如何使用 GetQueueUrl

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

获取 Amazon SQS 队列的 URL。

import { GetQueueUrlCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "test-queue"; export const main = async (queueName = SQS_QUEUE_NAME) => { const command = new GetQueueUrlCommand({ QueueName: queueName }); const response = await client.send(command); console.log(response); return response; };

以下代码示例演示如何使用 ListQueues

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

列出 Amazon SQS 队列。

import { paginateListQueues, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); export const main = async () => { const paginatedListQueues = paginateListQueues({ client }, {}); /** @type {string[]} */ const urls = []; for await (const page of paginatedListQueues) { const nextUrls = page.QueueUrls?.filter((qurl) => !!qurl) || []; urls.push(...nextUrls); for (const url of urls) { console.log(url); } } return urls; };

以下代码示例演示如何使用 ReceiveMessage

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收来自 Amazon SQS 队列的消息。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };

使用长轮询接收来自 Amazon SQS 队列的消息。

import { ReceiveMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueUrl, // The duration (in seconds) for which the call waits for a message // to arrive in the queue before returning. If a message is available, // the call returns sooner than WaitTimeSeconds. If no messages are // available and the wait time expires, the call returns successfully // with an empty list of messages. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestSyntax WaitTimeSeconds: 20, }); const response = await client.send(command); console.log(response); return response; };
  • 有关 API 详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 ReceiveMessage

以下代码示例演示如何使用 SendMessage

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

向 Amazon SQS 队列发送消息。

import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; export const main = async (sqsQueueUrl = SQS_QUEUE_URL) => { const command = new SendMessageCommand({ QueueUrl: sqsQueueUrl, DelaySeconds: 10, MessageAttributes: { Title: { DataType: "String", StringValue: "The Whistler", }, Author: { DataType: "String", StringValue: "John Grisham", }, WeeksOn: { DataType: "Number", StringValue: "6", }, }, MessageBody: "Information about current NY Times fiction bestseller for week of 12/11/2016.", }); const response = await client.send(command); console.log(response); return response; };

以下代码示例演示如何使用 SetQueueAttributes

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new SetQueueAttributesCommand({ QueueUrl: queueUrl, Attributes: { DelaySeconds: "1", }, }); const response = await client.send(command); console.log(response); return response; };

将 Amazon SQS 队列配置为使用长轮询。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new SetQueueAttributesCommand({ Attributes: { ReceiveMessageWaitTimeSeconds: "20", }, QueueUrl: queueUrl, }); const response = await client.send(command); console.log(response); return response; };

配置死信队列。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const DEAD_LETTER_QUEUE_ARN = "dead_letter_queue_arn"; export const main = async ( queueUrl = SQS_QUEUE_URL, deadLetterQueueArn = DEAD_LETTER_QUEUE_ARN, ) => { const command = new SetQueueAttributesCommand({ Attributes: { RedrivePolicy: JSON.stringify({ // Amazon SQS supports dead-letter queues (DLQ), which other // queues (source queues) can target for messages that can't // be processed (consumed) successfully. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html deadLetterTargetArn: deadLetterQueueArn, maxReceiveCount: "10", }), }, QueueUrl: queueUrl, }); const response = await client.send(command); console.log(response); return response; };
  • 有关 API 的详细信息,请参阅《AWS SDK for JavaScript API 参考》中的 SetTopicAttributes

场景

以下代码示例演示如何通过交互式应用程序探索 Amazon Textract 输出。

SDK for JavaScript (v3)

演示了如何使用 AWS SDK for JavaScript 构建 React 应用程序,该应用程序使用 Amazon Textract 从文档图像中提取数据并在交互式网页中显示该数据。此示例在 Web 浏览器中运行,需要经过身份验证的 Amazon Cognito 身份才能获得凭证。它使用 Amazon Simple Storage Service(Amazon S3)进行存储;对于通知,它将轮询订阅 Amazon Simple Notification Service(Amazon SNS)主题的 Amazon Simple Queue Service(Amazon SQS)队列。

有关完整的源代码以及如何设置和运行的说明,请参阅 GitHub 上的完整示例。

本示例中使用的服务
  • Amazon Cognito Identity

  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

以下代码示例演示了操作流程:

  • 创建主题(FIFO 或非 FIFO)。

  • 针对主题订阅多个队列,并提供应用筛选条件的选项。

  • 将消息发布到主题。

  • 轮询队列中是否有收到的消息。

SDK for JavaScript (v3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

这是此场景的入口点。

import { SNSClient } from "@aws-sdk/client-sns"; import { SQSClient } from "@aws-sdk/client-sqs"; import { TopicsQueuesWkflw } from "./TopicsQueuesWkflw.js"; import { Prompter } from "@aws-doc-sdk-examples/lib/prompter.js"; export const startSnsWorkflow = () => { const snsClient = new SNSClient({}); const sqsClient = new SQSClient({}); const prompter = new Prompter(); const logger = console; const wkflw = new TopicsQueuesWkflw(snsClient, sqsClient, prompter, logger); wkflw.start(); };

前面的代码提供必要的依赖关系并启动此场景。下一节包含示例的大部分内容。

const toneChoices = [ { name: "cheerful", value: "cheerful" }, { name: "funny", value: "funny" }, { name: "serious", value: "serious" }, { name: "sincere", value: "sincere" }, ]; export class TopicsQueuesWkflw { // SNS topic is configured as First-In-First-Out isFifo = true; // Automatic content-based deduplication is enabled. autoDedup = false; snsClient; sqsClient; topicName; topicArn; subscriptionArns = []; /** * @type {{ queueName: string, queueArn: string, queueUrl: string, policy?: string }[]} */ queues = []; prompter; /** * @param {import('@aws-sdk/client-sns').SNSClient} snsClient * @param {import('@aws-sdk/client-sqs').SQSClient} sqsClient * @param {import('../../libs/prompter.js').Prompter} prompter * @param {import('../../libs/logger.js').Logger} logger */ constructor(snsClient, sqsClient, prompter, logger) { this.snsClient = snsClient; this.sqsClient = sqsClient; this.prompter = prompter; this.logger = logger; } async welcome() { await this.logger.log(MESSAGES.description); } async confirmFifo() { await this.logger.log(MESSAGES.snsFifoDescription); this.isFifo = await this.prompter.confirm({ message: MESSAGES.snsFifoPrompt, }); if (this.isFifo) { this.logger.logSeparator(MESSAGES.headerDedup); await this.logger.log(MESSAGES.deduplicationNotice); await this.logger.log(MESSAGES.deduplicationDescription); this.autoDedup = await this.prompter.confirm({ message: MESSAGES.deduplicationPrompt, }); } } async createTopic() { await this.logger.log(MESSAGES.creatingTopics); this.topicName = await this.prompter.input({ message: MESSAGES.topicNamePrompt, }); if (this.isFifo) { this.topicName += ".fifo"; this.logger.logSeparator(MESSAGES.headerFifoNaming); await this.logger.log(MESSAGES.appendFifoNotice); } const response = await this.snsClient.send( new CreateTopicCommand({ Name: this.topicName, Attributes: { FifoTopic: this.isFifo ? "true" : "false", ...(this.autoDedup ? { ContentBasedDeduplication: "true" } : {}), }, }), ); this.topicArn = response.TopicArn; await this.logger.log( MESSAGES.topicCreatedNotice .replace("${TOPIC_NAME}", this.topicName) .replace("${TOPIC_ARN}", this.topicArn), ); } async createQueues() { await this.logger.log(MESSAGES.createQueuesNotice); // Increase this number to add more queues. const maxQueues = 2; for (let i = 0; i < maxQueues; i++) { await this.logger.log(MESSAGES.queueCount.replace("${COUNT}", i + 1)); let queueName = await this.prompter.input({ message: MESSAGES.queueNamePrompt.replace( "${EXAMPLE_NAME}", i === 0 ? "good-news" : "bad-news", ), }); if (this.isFifo) { queueName += ".fifo"; await this.logger.log(MESSAGES.appendFifoNotice); } const response = await this.sqsClient.send( new CreateQueueCommand({ QueueName: queueName, Attributes: { ...(this.isFifo ? { FifoQueue: "true" } : {}) }, }), ); const { Attributes } = await this.sqsClient.send( new GetQueueAttributesCommand({ QueueUrl: response.QueueUrl, AttributeNames: ["QueueArn"], }), ); this.queues.push({ queueName, queueArn: Attributes.QueueArn, queueUrl: response.QueueUrl, }); await this.logger.log( MESSAGES.queueCreatedNotice .replace("${QUEUE_NAME}", queueName) .replace("${QUEUE_URL}", response.QueueUrl) .replace("${QUEUE_ARN}", Attributes.QueueArn), ); } } async attachQueueIamPolicies() { for (const [index, queue] of this.queues.entries()) { const policy = JSON.stringify( { Statement: [ { Effect: "Allow", Principal: { Service: "sns.amazonaws.com", }, Action: "sqs:SendMessage", Resource: queue.queueArn, Condition: { ArnEquals: { "aws:SourceArn": this.topicArn, }, }, }, ], }, null, 2, ); if (index !== 0) { this.logger.logSeparator(); } await this.logger.log(MESSAGES.attachPolicyNotice); console.log(policy); const addPolicy = await this.prompter.confirm({ message: MESSAGES.addPolicyConfirmation.replace( "${QUEUE_NAME}", queue.queueName, ), }); if (addPolicy) { await this.sqsClient.send( new SetQueueAttributesCommand({ QueueUrl: queue.queueUrl, Attributes: { Policy: policy, }, }), ); queue.policy = policy; } else { await this.logger.log( MESSAGES.policyNotAttachedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); } } } async subscribeQueuesToTopic() { for (const [index, queue] of this.queues.entries()) { /** * @type {import('@aws-sdk/client-sns').SubscribeCommandInput} */ const subscribeParams = { TopicArn: this.topicArn, Protocol: "sqs", Endpoint: queue.queueArn, }; let tones = []; if (this.isFifo) { if (index === 0) { await this.logger.log(MESSAGES.fifoFilterNotice); } tones = await this.prompter.checkbox({ message: MESSAGES.fifoFilterSelect.replace( "${QUEUE_NAME}", queue.queueName, ), choices: toneChoices, }); if (tones.length) { subscribeParams.Attributes = { FilterPolicyScope: "MessageAttributes", FilterPolicy: JSON.stringify({ tone: tones, }), }; } } const { SubscriptionArn } = await this.snsClient.send( new SubscribeCommand(subscribeParams), ); this.subscriptionArns.push(SubscriptionArn); await this.logger.log( MESSAGES.queueSubscribedNotice .replace("${QUEUE_NAME}", queue.queueName) .replace("${TOPIC_NAME}", this.topicName) .replace("${TONES}", tones.length ? tones.join(", ") : "none"), ); } } async publishMessages() { const message = await this.prompter.input({ message: MESSAGES.publishMessagePrompt, }); let groupId; let deduplicationId; let choices; if (this.isFifo) { await this.logger.log(MESSAGES.groupIdNotice); groupId = await this.prompter.input({ message: MESSAGES.groupIdPrompt, }); if (this.autoDedup === false) { await this.logger.log(MESSAGES.deduplicationIdNotice); deduplicationId = await this.prompter.input({ message: MESSAGES.deduplicationIdPrompt, }); } choices = await this.prompter.checkbox({ message: MESSAGES.messageAttributesPrompt, choices: toneChoices, }); } await this.snsClient.send( new PublishCommand({ TopicArn: this.topicArn, Message: message, ...(groupId ? { MessageGroupId: groupId, } : {}), ...(deduplicationId ? { MessageDeduplicationId: deduplicationId, } : {}), ...(choices ? { MessageAttributes: { tone: { DataType: "String.Array", StringValue: JSON.stringify(choices), }, }, } : {}), }), ); const publishAnother = await this.prompter.confirm({ message: MESSAGES.publishAnother, }); if (publishAnother) { await this.publishMessages(); } } async receiveAndDeleteMessages() { for (const queue of this.queues) { const { Messages } = await this.sqsClient.send( new ReceiveMessageCommand({ QueueUrl: queue.queueUrl, }), ); if (Messages) { await this.logger.log( MESSAGES.messagesReceivedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); console.log(Messages); await this.sqsClient.send( new DeleteMessageBatchCommand({ QueueUrl: queue.queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } else { await this.logger.log( MESSAGES.noMessagesReceivedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); } } const deleteAndPoll = await this.prompter.confirm({ message: MESSAGES.deleteAndPollConfirmation, }); if (deleteAndPoll) { await this.receiveAndDeleteMessages(); } } async destroyResources() { for (const subscriptionArn of this.subscriptionArns) { await this.snsClient.send( new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }), ); } for (const queue of this.queues) { await this.sqsClient.send( new DeleteQueueCommand({ QueueUrl: queue.queueUrl }), ); } if (this.topicArn) { await this.snsClient.send( new DeleteTopicCommand({ TopicArn: this.topicArn }), ); } } async start() { console.clear(); try { this.logger.logSeparator(MESSAGES.headerWelcome); await this.welcome(); this.logger.logSeparator(MESSAGES.headerFifo); await this.confirmFifo(); this.logger.logSeparator(MESSAGES.headerCreateTopic); await this.createTopic(); this.logger.logSeparator(MESSAGES.headerCreateQueues); await this.createQueues(); this.logger.logSeparator(MESSAGES.headerAttachPolicy); await this.attachQueueIamPolicies(); this.logger.logSeparator(MESSAGES.headerSubscribeQueues); await this.subscribeQueuesToTopic(); this.logger.logSeparator(MESSAGES.headerPublishMessage); await this.publishMessages(); this.logger.logSeparator(MESSAGES.headerReceiveMessages); await this.receiveAndDeleteMessages(); } catch (err) { console.error(err); } finally { await this.destroyResources(); } } }

无服务器示例

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收因接收来自 SNS 队列的消息而触发的事件。该函数从事件参数检索消息并记录每条消息的内容。

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 JavaScript 将 SQS 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message) { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }

通过 TypeScript 将 SQS 事件与 Lambda 结合使用。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, Context, SQSHandler, SQSRecord } from "aws-lambda"; export const functionHandler: SQSHandler = async ( event: SQSEvent, context: Context ): Promise<void> => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message: SQSRecord): Promise<any> { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }

以下代码示例展示了如何为接收来自 SQS 队列的事件的 Lambda 函数实现部分批处理响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。

// Node.js 20.x Lambda runtime, AWS SDK for Javascript V3 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }