例アクティビティRuby のワーカー - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

例アクティビティRuby のワーカー

以下は、ベストプラクティスの使用方法およびお客様独自のアクティビティワーカーを実装する方法を示すため、AWS SDK for Ruby を使用するアクティビティワーカーの例です。

このコードには、ポーラーおよびアクティビティワーカー用に構成可能な数のスレッドを含む、コンシューマー/プロデューサーパターンが実装されています。ポーラースレッドは、アクティビティタスクを常にロングポーリングします。アクティビティタスクが取得されると、アクティビティスレッドの区切られたブロックキューを通じて渡され、選択されます。

次の Ruby コードは、この例の Ruby アクティビティワーカーのメインエントリポイントです。

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # The start method takes as argument the block that is the actual logic of your custom activity. activity.start do |input| { result: :SUCCESS, echo: input['value'] } end

コードにはデフォルトが含まれていますが、アクティビティを参照するために変更し、実際の実装に合わせることができます。このコードは、実際の実装ロジックを入力として取得するため、特定のアクティビティと認証情報を参照し、スレッドの数とハートビートの遅延を設定できます。詳細およびコードのダウンロードについては、Step Functions Ruby アクティビティワーカーを参照してください。

項目 説明

require_relative

次の例のアクティビティワーカーコードへの相対パス。

region

アクティビティの AWS リージョン。

workers_count

アクティビティワーカーのスレッドの数。ほとんどの実装では、10~20 のスレッドで十分です。アクティビティの処理に時間がかかればかかるほど、必要なスレッドが多くなります。試算するには、1 秒あたりのプロセスアクティビティの数と、99 パーセンタイルアクティビティ処理レイテンシー (秒単位) を乗算してください。

pollers_count

ポーラーのスレッドの数。ほとんどの実装では、10~20 のスレッドで十分です。

heartbeat_delay

ハートビート間の遅延 (秒)。

input アクティビティの実装ロジック。

以下は、コード内で ../lib/step_functions/activity とともに参照される Ruby アクティビティワーカーです。

require 'set' require 'json' require 'thread' require 'logger' require 'aws-sdk' module Validate def self.positive(value) raise ArgumentError, 'Argument has to be positive' if value <= 0 value end def self.required(value) raise ArgumentError, 'Argument is required' if value.nil? value end end module StepFunctions class RetryError < StandardError def initialize(message) super(message) end end def self.with_retries(options = {}, &block) retries = 0 base_delay_seconds = options[:base_delay_seconds] || 2 max_retries = options[:max_retries] || 3 begin block.call rescue => e puts e if retries < max_retries retries += 1 sleep base_delay_seconds**retries retry end raise RetryError, 'All retries of operation had failed' end end class Activity def initialize(options = {}) @states = Aws::States::Client.new( credentials: Validate.required(options[:credentials]), region: Validate.required(options[:region]), http_read_timeout: Validate.positive(options[:http_read_timeout] || 60) ) @activity_arn = Validate.required(options[:activity_arn]) @heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60) @queue_max = Validate.positive(options[:queue_max] || 5) @pollers_count = Validate.positive(options[:pollers_count] || 1) @workers_count = Validate.positive(options[:workers_count] || 1) @max_retry = Validate.positive(options[:workers_count] || 3) @logger = Logger.new(STDOUT) end def start(&block) @sink = SizedQueue.new(@queue_max) @activities = Set.new start_heartbeat_worker(@activities) start_workers(@activities, block, @sink) start_pollers(@activities, @sink) wait end def queue_size return 0 if @sink.nil? @sink.size end def activities_count return 0 if @activities.nil? @activities.size end private def start_pollers(activities, sink) @pollers = Array.new(@pollers_count) do PollerWorker.new( states: @states, activity_arn: @activity_arn, sink: sink, activities: activities, max_retry: @max_retry ) end @pollers.each(&:start) end def start_workers(activities, block, sink) @workers = Array.new(@workers_count) do ActivityWorker.new( states: @states, block: block, sink: sink, activities: activities, max_retry: @max_retry ) end @workers.each(&:start) end def start_heartbeat_worker(activities) @heartbeat_worker = HeartbeatWorker.new( states: @states, activities: activities, heartbeat_delay: @heartbeat_delay, max_retry: @max_retry ) @heartbeat_worker.start end def wait sleep rescue Interrupt shutdown ensure Thread.current.exit end def shutdown stop_workers(@pollers) wait_workers(@pollers) wait_activities_drained stop_workers(@workers) wait_activities_completed shutdown_workers(@workers) shutdown_worker(@heartbeat_worker) end def shutdown_workers(workers) workers.each do |worker| shutdown_worker(worker) end end def shutdown_worker(worker) worker.kill end def wait_workers(workers) workers.each(&:wait) end def wait_activities_drained wait_condition { @sink.empty? } end def wait_activities_completed wait_condition { @activities.empty? } end def wait_condition(&block) loop do break if block.call sleep(1) end end def stop_workers(workers) workers.each(&:stop) end class Worker def initialize @logger = Logger.new(STDOUT) @running = false end def run raise 'Method run hasn\'t been implemented' end def process loop do begin break unless @running run rescue => e puts e @logger.error('Unexpected error has occurred') @logger.error(e) end end end def start return unless @thread.nil? @running = true @thread = Thread.new do process end end def stop @running = false end def kill return if @thread.nil? @thread.kill @thread = nil end def wait @thread.join end end class PollerWorker < Worker def initialize(options = {}) @states = options[:states] @activity_arn = options[:activity_arn] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = StepFunctions.with_retries(max_retry: @max_retry) do begin @states.get_activity_task(activity_arn: @activity_arn) rescue => e @logger.error('Failed to retrieve activity task') @logger.error(e) end end return if activity_task.nil? || activity_task.task_token.nil? @activities.add(activity_task.task_token) @sink.push(activity_task) end end class ActivityWorker < Worker def initialize(options = {}) @states = options[:states] @block = options[:block] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = @sink.pop result = @block.call(JSON.parse(activity_task.input)) send_task_success(activity_task, result) rescue => e send_task_failure(activity_task, e) ensure @activities.delete(activity_task.task_token) unless activity_task.nil? end def send_task_success(activity_task, result) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_success( task_token: activity_task.task_token, output: JSON.dump(result) ) rescue => e @logger.error('Failed to send task success') @logger.error(e) end end end def send_task_failure(activity_task, error) StepFunctions.with_retries do begin @states.send_task_failure( task_token: activity_task.task_token, cause: error.message ) rescue => e @logger.error('Failed to send task failure') @logger.error(e) end end end end class HeartbeatWorker < Worker def initialize(options = {}) @states = options[:states] @activities = options[:activities] @heartbeat_delay = options[:heartbeat_delay] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run sleep(@heartbeat_delay) @activities.each do |token| send_heartbeat(token) end end def send_heartbeat(token) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_heartbeat(token) rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end end end