Executing Tasks Asynchronously

You can schedule tasks to run asynchronously in a number of different ways:

  • Use the GenericClient class method send_async to schedule an asynchronous task using an activity or workflow client.
  • Use the Core instance method task to execute any block of code asynchronously in the context of the AWS Flow Framework for Ruby.

Whichever method you use, the Framework will return an instance of the Core::Future class, which is used to determine when the asynchronous task has been completed.

Waiting for a Future#

A Core::Future provides a mechanism for determining whether or not the task that it is tracking has completed. When the task is complete, the Future becomes set. The Future class itself provides three methods that can be used to determine when the task has completed:

  • get, which blocks until the future is ready.
  • set?, which returns true when the future is ready.
  • on_set, which takes a callback block that will be executed once the task has been completed.

There are also a number of methods in the Core namespace that operate on Futures:

  • Core#wait_for_all takes a list of Future instances and does not return until all of them are set.
  • Core#wait_for_any takes a list of Future instances and returns as soon as any one of them is set.

To obtain a Future, you can use either the client's send_async method or the Core namespace's task method.

Scheduling an Asynchronous Task Using a Workflow or Activity Client#

To schedule an asynchronous task using a workflow or activity client, use the client's GenericClient#send_async method (provided by the parent class, GenericClient) method to schedule the activity. send_async returns a Core::Future immediately.

def make_booking options
  puts "Workflow has started\n" unless is_replaying?
  futures = []

  if options[:reserve_car]
    puts "Reserving a car for customer\n" unless is_replaying?
    futures << client.send_async(:reserve_car, options[:request_id])
  end

  if options[:reserve_air]
    puts "Reserving air ticket\n" unless is_replaying?
    futures << client.send_async(:reserve_air, options[:customer_id])
  end

  puts "Waiting for reservation to complete\n" unless is_replaying?
  wait_for_all(futures)

  client.send_confirmation(options[:customer_id])
  puts "Workflow has completed\n" unless is_replaying?
end

In the preceding code, send_async is used to schedule two asynchronous activities, and then wait_for_all is used to wait for both activities to complete before scheduling a third activity.

Executing a Block Asynchronously#

Using the Core#task method, you can execute any block of code asynchronously. Like send_async, the task method returns a Core::Future immediately and then begins executing the asynchronous code. When the code has completed, the returned Future instance will be set.

Handling Errors in Asynchronous Code#

Asynchronous code requires special consideration when handling errors. The AWS Flow Framework for Ruby provides an Core#error_handler method that provides a Ruby-like way, using begin/rescue/end-like semantic for error handling. You pass error_handler a block of the following form:

error_handler do |t|
  t.begin do
    my_activity_client.send_async :my_activity
  end
  t.rescue ActivityTaskTimedOutException do |e|
    # handle timeout
  end
  t.rescue ActivityTaskFailedException do |e|
    # handle failure
  end
  t.ensure do
    # clean up and stuff
  end
end

See Handling Errors for more information about handling errors in the AWS Flow Framework for Ruby.

You can also use error_handler to provide custom logic for retry attempts on failed tasks. For more information, see Providing your own Retry Logic.

Additional Examples#

For additional examples of working with asynchronous tasks, see the AWS Flow Framework for Ruby Samples project. Many of the recipes and samples demonstrate the use of asynchronous tasks to create various workflow patterns.

The AWS Flow Framework for Ruby samples are hosted on GitHub at: