You are viewing documentation for version 1 of the AWS SDK for Ruby.
Class: AWS::SQS::Queue
- Inherits: Object
- Defined in: lib/aws/sqs/queue.rb
Overview
Represents an Amazon SQS Queue.
Defined Under Namespace
Classes: SentMessage
Constant Summary
- DEFAULT_POLL_INTERVAL = 1
  Deprecated. No longer used by #poll.
  The default number of seconds to wait between polling requests for new messages.
- DEFAULT_WAIT_TIME_SECONDS = 15
  The default number of seconds to pass in as the SQS long polling value (:wait_time_seconds) in #receive_message.
Instance Attribute Summary
- #url ⇒ String (readonly)
  The queue URL.
Instance Method Summary
- #==(other) ⇒ Boolean (also: #eql?)
  Returns true if the other queue has the same url.
- #approximate_number_of_messages ⇒ Integer (also: #visible_messages)
  The approximate number of visible messages in a queue.
- #approximate_number_of_messages_delayed ⇒ Integer
  Returns an approximate count of messages delayed.
- #approximate_number_of_messages_not_visible ⇒ Integer (also: #invisible_messages)
  The approximate number of messages that are not timed-out and not deleted.
- #arn ⇒ String
  The queue's Amazon resource name (ARN).
- #batch_change_visibility(*args) ⇒ Object
- #batch_delete(*messages) ⇒ nil
- #batch_send(*messages) ⇒ Array<SentMessage>
  Sends a batch of up to 10 messages in a single request.
- #created_timestamp ⇒ Time
  The time when the queue was created.
- #delay_seconds ⇒ Integer
  Gets the current default delay for messages sent to the queue.
- #delay_seconds=(seconds) ⇒ Object
  Sets the default delay for messages sent to the queue.
- #delete ⇒ nil
  Deletes the queue, regardless of whether it is empty.
- #exists? ⇒ Boolean
  True if the queue exists.
- #last_modified_timestamp ⇒ Time
  The time when the queue was last changed.
- #maximum_message_size ⇒ Integer
  The limit of how many bytes a message can contain before Amazon SQS rejects it.
- #maximum_message_size=(size) ⇒ Object
  Sets the maximum message size for the queue.
- #message_retention_period ⇒ Integer
  The number of seconds Amazon SQS retains a message.
- #message_retention_period=(period) ⇒ Object
  Sets the message retention period for the queue.
- #policy ⇒ Policy
  Returns the current queue policy if there is one.
- #policy=(policy) ⇒ nil
  Set the policy on this queue.
- #poll(opts = {}) {|message| ... } ⇒ nil
  Polls continually for messages.
- #receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage (also: #receive_messages)
  Retrieves one or more messages.
- #send_message(body, options = {}) ⇒ SentMessage
  Delivers a message to this queue.
- #visibility_timeout ⇒ Integer
  Returns the visibility timeout for the queue.
- #visibility_timeout=(timeout) ⇒ Object
  Sets the visibility timeout for the queue.
- #wait_time_seconds ⇒ Integer
  Gets the number of seconds the service will wait for a response when requesting a new message.
- #wait_time_seconds=(seconds) ⇒ Object
  Sets the number of seconds that the service should wait for a response when requesting a new message.
Instance Attribute Details
#url ⇒ String (readonly)
Returns the queue URL.
# File 'lib/aws/sqs/queue.rb', line 48
def url
  @url
end
Instance Method Details
#==(other) ⇒ Boolean Also known as: eql?
Returns true if the other queue has the same url.
# File 'lib/aws/sqs/queue.rb', line 684
def ==(other)
  other.kind_of?(Queue) and other.url == url
end
#approximate_number_of_messages ⇒ Integer Also known as: visible_messages
Returns the approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
# File 'lib/aws/sqs/queue.rb', line 319
def approximate_number_of_messages
  get_attribute("ApproximateNumberOfMessages").to_i
end
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
# File 'lib/aws/sqs/queue.rb', line 428
def approximate_number_of_messages_delayed
  get_attribute("ApproximateNumberOfMessagesDelayed").to_i
end
#approximate_number_of_messages_not_visible ⇒ Integer Also known as: invisible_messages
Returns the approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
# File 'lib/aws/sqs/queue.rb', line 328
def approximate_number_of_messages_not_visible
  get_attribute("ApproximateNumberOfMessagesNotVisible").to_i
end
#arn ⇒ String
Returns the queue's Amazon resource name (ARN).
# File 'lib/aws/sqs/queue.rb', line 433
def arn
  @arn ||= get_attribute("QueueArn")
end
#batch_change_visibility(visibility_timeout, *messages) ⇒ nil
#batch_change_visibility(*messages_with_timeouts) ⇒ nil
# File 'lib/aws/sqs/queue.rb', line 649
def batch_change_visibility *args

  args = args.flatten

  if args.first.is_a?(Integer)
    timeout = args.shift
    messages = args.collect{|m| [m, timeout] }
  else
    messages = args.collect{|m| [m[:message], m[:visibility_timeout]] }
  end

  entries = []
  messages.each do |msg,timeout|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << {
      :id => entries.size.to_s,
      :receipt_handle => handle,
      :visibility_timeout => timeout,
    }
  end

  response = client.change_message_visibility_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchChangeVisibilityError.new(failures) unless failures.empty?

  nil

end
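Both call forms are normalized into the same list of entries before the request is made. The following is a minimal, self-contained sketch of that normalization, not the SDK's code: `visibility_entries` is a hypothetical helper name, plain strings stand in for receipt handles, and no AWS call is made.

```ruby
# Hypothetical helper mirroring the argument handling in the listing above.
# Receipt handles are plain strings here; nothing is sent to SQS.
def visibility_entries(*args)
  args = args.flatten
  pairs =
    if args.first.is_a?(Integer)
      timeout = args.shift # one timeout shared by every message
      args.map { |handle| [handle, timeout] }
    else
      # each hash carries its own :message and :visibility_timeout
      args.map { |m| [m[:message], m[:visibility_timeout]] }
    end
  pairs.each_with_index.map do |(handle, timeout), n|
    { :id => n.to_s, :receipt_handle => handle, :visibility_timeout => timeout }
  end
end

# Form 1: a shared timeout followed by the messages.
puts visibility_entries(30, 'handle-a', 'handle-b').inspect

# Form 2: one hash per message, each with its own timeout.
puts visibility_entries(
  { :message => 'handle-a', :visibility_timeout => 10 },
  { :message => 'handle-b', :visibility_timeout => 60 }
).inspect
```

The first form suits the common case of extending every in-flight message by the same amount; the second is useful when messages need different amounts of extra processing time.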
#batch_delete(*messages) ⇒ nil
# File 'lib/aws/sqs/queue.rb', line 588
def batch_delete *messages

  entries = []
  messages.flatten.each_with_index do |msg,n|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << { :id => n.to_s, :receipt_handle => handle }
  end

  response = client.delete_message_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchDeleteError.new(failures) unless failures.empty?

  nil

end
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
queue.batch_send('message-1', 'message-2')
You can also set an optional delay for all of the messages:
# delay all messages 15 minutes
queue.batch_send(msg1, msg2, :delay_seconds => 900)
If you need to set a custom delay for each message you can pass hashes:
messages = []
messages << { :message_body => 'msg1', :delay_seconds => 60 }
messages << { :message_body => 'msg2', :delay_seconds => 30 }
queue.batch_send(messages)
# File 'lib/aws/sqs/queue.rb', line 533
def batch_send *messages

  entries = messages.flatten

  unless entries.first.is_a?(Hash)
    options = entries.last.is_a?(Hash) ? entries.pop : {}
    entries = entries.collect{|msg| { :message_body => msg } }
    if delay = options[:delay_seconds]
      entries.each {|entry| entry[:delay_seconds] = delay }
    end
  end

  entries.each_with_index {|entry,n| entry[:id] = n.to_s }

  client_opts = {}
  client_opts[:queue_url] = url
  client_opts[:entries] = entries

  response = client.send_message_batch(client_opts)

  failed = batch_failures(entries, response, true)

  checksum_failed = verify_send_message_batch_checksum entries, response

  sent = response[:successful].collect do |sent|
    msg = SentMessage.new
    msg.message_id = sent[:message_id]
    msg.md5 = sent[:md5_of_message_body]
    msg
  end

  if !failed.empty? && !checksum_failed.empty?
    send_error = Errors::BatchSendError.new(sent, failed)
    checksum_error = Errors::ChecksumError.new(checksum_failed)
    raise Errors::BatchSendMultiError.new send_error, checksum_error
  elsif !failed.empty?
    raise Errors::BatchSendError.new(sent, failed) unless failed.empty?
  elsif !checksum_failed.empty?
    raise Errors::ChecksumError.new(checksum_failed)
  end

  sent

end
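Because a single request carries at most 10 messages, a longer list has to be split by the caller. One possible approach is sketched below. `FakeQueue` and `send_in_batches` are hypothetical names introduced for illustration; `FakeQueue` only records what it is given so the example runs without AWS credentials, and a real queue object would take its place.

```ruby
# Hypothetical stand-in for a queue; it only records each batch it receives.
class FakeQueue
  attr_reader :batches

  def initialize
    @batches = []
  end

  def batch_send(*messages)
    batch = messages.flatten
    raise ArgumentError, 'more than 10 messages' if batch.size > 10
    @batches << batch
    batch
  end
end

# Hypothetical helper: split the list into slices of at most 10 messages
# and issue one batch_send call per slice.
def send_in_batches(queue, messages, batch_size = 10)
  messages.each_slice(batch_size).map { |slice| queue.batch_send(slice) }
end

queue = FakeQueue.new
send_in_batches(queue, (1..23).map { |n| "message-#{n}" })
puts queue.batches.map(&:size).inspect
```

With a real queue, each slice can fail independently, so a caller would likely rescue the batch errors raised in the listing above per slice rather than around the whole loop.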
#created_timestamp ⇒ Time
Returns the time when the queue was created.
# File 'lib/aws/sqs/queue.rb', line 356
def created_timestamp
  Time.at(get_attribute("CreatedTimestamp").to_i)
end
#delay_seconds ⇒ Integer
Returns the current default delay for messages sent to the queue.
# File 'lib/aws/sqs/queue.rb', line 402
def delay_seconds
  get_attribute("DelaySeconds").to_i
end
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
# File 'lib/aws/sqs/queue.rb', line 408
def delay_seconds= seconds
  set_attribute("DelaySeconds", seconds.to_s)
end
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.
Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.
# File 'lib/aws/sqs/queue.rb', line 67
def delete
  client.delete_queue(:queue_url => url)
  nil
end
#exists? ⇒ Boolean
This may raise an exception if you don't have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.
Returns true if the queue exists.
# File 'lib/aws/sqs/queue.rb', line 443
def exists?
  client.get_queue_attributes(:queue_url => url,
                              :attribute_names => ["QueueArn"])
rescue Errors::NonExistentQueue, Errors::InvalidAddress
  false
else
  true
end
#last_modified_timestamp ⇒ Time
Returns the time when the queue was last changed.
# File 'lib/aws/sqs/queue.rb', line 361
def last_modified_timestamp
  Time.at(get_attribute("LastModifiedTimestamp").to_i)
end
#maximum_message_size ⇒ Integer
Returns the limit of how many bytes a message can contain before Amazon SQS rejects it.
# File 'lib/aws/sqs/queue.rb', line 367
def maximum_message_size
  get_attribute("MaximumMessageSize").to_i
end
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
# File 'lib/aws/sqs/queue.rb', line 378
def maximum_message_size=(size)
  set_attribute("MaximumMessageSize", size.to_s)
end
#message_retention_period ⇒ Integer
Returns the number of seconds Amazon SQS retains a message.
# File 'lib/aws/sqs/queue.rb', line 384
def message_retention_period
  get_attribute("MessageRetentionPeriod").to_i
end
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue.
# File 'lib/aws/sqs/queue.rb', line 395
def message_retention_period=(period)
  set_attribute("MessageRetentionPeriod", period.to_s)
  period
end
#policy ⇒ Policy
Returns the current queue policy if there is one.
Returns nil otherwise.
# File 'lib/aws/sqs/queue.rb', line 470
def policy
  if policy_json = get_attribute('Policy')
    policy = SQS::Policy.from_json(policy_json)
    policy.extend(PolicyProxy)
    policy.queue = self
    policy
  else
    nil
  end
end
#policy=(policy) ⇒ nil
Set the policy on this queue.
If you pass nil or an empty string then it will have the same effect as deleting the policy.
# File 'lib/aws/sqs/queue.rb', line 492
def policy= policy
  policy_string = case policy
  when nil, '' then ''
  when String  then policy
  else policy.to_json
  end
  set_attribute('Policy', policy_string)
  nil
end
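The case statement in the listing above decides what string is stored as the policy attribute. A small standalone sketch of that decision follows. `policy_string_for` is a hypothetical helper name, and a plain Hash stands in for a policy object purely for illustration (anything responding to #to_json takes the same branch).

```ruby
require 'json'

# Hypothetical helper mirroring the case statement in the listing above:
# nil and '' clear the policy, strings pass through unchanged, and
# anything else is serialized with #to_json.
def policy_string_for(policy)
  case policy
  when nil, '' then ''
  when String  then policy
  else policy.to_json
  end
end

# A plain Hash used only for illustration; its contents are made up.
doc = { 'Version' => '2008-10-17', 'Statement' => [] }

puts policy_string_for(nil).inspect               # cleared policy
puts policy_string_for('{"Version":"2008-10-17"}') # JSON string, unchanged
puts policy_string_for(doc)                        # serialized object
```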
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages. For example, you can use this to poll indefinitely:
queue.poll { |msg| puts msg.body }
Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:
queue.poll(:initial_timeout => false,
:idle_timeout => 10) { |msg| puts msg.body }
As with the block form of #receive_message, this method automatically deletes the message when the block exits normally.
# File 'lib/aws/sqs/queue.rb', line 292
def poll(opts = {}, &block)
  opts[:limit] = opts.delete(:batch_size) if
    opts.key?(:batch_size)

  opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless
    opts.has_key?(:wait_time_seconds)

  last_message_at = Time.now
  got_first = false
  loop do
    got_msg = false
    receive_messages(opts) do |message|
      got_msg = got_first = true
      last_message_at = Time.now
      yield(message)
    end
    unless got_msg
      return if hit_timeout?(got_first, last_message_at, opts)
    end
  end
  nil
end
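The interplay of :initial_timeout and :idle_timeout can be easier to see in a small simulation. The sketch below is not the SDK's implementation: `FakeSource` and `poll_sketch` are hypothetical names, and the timeout rule is an assumption based on the prose above, since the private hit_timeout? helper is not shown on this page.

```ruby
# Hypothetical in-memory source: hands out one queued message per call,
# or nothing once it is empty. It stands in for a long-polling receive.
class FakeSource
  def initialize(messages)
    @messages = messages.dup
  end

  def receive
    msg = @messages.shift
    yield msg if msg
    sleep 0.01 unless msg # pretend the empty receive took a moment
  end
end

# Sketch of the loop: keep receiving; once a receive comes back empty,
# stop if the time since the last message exceeds the applicable timeout.
# :initial_timeout => false is taken to mean "wait forever for the first
# message"; a nil :initial_timeout falls back to :idle_timeout (assumed).
def poll_sketch(source, opts = {})
  last_message_at = Time.now
  got_first = false
  loop do
    got_msg = false
    source.receive do |msg|
      got_msg = got_first = true
      last_message_at = Time.now
      yield msg
    end
    next if got_msg
    timeout = got_first ? opts[:idle_timeout] : opts[:initial_timeout]
    timeout = opts[:idle_timeout] if !got_first && timeout.nil?
    return nil if timeout && Time.now - last_message_at > timeout
  end
end

seen = []
poll_sketch(FakeSource.new(%w[a b c]),
            :initial_timeout => false, :idle_timeout => 0.05) { |m| seen << m }
puts seen.inspect
```

The timeouts here are fractions of a second only so the simulation finishes quickly; the example in the text above uses ten seconds.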
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages
Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.
Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.
# File 'lib/aws/sqs/queue.rb', line 200
def receive_message(opts = {}, &block)
  resp = client.receive_message(receive_opts(opts))

  failed = verify_receive_message_checksum resp

  raise Errors::ChecksumError.new(failed) unless failed.empty?

  messages = resp[:messages].map do |m|
    ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
      :body => m[:body],
      :md5 => m[:md5_of_body],
      :request_id => (resp[:response_metadata] || {})[:request_id],
      :attributes => m[:attributes],
      :message_attributes => m[:message_attributes])
  end

  if block
    call_message_block(messages, block)
  elsif opts[:limit] && opts[:limit] != 1
    messages
  else
    messages.first
  end
end
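The delete-on-normal-exit behavior of the block form can be illustrated without a real queue. In this sketch, `FakeMessage` and `each_then_delete` are hypothetical names, and the logic is an assumption drawn from the description above rather than a copy of the SDK's private helper.

```ruby
# Hypothetical stand-in for a received message; #delete only flips a flag.
class FakeMessage
  attr_reader :body

  def initialize(body)
    @body = body
    @deleted = false
  end

  def delete
    @deleted = true
  end

  def deleted?
    @deleted
  end
end

# Sketch of the block form: yield each message and delete it only if the
# block returned without raising. An exception propagates to the caller
# and leaves that message (and any later ones) undeleted.
def each_then_delete(messages)
  messages.each do |message|
    yield message
    message.delete
  end
end

messages = %w[ok boom later].map { |body| FakeMessage.new(body) }
begin
  each_then_delete(messages) do |m|
    raise "cannot process #{m.body}" if m.body == 'boom'
  end
rescue RuntimeError => e
  puts "stopped: #{e.message}"
end
puts messages.map(&:deleted?).inspect
```

A message left undeleted this way becomes visible again once its visibility timeout expires, which is what makes raising inside the block a reasonable way to request a retry.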
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
# File 'lib/aws/sqs/queue.rb', line 119
def send_message body, options = {}

  client_opts = options.dup
  client_opts[:queue_url] = url
  client_opts[:message_body] = body

  response = client.send_message(client_opts)

  msg = SentMessage.new
  msg.message_id = response[:message_id]
  msg.request_id = (response[:response_metadata] || {})[:request_id]
  msg.md5 = response[:md5_of_message_body]

  verify_send_message_checksum body, msg.md5

  msg

end
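The listing above checks the body against the md5 value returned with the response. A caller who wants to repeat that check against SentMessage#md5 could do something like the following sketch, which assumes the reported value is the MD5 hex digest of the message body; `body_checksum_matches?` is a hypothetical helper name, not an SDK method.

```ruby
require 'digest/md5'

# Hypothetical helper: true when the digest reported for a sent message
# matches the MD5 hex digest of the body computed locally.
def body_checksum_matches?(body, reported_md5)
  Digest::MD5.hexdigest(body) == reported_md5
end

body = 'HELLO'
reported = Digest::MD5.hexdigest(body) # stands in for the service's reply

puts body_checksum_matches?(body, reported)
puts body_checksum_matches?(body + '!', reported)
```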
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.
# File 'lib/aws/sqs/queue.rb', line 337
def visibility_timeout
  get_attribute("VisibilityTimeout").to_i
end
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
# File 'lib/aws/sqs/queue.rb', line 350
def visibility_timeout=(timeout)
  set_attribute("VisibilityTimeout", timeout.to_s)
  timeout
end
#wait_time_seconds ⇒ Integer
Returns the number of seconds the service will wait for a response when requesting a new message.
# File 'lib/aws/sqs/queue.rb', line 415
def wait_time_seconds
  get_attribute("ReceiveMessageWaitTimeSeconds").to_i
end
#wait_time_seconds=(seconds) ⇒ Object
Sets the number of seconds that the service should wait for a response when requesting a new message.
# File 'lib/aws/sqs/queue.rb', line 423
def wait_time_seconds= seconds
  set_attribute("ReceiveMessageWaitTimeSeconds", seconds.to_s)
end