Module: AWS::Flow::Core

Defined in:
aws-flow-ruby/aws-flow/lib/aws/flow/tasks.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/fiber.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/future.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/simple_dfa.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/flow_utils.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/async_scope.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/async_backtrace.rb,
aws-flow-ruby/aws-flow/lib/aws/flow/begin_rescue_ensure.rb

Defined Under Namespace

Classes: AlreadySetException, BeginRescueEnsure, ExternalConditionVariable, ExternalFuture, Future, NoContextException

Instance Method Summary (collapse)

Instance Method Details

- (Future) daemon_task(&block)

Returns The tasks result, which is a Future.

Parameters:

  • block

    The block of code to be executed when the daemon task is run.

Returns:

Raises:



56
57
58
59
60
61
62
63
64
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 56

def daemon_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = DaemonTask.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

- (Object) error_handler(&block)

Creates a new error handler for asynchronous tasks.

Parameters:

Returns:

  • The result of the begin statement if there is no error; otherwise the value of the return statement.

Raises:



97
98
99
100
101
102
103
104
105
106
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 97

def error_handler(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  begin_rescue_ensure = BeginRescueEnsure.new(:parent => context.get_closest_containing_scope)
  bge = BeginRescueEnsureWrapper.new(block, begin_rescue_ensure)
  context << bge
  context << begin_rescue_ensure
  begin_rescue_ensure
end

- (nil) external_task(&block)

Parameters:

  • block

    The block of code to be executed when the external task is run.

Returns:

  • (nil)

Raises:



75
76
77
78
79
80
81
82
83
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 75

def external_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = ExternalTask.new(:parent => context.get_closest_containing_scope, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  nil
end

- (Future) task(future = nil, &block)

Returns The tasks result, which is a Future.

Parameters:

  • future (Future) (defaults to: nil)

    Unused; defaults to nil.

  • block

    The block of code to be executed when the task is run.

Returns:

Raises:



36
37
38
39
40
41
42
43
44
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 36

def task(future = nil, &block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = Task.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

- (Array<Future>) timed_wait_for_all(timeout, *futures)

Blocks until all of the arguments are set or until timeout expires

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



180
181
182
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 180

def timed_wait_for_all(timeout, *futures)
  timed_wait_for_function(timeout, lambda {|result, future_list| result.size == future_list.size}, futures)
end

- (Array<Future>) timed_wait_for_any(timeout, *futures)

Blocks until any of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



165
166
167
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 165

def timed_wait_for_any(timeout, *futures)
  timed_wait_for_function(timeout, lambda {|result, future_list| result.length >= 1 }, futures)
end

- (Array<Future>) timed_wait_for_function(timeout, function, *futures)

Waits for the passed-in function to complete, setting values for the provided futures when it does.

Parameters:

  • function

    The function to wait for.

  • futures (Array<Future>)

    A list of futures to provide values for when the function completes.

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



198
199
200
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 198

def timed_wait_for_function(timeout, function, *futures)
  wait_for_function_helper(timeout, function, *futures)
end

- (Array<Future>) wait_for_all(*futures)

Blocks until all of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



150
151
152
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 150

def wait_for_all(*futures)
  wait_for_function(lambda {|result, future_list| result.size == future_list.size}, futures)
end

- (Array<Future>) wait_for_any(*futures)

Blocks until any of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



138
139
140
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 138

def wait_for_any(*futures)
  wait_for_function(lambda {|result, future_list| result.length >= 1 }, futures)
end

- (Array<Future>) wait_for_function(function, *futures)

Waits for the passed-in function to complete, setting values for the provided futures when it does.

Parameters:

  • function

    The function to wait for.

  • futures (Array<Future>)

    A list of futures to provide values for when the function completes.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



126
127
128
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 126

def wait_for_function(function, *futures)
  wait_for_function_helper(nil, function, *futures)
end

- (Object) wait_for_function_helper(timeout, function, *futures)

Helper method to refactor away the common implementation of wait_for_function and timed_wait_for_function.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'aws-flow-ruby/aws-flow/lib/aws/flow/implementation.rb', line 204

def wait_for_function_helper(timeout, function, *futures)
  futures.flatten!

  f = futures.select { |x| x.is_a?(ExternalFuture) }

  if f.size > 0 && f.size != futures.size
    raise ArgumentError, "The futures array must contain either all "\
      "objects of Future or all objects of ExternalFuture"
  end

  conditional = f.size == 0 ? FiberConditionVariable.new :
    ExternalConditionVariable.new

  return nil if futures.empty?
  result = futures.select(&:set?)
  return futures.find(&:set?)if function.call(result, futures)
  futures.each do |f|
    f.on_set do |set_one|
      result << set_one
      conditional.broadcast if function.call(result, futures)
    end
  end

  if conditional.is_a?(FiberConditionVariable)
    conditional.wait
  else
    conditional.wait(timeout)
    raise Timeout::Error.new unless function.call(result, futures)
  end
  result
end