Class: AWS::Flow::GenericActivityClient

Inherits:
GenericClient
Defined in:
aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb

Overview

A generic activity client that can be used to perform standard activity actions.

Instance Attribute Summary

Attributes inherited from GenericClient

#option_map

Class Method Summary

Instance Method Summary

Methods inherited from GenericClient

#decision_context, #exponential_retry, #reconfigure, #retry, #send_async, #with_opts

Constructor Details

- (GenericActivityClient) initialize(decision_helper, options)

Creates a new GenericActivityClient instance.

Parameters:

  • decision_helper (DecisionHelper)

    The decision helper to use for the activity client.

  • options (ActivityOptions)

    The activity options to set for the activity client.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 74

def initialize(decision_helper, options)
  @decision_helper = decision_helper
  @options = options
  @activity_option_map = @decision_helper.activity_options
  @failure_map = {}
  @data_converter ||= YAMLDataConverter.new
  super
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method.

- (Object) method_missing(method_name, *args, &block)

Registers and schedules a new activity type, provided a name and block of options.

Parameters:

  • method_name

    Required. The name of the activity type to define.

  • args

    Required. Arguments for the method provided in method_name.

  • block

    Required. A block of ActivityOptions to use when registering the new ActivityType. This can be set to an empty block, in which case, the default activity options will be used.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 95

def method_missing(method_name, *args, &block)
  options = Utilities::interpret_block_for_options(ActivityOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)

  options = Utilities::merge_all_options(client_options,
                              @activity_option_map[method_name.to_sym],
                              @option_map[method_name.to_sym],
                              options
                              )
  new_options = ActivityOptions.new(options)

  activity_type = ActivityType.new("#{new_options.prefix_name}.#{method_name.to_s}", new_options.version, new_options.get_registration_options)
  if new_options._exponential_retry
    retry_function = new_options._exponential_retry.retry_function || FlowConstants.exponential_retry_function
    new_options._exponential_retry.return_on_start ||= new_options.return_on_start
    future = _retry_with_options(lambda { self.schedule_activity(activity_type.name, activity_type, args, new_options ) }, retry_function, new_options._exponential_retry, args)
    return future if new_options.return_on_start
    result = Utilities::drill_on_future(future)
  else
    result = schedule_activity(activity_type.name, activity_type, args, new_options)
  end
  result
end
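
The dispatch pattern in the listing above can be illustrated without the framework. The following is a minimal sketch, not the library's implementation: SketchActivityClient, its option hashes, and the returned hash are hypothetical stand-ins. Only the general idea is taken from the listing: an undefined method name becomes the activity name, and option sources are merged so that later, more specific ones override earlier ones.

```ruby
# Hypothetical stand-in for a client that turns undefined method calls into
# activity scheduling requests. None of these names come from aws-flow.
class SketchActivityClient
  def initialize(client_options, per_activity_options = {})
    @client_options = client_options              # defaults for every activity
    @per_activity_options = per_activity_options  # overrides keyed by activity name
  end

  # Any method the class does not define is treated as an activity name.
  def method_missing(method_name, *args, &block)
    call_options = block ? block.call : {}
    # Later hashes win, so the per-call block has the final say.
    merged = [@client_options,
              @per_activity_options.fetch(method_name.to_sym, {}),
              call_options].reduce({}) { |acc, opts| acc.merge(opts) }
    {
      name: "#{merged[:prefix_name]}.#{method_name}",
      version: merged[:version],
      input: args,
      options: merged
    }
  end
end

client = SketchActivityClient.new({ prefix_name: "Orders", version: "1.0" },
                                  { ship: { version: "2.0" } })
request = client.ship("order-42") { { task_list: "shipping" } }
```

In this sketch the per-activity entry overrides the client-wide version, and the block supplies an extra option for that one call. The real client builds ActivityOptions and an ActivityType instead of a plain hash.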

Instance Attribute Details

- (Object) data_converter

The data converter used for serializing/deserializing data when sending input to and receiving results from activities scheduled by this client. By default, this is YAMLDataConverter.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 45

def data_converter
  @data_converter
end
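
The listings on this page only rely on the converter responding to dump and load. As a rough illustration of that contract, here is a minimal sketch built on Ruby's standard yaml library. SketchYAMLConverter is a hypothetical class, not the framework's YAMLDataConverter, and it deliberately restricts itself to plain data types.

```ruby
require "yaml"

# Hypothetical converter exposing the dump/load pair that the client calls.
class SketchYAMLConverter
  # Serialize a Ruby object to a YAML string.
  def dump(object)
    YAML.dump(object)
  end

  # Deserialize a YAML string; nil input is passed through unchanged.
  def load(source)
    return nil if source.nil?
    YAML.safe_load(source)
  end
end

converter = SketchYAMLConverter.new
payload = { "order_id" => 42, "items" => ["book", "pen"] }
serialized = converter.dump(payload)   # a String that can travel as activity input
restored = converter.load(serialized)  # an equal Hash on the receiving side
```

Any object with the same two methods could presumably be substituted, which is why the client keeps the converter behind an attribute rather than calling YAML directly.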

- (Object) decision_helper

The decision helper used by the activity client.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 48

def decision_helper
  @decision_helper
end

- (Object) options

A hash of ActivityRuntimeOptions for the activity client.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 51

def options
  @options
end

Class Method Details

+ (Object) default_option_class

Returns the default option class for the activity client, which is ActivityRuntimeOptions.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 54

def self.default_option_class; ActivityRuntimeOptions; end

Instance Method Details

- (Object) handle_activity_task_canceled(event)

A handler for the ActivityTaskCanceled event.

Parameters:

  • event (AWS::SimpleWorkflow::HistoryEvent)

    The event data.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 147

def handle_activity_task_canceled(event)
  activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
  @decision_helper[activity_id].consume(:handle_cancellation_event)
  if @decision_helper[activity_id].done?
    open_request = @decision_helper.scheduled_activities.delete(activity_id)
    exception = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil)
    if ! open_request.nil?
      open_request.completion_handle.fail(exception)
    end
  end
end

- (Object) handle_activity_task_completed(event)

A handler for the ActivityTaskCompleted event.

Parameters:

  • event (AWS::SimpleWorkflow::HistoryEvent)

    The event data.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 222

def handle_activity_task_completed(event)
  scheduled_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.activity_scheduling_event_id_to_activity_id[scheduled_id]
  @decision_helper[activity_id].consume(:handle_completion_event)
  if @decision_helper[activity_id].done?
    open_request = @decision_helper.scheduled_activities.delete(activity_id)
    open_request.result = event.attributes[:result]
    open_request.completion_handle.complete
  end
end
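
The completion handler above follows a pattern shared by the other handlers: resolve the activity ID from the scheduled event ID, remove the open request, then signal its completion handle. The following is a minimal sketch of that flow under simplifying assumptions. SketchHandle, SketchRequest, and SketchRegistry are hypothetical names, and the state-machine check from the listing is omitted.

```ruby
# Hypothetical handle that records how the request finished.
SketchHandle = Struct.new(:state, :error) do
  def complete
    self.state = :completed
  end

  def fail(exception)
    self.state = :failed
    self.error = exception
  end
end

# Hypothetical open request: a handle plus a slot for the result.
SketchRequest = Struct.new(:completion_handle, :result)

# Hypothetical registry standing in for the decision helper's bookkeeping.
class SketchRegistry
  def initialize
    @event_to_activity = {}
    @open_requests = {}
  end

  # Remember which activity a scheduled event belongs to.
  def track(scheduled_event_id, activity_id)
    @event_to_activity[scheduled_event_id] = activity_id
    @open_requests[activity_id] = SketchRequest.new(SketchHandle.new(:pending))
  end

  # Resolve the ID, remove the open request, store the result, signal completion.
  def handle_completed(attributes)
    activity_id = @event_to_activity[attributes[:scheduled_event_id]]
    request = @open_requests.delete(activity_id)
    return nil if request.nil?
    request.result = attributes[:result]
    request.completion_handle.complete
    request
  end
end
```

Deleting the request before completing it means a duplicate event for the same activity finds nothing to complete, which the sketch reports by returning nil.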

- (Object) handle_activity_task_failed(event)

A handler for the ActivityTaskFailed event.

Parameters:

  • event (AWS::SimpleWorkflow::HistoryEvent)

    The event data.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 183

def handle_activity_task_failed(event)
  attributes = event.attributes
  activity_id = @decision_helper.get_activity_id(attributes[:scheduled_event_id])
  @decision_helper[activity_id].consume(:handle_completion_event)
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  reason = attributes[:reason] if attributes.keys.include? :reason
  reason ||= "The activity which failed did not provide a reason"
  details = attributes[:details] if attributes.keys.include? :details
  details ||= "The activity which failed did not provide details"

  # TODO consider adding user_context to open request, and adding it here
  # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter
  failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details)
  open_request_info.completion_handle.fail(failure)
end
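
The handler above substitutes fixed messages when the event carries no reason or details. A minimal sketch of just that fallback, assuming the event attributes are available as a plain Hash (the helper name and constants are hypothetical):

```ruby
DEFAULT_REASON = "The activity which failed did not provide a reason"
DEFAULT_DETAILS = "The activity which failed did not provide details"

# Hypothetical helper: returns [reason, details], falling back to the
# default messages when a key is missing or its value is nil.
def failure_reason_and_details(attributes)
  reason = attributes[:reason] || DEFAULT_REASON
  details = attributes[:details] || DEFAULT_DETAILS
  [reason, details]
end

failure_reason_and_details({ reason: "disk full" })
```

With the defaults in place, the exception built from these values always has a readable reason and details string, even for activities that failed without reporting either.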

- (Object) handle_activity_task_timed_out(event)

A handler for the ActivityTaskTimedOut event.

Parameters:

  • event (AWS::SimpleWorkflow::HistoryEvent)

    The event data.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 164

def handle_activity_task_timed_out(event)
  activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
  activity_state_machine = @decision_helper[activity_id]
  activity_state_machine.consume(:handle_completion_event)
  if activity_state_machine.done?
    open_request = @decision_helper.scheduled_activities.delete(activity_id)
    if ! open_request.nil?
      timeout_type = event.attributes[:timeout_type]
      failure = ActivityTaskTimedOutException.new(event.id, activity_id, timeout_type, "Time out")
      open_request.completion_handle.fail(failure)
    end
  end
end

- (Object) handle_schedule_activity_task_failed(event)

A handler for the ScheduleActivityTaskFailed event.

Parameters:

  • event (AWS::SimpleWorkflow::HistoryEvent)

    The event data.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 204

def handle_schedule_activity_task_failed(event)
  attributes = event.attributes
  activity_id = attributes[:activity_id]
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  activity_state_machine = @decision_helper[activity_id]
  activity_state_machine.consume(:handle_initiation_failed_event)
  if activity_state_machine.done?
    # TODO Fail task correctly
    failure = ScheduleActivityTaskFailedException.new(event.id, event.attributes.activity_type, activity_id, event.attributes.cause)
    open_request_info.completion_handle.fail(failure)
  end
end

- (Object) request_cancel_activity_task(to_cancel)

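Requests that the activity be canceled.

Parameters:

  • to_cancel

    Required. A future obtained from an activity. An exception is raised if the future does not carry an activity ID.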



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 134

def request_cancel_activity_task(to_cancel)
  metadata = to_cancel.metadata
  if ! metadata.respond_to? :activity_id
    raise "You need to use a future obtained from an activity"
  end
  @decision_helper[metadata.activity_id].consume(:cancel)
end
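
The guard in this method is a duck-typing check: the future is accepted only if the object attached to it responds to activity_id. Below is a minimal sketch of that check in isolation. SketchMetadata, SketchFuture, and cancellable_activity_id are hypothetical, and the accessor name metadata is an assumption made for this example.

```ruby
# Hypothetical stand-ins for a future and the metadata attached to it.
SketchMetadata = Struct.new(:activity_id)
SketchFuture = Struct.new(:metadata)

# Returns the activity ID to cancel, or raises if the future did not
# originate from an activity (its metadata lacks activity_id).
def cancellable_activity_id(future)
  metadata = future.metadata
  unless metadata.respond_to?(:activity_id)
    raise ArgumentError, "You need to use a future obtained from an activity"
  end
  metadata.activity_id
end

cancellable_activity_id(SketchFuture.new(SketchMetadata.new("3")))
```

Checking respond_to? rather than the class keeps the guard open to any metadata object that carries an activity ID. The sketch raises ArgumentError for clarity, whereas the listing raises a plain string message.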

- (Object) schedule_activity(name, activity_type, input, options)

Schedules a named activity.

Parameters:

  • name (String)

    Required. The name of the activity to schedule.

  • activity_type (ActivityType)

    Required. The activity type for this scheduled activity.

  • input (Object)

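    Required. Additional data passed to the activity. The value must respond to empty?; when called from method_missing, it is the array of method arguments.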

  • options (ActivityOptions)

    Required. ActivityOptions to set for the scheduled activity.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/activity.rb', line 247

def schedule_activity(name, activity_type, input, options)
  options = Utilities::merge_all_options(@option_map[activity_name_from_activity_type(name)], options)
  new_options = ActivityOptions.new(options)
  output = Utilities::AddressableFuture.new
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Activity)
  output.metadata = ActivityMetadata.new(decision_id)
  error_handler do |t|
    t.begin do
      @data_converter = new_options.data_converter

      #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil?
      input = @data_converter.dump input unless input.empty?
      attributes = {}
      new_options.input ||= input unless input.empty?
      attributes[:options] = new_options
      attributes[:activity_type] = activity_type
      attributes[:decision_id] = decision_id
      @completion_handle = nil
      external_task do |t|
        t.initiate_task do |handle|
          open_request.completion_handle = handle

          @decision_helper.scheduled_activities[decision_id.to_s] = open_request
          @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes)
        end
        t.cancellation_handler do |this_handle, cause|
          state_machine = @decision_helper[decision_id.to_s]
          if state_machine.current_state == :created
            open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end
    end
    t.rescue(Exception) do |error|
      @data_converter = new_options.data_converter
      # If we have an ActivityTaskFailedException, then we should figure
      # out what the cause was, and pull that out. If it's anything else,
      # we should serialize the error, and stuff that into details, so
      # that things above us can pull it out correctly. We don't have to
      # do this for ActivityTaskFailedException, as the details is
      # *already* serialized.
      if error.is_a? ActivityTaskFailedException
        details = @data_converter.load(error.details)
        error.cause = details
      else
        details = @data_converter.dump(error)
        error.details = details
      end
      @failure_map[decision_id.to_s] = error
    end
    t.ensure do
      @data_converter = new_options.data_converter
      result = @data_converter.load open_request.result
      output.set(result)
      raise @failure_map[decision_id.to_s] if @failure_map[decision_id.to_s] && new_options.return_on_start
    end
  end
  return output if new_options.return_on_start
  output.get
  this_failure = @failure_map[decision_id.to_s]
  raise this_failure if this_failure
  return output.get
end
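
The last lines of the listing show two calling conventions: with return_on_start set, the caller receives the future immediately; otherwise the call waits and returns the plain result. The following is a minimal sketch of that distinction using a hand-rolled task queue. SketchResultFuture and SketchScheduler are hypothetical and far simpler than the framework's asynchronous machinery.

```ruby
# Hypothetical minimal future: holds a value once one has been set.
class SketchResultFuture
  def initialize
    @set = false
    @value = nil
  end

  def set(value)
    @value = value
    @set = true
  end

  def set?
    @set
  end

  def get
    raise "result not available yet" unless @set
    @value
  end
end

# Hypothetical scheduler that defers work until run_pending is called.
class SketchScheduler
  def initialize
    @pending = []
  end

  # With return_on_start the future is handed back before the work runs;
  # otherwise the queue is drained first and the plain result is returned.
  def schedule(return_on_start: false, &work)
    future = SketchResultFuture.new
    @pending << -> { future.set(work.call) }
    return future if return_on_start
    run_pending
    future.get
  end

  def run_pending
    @pending.shift.call until @pending.empty?
  end
end

scheduler = SketchScheduler.new
sum = scheduler.schedule { 1 + 1 }
future = scheduler.schedule(return_on_start: true) { "later" }
scheduler.run_pending
```

Returning the future early lets a caller start several activities before waiting on any of them, at the cost of having to call get (and handle failures) explicitly.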