Class: AWS::Flow::ActivityTaskPoller

Inherits:
Object
  • Object
show all
Defined in:
aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb

Overview

A poller for activity tasks.

Instance Method Summary (collapse)

Constructor Details

- (ActivityTaskPoller) initialize(service, domain, task_list, activity_definition_map, executor, options = nil)

Initializes a new ActivityTaskPoller.

Parameters:

  • service

    Required. The AWS::SimpleWorkflow instance to use.

  • domain

    Required. The domain used by the workflow.

  • task_list

    Required. The task list used to poll for activity tasks.

  • activity_definition_map

    Required. The AWS::Flow::ActivityDefinition instance that implements the activity to run. This map is in the form:

    { :activity_type => 'activity_definition_name' }
    

    The named activity definition will be run when the #execute method is called.

  • options (defaults to: nil)

    Optional. Options to set for the activity poller. You can set the following options:

    • logger - The logger to use.
    • max_workers - The maximum number of workers that can be running at once. The default is 20.


137
138
139
140
141
142
143
144
145
146
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 137

def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @service_opts = @service.config.to_h
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @executor = executor
end

Instance Method Details

- (Object) execute(task)

Executes the specified activity task.

Parameters:



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 155

def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    unless activity_implementation = @activity_definition_map[activity_type]
      raise "This activity worker was told to work on activity type "\
        "#{activity_type.inspect}, but this activity worker only knows "\
        "how to work on #{@activity_definition_map.keys.map(&:name).join' '}"
    end

    output, original_result, too_large = activity_implementation.execute(task.input, context)

     @logger.debug "Responding on task_token #{task.task_token.inspect}."
    if too_large
      @logger.error "#{task.activity_type.inspect} failed: "\
        "#{Utilities.validation_error_string_partial("Activity")} For "\
        "reference, the result was #{original_result}"

      respond_activity_task_failed_with_retry(
        task.task_token,
        Utilities.validation_error_string("Activity"),
        ""
      )
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(
        :task_token => task.task_token,
        :result => output
      )
    end
  rescue ActivityFailureException => e
    @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}."
    respond_activity_task_failed_with_retry(
      task.task_token,
      e.message,
      e.details
    )
  end
end

- (Object) poll_and_process_single_task(use_forking = true)

Polls the task list for a new activity task, and executes it if one is found.

If use_forking is set to true and the maximum number of workers (as set in #initialize) are already executing, this method will block until the number of running workers is less than the maximum.

Parameters:

  • use_forking (defaults to: true)

    Optional. Whether to use forking to execute the task. On Windows, you should set this to false.



359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 359

def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
    end
  rescue Exception => e
    @logger.error "Error in the poller, #{e.inspect}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task)
  return true
end

- (Object) process_single_task(task)

Processes the specified activity task.

Parameters:



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 310

def process_single_task(task)

  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  #
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.

  @service_opts[:connection_pool] = AWS::Core::Http::ConnectionPool.build(@service_opts[:http_handler].pool.options)
  @service_opts[:http_handler] = AWS::Core::Http::NetHttpHandler.new(@service_opts)
  @service = @service.with_options(@service_opts)

  begin
    begin
      execute(task)
    rescue CancellationException => e
      @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}"
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Error in the poller, exception: #{e.inspect}. stacktrace: #{e.backtrace}"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end

- (Object) respond_activity_task_canceled(task_token, message)

Responds to the decider that the activity task should be canceled. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry.

  • message

    Required. A message that provides detail about why the activity task is cancelled.



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 234

def respond_activity_task_canceled(task_token, message)

  begin
    @service.respond_activity_task_canceled(
      :task_token => task_token,
      :details => message
    )
  rescue AWS::SimpleWorkflow::Errors::ValidationException => e
    if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
      # We want to ensure that the ActivityWorker doesn't just sit
      # around and time the activity out. If there is a validation failure
      # possibly because of large custom exceptions we should fail the
      # activity task with some minimal details
      respond_activity_task_failed_with_retry(
        task_token,
        Utilities.validation_error_string("Activity"),
        "AWS::SimpleWorkflow::Errors::ValidationException"
      )
    end
    @logger.error "respond_activity_task_canceled call failed with "\
      "exception: #{e.inspect}"
  end

end

- (Object) respond_activity_task_failed(task_token, reason, details)

Responds to the decider that the activity task has failed. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry. The task token is generated by the service and should be treated as an opaque value.

  • reason

    Required. Description of the error that may assist in diagnostics. Although this value is required, you can set it to an empty string if you don't need this information.

  • details

    Required. Detailed information about the failure. Although this value is required, you can set it to an empty string if you don't need this information.



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'aws-flow-ruby/aws-flow/lib/aws/decider/task_poller.rb', line 277

def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"

  begin
    @service.respond_activity_task_failed(
      task_token: task_token,
      reason: reason.to_s,
      details: details.to_s
    )
  rescue AWS::SimpleWorkflow::Errors::ValidationException => e
    if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
      # We want to ensure that the ActivityWorker doesn't just sit
      # around and time the activity out. If there is a validation failure
      # possibly because of large custom exceptions we should fail the
      # activity task with some minimal details
      respond_activity_task_failed_with_retry(
        task.task_token,
        Utilities.validation_error_string("Activity"),
        "AWS::SimpleWorkflow::Errors::ValidationException"
      )
    end
    @logger.error "respond_activity_task_failed call failed with "\
      "exception: #{e.inspect}"
  end
end