Exemple d'activité de travail en Ruby - AWS Step Functions

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple d'activité de travail en Ruby

Ce qui suit est un exemple de travail d'activité utilisant le AWS SDK for Ruby pour montrer comment utiliser les bonnes pratiques et implémenter votre propre travail d'activité.

Le code implémente un modèle consommateur-producteur avec un nombre configurable de threads pour les observateurs et les travaux d'activité. Les threads de l'observateur interrogent constamment et longuement la tâche d'activité. Une fois qu'une tâche d'activité est extraite, elle est transmise via une file d'attente à blocage délimité pour que le thread de l'activité la sélectionne.

Le code Ruby suivant est le point d'entrée principal pour cet exemple de travail d'activité.

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # The start method takes as argument the block that is the actual logic of your custom activity. activity.start do |input| { result: :SUCCESS, echo: input['value'] } end

Le code inclut les valeurs par défaut que vous pouvez modifier pour faire référence à votre activité et l'adapter à votre implémentation spécifique. Le code prend comme entrée la logique réelle de l'implémentation, vous permet de faire référence à votre activité spécifique et à vos informations d'identification, et vous permet de configurer le nombre de threads et le délai de pulsation. Pour plus d'informations et pour télécharger le code, consultez Step Functions Ruby Activity Worker.

Élément Description

require_relative

Chemin d'accès relatif à l'exemple du code du travail d'activité suivant.

region

AWS Région de votre activité.

workers_count

Nombre de threads de votre travail d'activité. Pour la plupart des implémentations, entre 10 et 20 threads doivent suffire. Plus le temps de traitement de l'activité est long, plus le nombre de threads nécessaires est élevé. À titre d'estimation, multipliez le nombre d'activités de traitement par seconde par la latence du traitement d'activité du 99e percentile, en secondes.

pollers_count

Nombre de threads de vos observateurs. Entre 10 et 20 threads doivent suffire pour la plupart des implémentations.

heartbeat_delay

Délai en secondes entre les pulsations.

input Logique d'implémentation de votre activité.

Voici le travail d'activité Ruby, auquel il est fait référence dans votre code avec ../lib/step_functions/activity.

require 'set' require 'json' require 'thread' require 'logger' require 'aws-sdk' module Validate def self.positive(value) raise ArgumentError, 'Argument has to be positive' if value <= 0 value end def self.required(value) raise ArgumentError, 'Argument is required' if value.nil? value end end module StepFunctions class RetryError < StandardError def initialize(message) super(message) end end def self.with_retries(options = {}, &block) retries = 0 base_delay_seconds = options[:base_delay_seconds] || 2 max_retries = options[:max_retries] || 3 begin block.call rescue => e puts e if retries < max_retries retries += 1 sleep base_delay_seconds**retries retry end raise RetryError, 'All retries of operation had failed' end end class Activity def initialize(options = {}) @states = Aws::States::Client.new( credentials: Validate.required(options[:credentials]), region: Validate.required(options[:region]), http_read_timeout: Validate.positive(options[:http_read_timeout] || 60) ) @activity_arn = Validate.required(options[:activity_arn]) @heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60) @queue_max = Validate.positive(options[:queue_max] || 5) @pollers_count = Validate.positive(options[:pollers_count] || 1) @workers_count = Validate.positive(options[:workers_count] || 1) @max_retry = Validate.positive(options[:workers_count] || 3) @logger = Logger.new(STDOUT) end def start(&block) @sink = SizedQueue.new(@queue_max) @activities = Set.new start_heartbeat_worker(@activities) start_workers(@activities, block, @sink) start_pollers(@activities, @sink) wait end def queue_size return 0 if @sink.nil? @sink.size end def activities_count return 0 if @activities.nil? @activities.size end private def start_pollers(activities, sink) @pollers = Array.new(@pollers_count) do PollerWorker.new( states: @states, activity_arn: @activity_arn, sink: sink, activities: activities, max_retry: @max_retry ) end @pollers.each(&:start) end def start_workers(activities, block, sink) @workers = Array.new(@workers_count) do ActivityWorker.new( states: @states, block: block, sink: sink, activities: activities, max_retry: @max_retry ) end @workers.each(&:start) end def start_heartbeat_worker(activities) @heartbeat_worker = HeartbeatWorker.new( states: @states, activities: activities, heartbeat_delay: @heartbeat_delay, max_retry: @max_retry ) @heartbeat_worker.start end def wait sleep rescue Interrupt shutdown ensure Thread.current.exit end def shutdown stop_workers(@pollers) wait_workers(@pollers) wait_activities_drained stop_workers(@workers) wait_activities_completed shutdown_workers(@workers) shutdown_worker(@heartbeat_worker) end def shutdown_workers(workers) workers.each do |worker| shutdown_worker(worker) end end def shutdown_worker(worker) worker.kill end def wait_workers(workers) workers.each(&:wait) end def wait_activities_drained wait_condition { @sink.empty? } end def wait_activities_completed wait_condition { @activities.empty? } end def wait_condition(&block) loop do break if block.call sleep(1) end end def stop_workers(workers) workers.each(&:stop) end class Worker def initialize @logger = Logger.new(STDOUT) @running = false end def run raise 'Method run hasn\'t been implemented' end def process loop do begin break unless @running run rescue => e puts e @logger.error('Unexpected error has occurred') @logger.error(e) end end end def start return unless @thread.nil? @running = true @thread = Thread.new do process end end def stop @running = false end def kill return if @thread.nil? @thread.kill @thread = nil end def wait @thread.join end end class PollerWorker < Worker def initialize(options = {}) @states = options[:states] @activity_arn = options[:activity_arn] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = StepFunctions.with_retries(max_retry: @max_retry) do begin @states.get_activity_task(activity_arn: @activity_arn) rescue => e @logger.error('Failed to retrieve activity task') @logger.error(e) end end return if activity_task.nil? || activity_task.task_token.nil? @activities.add(activity_task.task_token) @sink.push(activity_task) end end class ActivityWorker < Worker def initialize(options = {}) @states = options[:states] @block = options[:block] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = @sink.pop result = @block.call(JSON.parse(activity_task.input)) send_task_success(activity_task, result) rescue => e send_task_failure(activity_task, e) ensure @activities.delete(activity_task.task_token) unless activity_task.nil? end def send_task_success(activity_task, result) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_success( task_token: activity_task.task_token, output: JSON.dump(result) ) rescue => e @logger.error('Failed to send task success') @logger.error(e) end end end def send_task_failure(activity_task, error) StepFunctions.with_retries do begin @states.send_task_failure( task_token: activity_task.task_token, cause: error.message ) rescue => e @logger.error('Failed to send task failure') @logger.error(e) end end end end class HeartbeatWorker < Worker def initialize(options = {}) @states = options[:states] @activities = options[:activities] @heartbeat_delay = options[:heartbeat_delay] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run sleep(@heartbeat_delay) @activities.each do |token| send_heartbeat(token) end end def send_heartbeat(token) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_heartbeat(token) rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end end end