Ruby 활동 작업자 예제 - AWS Step Functions

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Ruby 활동 작업자 예제

다음은 AWS SDK for Ruby를 사용하여 모범 사례 사용 방법과 활동 작업자 구현 방법을 보여주는 활동 작업자 예제입니다.

이 코드는 폴러 및 활동 작업자용 스레드 수를 구성할 수 있는 소비자-생산자 패턴을 구현합니다. 폴러 스레드는 지속적으로 활동 작업을 폴링합니다. 검색된 활동 작업은 활동 스레드가 픽업할 수 있도록 경계가 있는 차단 대기열을 통해 전달됩니다.

아래 Ruby 코드는 이 Ruby 활동 작업자 예제에 대한 주 진입점입니다.

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # The start method takes as argument the block that is the actual logic of your custom activity. activity.start do |input| { result: :SUCCESS, echo: input['value'] } end

이 코드에는 기본값이 포함되어 있으며, 사용자가 각 값을 본인의 활동을 참조하도록 변경하고 특정 구현에 맞춰 조정할 수 있습니다. 이 코드는 실제 구현 로직을 입력으로 사용하며 사용자가 특정 활동 및 자격 증명을 참조하고 스레드 수 및 하트비트 지연을 구성할 수 있습니다. 코드에 대한 자세한 내용과 다운로드 방법은 Step Functions Ruby Activity Worker를 참조하세요.

항목 설명

require_relative

다음 활동 작업자 코드 예제의 상대 경로입니다.

region

AWS 활동의 리전입니다.

workers_count

활동 작업자용 스레드 수입니다. 대부분의 구현에서는 10~20개 스레드면 충분할 것입니다. 활동 처리 시간이 길수록 더 많은 스레드가 필요할 수 있습니다. 초당 프로세스 활동 수에 활동 처리 지연 시간(초)의 99 백분위를 곱하면 추정치를 구할 수 있습니다.

pollers_count

폴러용 스레드 수입니다. 대부분의 구현에서는 10~20개 스레드면 충분할 것입니다.

heartbeat_delay

하트비트 간 지연 시간(초)입니다.

input 활동의 구현 로직입니다.

다음은 코드에서 ../lib/step_functions/activity로 참조되는 Ruby 활동 작업자입니다.

require 'set' require 'json' require 'thread' require 'logger' require 'aws-sdk' module Validate def self.positive(value) raise ArgumentError, 'Argument has to be positive' if value <= 0 value end def self.required(value) raise ArgumentError, 'Argument is required' if value.nil? value end end module StepFunctions class RetryError < StandardError def initialize(message) super(message) end end def self.with_retries(options = {}, &block) retries = 0 base_delay_seconds = options[:base_delay_seconds] || 2 max_retries = options[:max_retries] || 3 begin block.call rescue => e puts e if retries < max_retries retries += 1 sleep base_delay_seconds**retries retry end raise RetryError, 'All retries of operation had failed' end end class Activity def initialize(options = {}) @states = Aws::States::Client.new( credentials: Validate.required(options[:credentials]), region: Validate.required(options[:region]), http_read_timeout: Validate.positive(options[:http_read_timeout] || 60) ) @activity_arn = Validate.required(options[:activity_arn]) @heartbeat_delay = Validate.positive(options[:heartbeat_delay] || 60) @queue_max = Validate.positive(options[:queue_max] || 5) @pollers_count = Validate.positive(options[:pollers_count] || 1) @workers_count = Validate.positive(options[:workers_count] || 1) @max_retry = Validate.positive(options[:workers_count] || 3) @logger = Logger.new(STDOUT) end def start(&block) @sink = SizedQueue.new(@queue_max) @activities = Set.new start_heartbeat_worker(@activities) start_workers(@activities, block, @sink) start_pollers(@activities, @sink) wait end def queue_size return 0 if @sink.nil? @sink.size end def activities_count return 0 if @activities.nil? @activities.size end private def start_pollers(activities, sink) @pollers = Array.new(@pollers_count) do PollerWorker.new( states: @states, activity_arn: @activity_arn, sink: sink, activities: activities, max_retry: @max_retry ) end @pollers.each(&:start) end def start_workers(activities, block, sink) @workers = Array.new(@workers_count) do ActivityWorker.new( states: @states, block: block, sink: sink, activities: activities, max_retry: @max_retry ) end @workers.each(&:start) end def start_heartbeat_worker(activities) @heartbeat_worker = HeartbeatWorker.new( states: @states, activities: activities, heartbeat_delay: @heartbeat_delay, max_retry: @max_retry ) @heartbeat_worker.start end def wait sleep rescue Interrupt shutdown ensure Thread.current.exit end def shutdown stop_workers(@pollers) wait_workers(@pollers) wait_activities_drained stop_workers(@workers) wait_activities_completed shutdown_workers(@workers) shutdown_worker(@heartbeat_worker) end def shutdown_workers(workers) workers.each do |worker| shutdown_worker(worker) end end def shutdown_worker(worker) worker.kill end def wait_workers(workers) workers.each(&:wait) end def wait_activities_drained wait_condition { @sink.empty? } end def wait_activities_completed wait_condition { @activities.empty? } end def wait_condition(&block) loop do break if block.call sleep(1) end end def stop_workers(workers) workers.each(&:stop) end class Worker def initialize @logger = Logger.new(STDOUT) @running = false end def run raise 'Method run hasn\'t been implemented' end def process loop do begin break unless @running run rescue => e puts e @logger.error('Unexpected error has occurred') @logger.error(e) end end end def start return unless @thread.nil? @running = true @thread = Thread.new do process end end def stop @running = false end def kill return if @thread.nil? @thread.kill @thread = nil end def wait @thread.join end end class PollerWorker < Worker def initialize(options = {}) @states = options[:states] @activity_arn = options[:activity_arn] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = StepFunctions.with_retries(max_retry: @max_retry) do begin @states.get_activity_task(activity_arn: @activity_arn) rescue => e @logger.error('Failed to retrieve activity task') @logger.error(e) end end return if activity_task.nil? || activity_task.task_token.nil? @activities.add(activity_task.task_token) @sink.push(activity_task) end end class ActivityWorker < Worker def initialize(options = {}) @states = options[:states] @block = options[:block] @sink = options[:sink] @activities = options[:activities] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run activity_task = @sink.pop result = @block.call(JSON.parse(activity_task.input)) send_task_success(activity_task, result) rescue => e send_task_failure(activity_task, e) ensure @activities.delete(activity_task.task_token) unless activity_task.nil? end def send_task_success(activity_task, result) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_success( task_token: activity_task.task_token, output: JSON.dump(result) ) rescue => e @logger.error('Failed to send task success') @logger.error(e) end end end def send_task_failure(activity_task, error) StepFunctions.with_retries do begin @states.send_task_failure( task_token: activity_task.task_token, cause: error.message ) rescue => e @logger.error('Failed to send task failure') @logger.error(e) end end end end class HeartbeatWorker < Worker def initialize(options = {}) @states = options[:states] @activities = options[:activities] @heartbeat_delay = options[:heartbeat_delay] @max_retry = options[:max_retry] @logger = Logger.new(STDOUT) end def run sleep(@heartbeat_delay) @activities.each do |token| send_heartbeat(token) end end def send_heartbeat(token) StepFunctions.with_retries(max_retry: @max_retry) do begin @states.send_task_heartbeat(token) rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end rescue => e @logger.error('Failed to send heartbeat for activity') @logger.error(e) end end end end