Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker
Defined in:
aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb

Overview

Used to implement an activity worker. You can use the ActivityWorker class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. The worker then polls the specified task list for activity tasks. When it receives an activity task, it looks up the matching implementation that you provided and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ActivityWorker reuses the object you provided (or, if you passed a class, the single instance it created when the class was added).

Instance Method Summary

Methods inherited from GenericWorker

#resolve_default_task_list

Constructor Details

- (ActivityWorker) initialize(service, domain, task_list, *args, &block)

Creates a new ActivityWorker instance.

Parameters:

  • service

    The Amazon SWF Client used to register this activity worker.

  • domain (String)

    The Amazon SWF Domain to operate on.

  • task_list (Array)

    The default task list on which activity requests are placed and which this worker polls.

  • args

    The activities to use.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 285

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  if @options
    @logger = @options.logger || Utilities::LogFactory.make_logger(self)
    @options.logger ||= @logger
    # Set the number of execution workers to 0 if it's not already set and
    # if the platform is Windows
    @options.execution_workers ||= 0 if AWS::Flow.on_windows?
    max_workers = @options.execution_workers
    # If max_workers is set to 0, then turn forking off
    @options.use_forking = false if (max_workers && max_workers.zero?)
  end
  max_workers = 20 if (max_workers.nil?)

  @executor = ForkingExecutor.new(
    :max_workers => max_workers,
    :logger => @logger
  )

  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end
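
The way the constructor settles on a worker count can be hard to follow inline, so here is a minimal standalone sketch of that branch. `SketchOptions` and `resolve_max_workers` are hypothetical names used only for illustration; they are not part of the library, and the `on_windows` flag stands in for `AWS::Flow.on_windows?`.

```ruby
# Hypothetical stand-in for WorkerOptions; only the two fields used here.
SketchOptions = Struct.new(:execution_workers, :use_forking)

# Mirrors the branch in initialize: returns the worker count that would be
# handed to the executor and updates the options the same way.
def resolve_max_workers(options, on_windows)
  max_workers = nil
  if options
    # On Windows, default to 0 workers unless a count was set explicitly.
    options.execution_workers ||= 0 if on_windows
    max_workers = options.execution_workers
    # A count of 0 turns forking off.
    options.use_forking = false if max_workers && max_workers.zero?
  end
  # Fall back to 20 workers when nothing was configured.
  max_workers.nil? ? 20 : max_workers
end

unset = SketchOptions.new(nil, true)
puts resolve_max_workers(unset, false)    # no count, not Windows

windows = SketchOptions.new(nil, true)
puts resolve_max_workers(windows, true)   # no count, Windows
puts windows.use_forking
```

Under these assumptions, an unset count falls back to 20, while an unset count on Windows becomes 0 and disables forking. An explicitly configured count is left unchanged on either platform.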

Instance Method Details

- (Object) add_activities_implementation(class_or_instance)

Adds an activity implementation to this ActivityWorker.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 361

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(
      instance,
      activity_type.name.split(".").last,
      nil,
      activity_type.options,
      activity_type.options.data_converter
    )
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }

    option_hash.merge!(options.get_registration_options)

    if options.default_task_list
      option_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end

    @activity_type_options << option_hash
  end
end
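
The class-or-instance handling above can be illustrated without the library. The following is a simplified sketch, not the library's implementation: `GreeterActivities` and `build_definition_map` are hypothetical, the `activities` class method returns plain strings rather than activity type objects, and bound methods stand in for `ActivityDefinition`.

```ruby
# Hypothetical activity class. `activities` stands in for the list of
# activity types that an AWS::Flow::Activity class exposes.
class GreeterActivities
  def self.activities
    ["GreeterActivities.say_hello", "GreeterActivities.say_goodbye"]
  end

  def say_hello(name)
    "Hello, #{name}"
  end

  def say_goodbye(name)
    "Goodbye, #{name}"
  end
end

# Accepts either a class or an instance and maps each activity name to a
# method bound to a single shared instance.
def build_definition_map(class_or_instance)
  given_class = class_or_instance.is_a?(Class)
  klass = given_class ? class_or_instance : class_or_instance.class
  instance = given_class ? class_or_instance.new : class_or_instance

  klass.activities.each_with_object({}) do |activity_name, map|
    # "GreeterActivities.say_hello" -> "say_hello"
    method_name = activity_name.split(".").last
    map[activity_name] = instance.method(method_name)
  end
end

map = build_definition_map(GreeterActivities)
puts map["GreeterActivities.say_hello"].call("SWF")
```

Passing an existing instance instead of the class makes every entry in the map point at that same object, which is the behavior the overview describes.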

- (Object) add_implementation(class_or_instance)

Adds an activity implementation to this ActivityWorker. Delegates to add_activities_implementation.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 319

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

- (Object) register

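Registers with Amazon SWF every activity type that was added to this worker. If a type with the same name and version already exists, the default options being registered are compared with the existing registration, and an error is raised if they differ.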



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 325

def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
      previous_registration = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => activity_type_options[:name],
          :version => activity_type_options[:version]
        }
      )
      default_options = activity_type_options.select { |key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a

      unless registration_difference.empty?
        raise "Activity [#{activity_type_options[:name]}]: There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
      end
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
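
The comparison performed in the rescue branch can be sketched in isolation. This is an illustrative approximation: `camel_to_snake` and `registration_difference` are hypothetical helpers (the library uses its own `camel_case_to_snake_case`), the sample configuration keys and values are made up for the example, and the sort step and the special handling of `default_task_list` are omitted.

```ruby
# Hypothetical helper: "defaultTaskList" -> "default_task_list".
def camel_to_snake(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Returns the requested default options that do not match the previously
# registered configuration, as an array of [key, value] pairs.
def registration_difference(requested, previous_configuration)
  # Only the "default..." options take part in the comparison.
  requested_defaults = requested.select { |key, _| key.to_s.include?("default") }

  # Normalize the service's camelCase string keys to snake_case symbols.
  previous = previous_configuration.each_with_object({}) do |(key, value), out|
    out[camel_to_snake(key).to_sym] = value
  end

  requested_defaults.to_a - previous.to_a
end

requested = {
  :name => "GreeterActivities.say_hello",
  :version => "1",
  :default_task_heartbeat_timeout => "60",
  :default_task_start_to_close_timeout => "120"
}
previous = {
  "defaultTaskHeartbeatTimeout" => "60",
  "defaultTaskStartToCloseTimeout" => "300"
}

difference = registration_difference(requested, previous)
puts Hash[difference].inspect
```

Here only the start-to-close timeout differs, so it is the single entry in the difference. A non-empty difference is what triggers the error in `register` when the version has not been changed.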

- (Object) run_once(should_register = true, poller = nil)

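Optionally registers the activity types, then polls for and processes a single activity task. A new AWS::Flow::ActivityTaskPoller is created unless one is supplied.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to false to skip registering the activity types (for example, when they are already registered).

  • poller (ActivityTaskPoller) (defaults to: nil)

    The poller to use. If nil, a new ActivityTaskPoller is created for this call.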



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 430

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end

- (Object) start(should_register = true)

Starts the worker. Optionally registers the activity types, then polls for and processes activity tasks in an infinite loop, reusing one ActivityTaskPoller.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to false to skip registering the activity types (for example, when they are already registered).



# File 'aws-flow-ruby/aws-flow/lib/aws/decider/worker.rb', line 401

def start(should_register = true)

  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  )

  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop do
    run_once(false, poller)
  end
end
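
The relationship between `start` and `run_once` (register once, then poll repeatedly with a shared poller) can be sketched with stubs. `StubPoller` and `SketchWorker` are hypothetical and exist only for this illustration. The stub raises `StopIteration` when it runs out of tasks so that the loop ends, whereas the real worker loops until it is shut down.

```ruby
# Hypothetical poller that hands out a fixed list of tasks.
class StubPoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks
    @processed = []
  end

  def poll_and_process_single_task
    # Kernel#loop rescues StopIteration, which ends the sketch's loop.
    raise StopIteration if @tasks.empty?
    @processed << @tasks.shift
  end
end

# Hypothetical worker mirroring the control flow of start and run_once.
class SketchWorker
  attr_reader :register_calls

  def initialize(poller)
    @poller = poller
    @register_calls = 0
  end

  def register
    @register_calls += 1
  end

  def run_once(should_register = true, poller = nil)
    register if should_register
    poller ||= @poller
    poller.poll_and_process_single_task
  end

  def start(should_register = true)
    register if should_register
    # Registration is skipped inside the loop; the poller is reused.
    loop { run_once(false, @poller) }
  end
end

poller = StubPoller.new(%w[task-1 task-2 task-3])
worker = SketchWorker.new(poller)
worker.start
puts worker.register_calls
puts poller.processed.inspect
```

In this sketch, registration happens exactly once regardless of how many tasks are processed, which is why `start` passes `false` to `run_once` on every iteration.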