구독 워크플로 자습서 파트 2: 워크플로 구현 - Amazon Simple Workflow Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

구독 워크플로 자습서 파트 2: 워크플로 구현

지금까지 예로 든 코드는 매우 일반적이었습니다. 워크플로가 수행하는 작업과 워크플로를 구현하는 데 필요한 활동을 정의하기 위해 시작할 부분입니다.

워크플로 설계

기억해 보면 워크플로에 대한 최초 아이디어는 다음 단계로 구성되어 있습니다.

  1. 사용자로부터 구독 주소(이메일 또는 SMS)를 얻습니다.

  2. SNS 주제를 생성하고 해당 주제에 대해 제공된 엔드포인트를 구독합니다.

  3. 사용자가 구독을 확인할 때까지 기다립니다.

  4. 사용자가 확인하면 주제에 축하 메시지를 게시합니다.

워크플로의 각 단계를 수행해야 하는 활동이라고 생각할 수 있습니다. 워크플로는 적절한 시점에 각 활동 예약과 활동 간 데이터 전송을 담당합니다.

이 워크플로의 경우 각 단계에 대해 개별 활동을 생성하고 자세한 이름을 지정합니다.

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

이러한 활동은 순서대로 실행되며 각 단계의 데이터는 후속 단계에 사용됩니다.

모든 코드가 소스 파일 하나에 포함되어 있도록 애플리케이션을 설계할 수 있지만 이는 Amazon SWF가 설계된 방식과 반대로 실행됩니다. 워크플로의 경우 범위가 전체 인터넷에 걸쳐 있을 수 있기 때문에 애플리케이션을 개별 실행 파일 2개로 분할해 보겠습니다.

  • swf_sns_workflow.rb - 워크플로와 워크플로 시작자를 포함합니다.

  • swf_sns_activities.rb - 활동과 활동 시작자를 포함합니다.

워크플로와 활동 구현은 별도의 창, 별도의 컴퓨터 또는 전 세계의 여러 지역에서 실행할 수 있습니다. Amazon SWF는 워크플로 및 활동의 세부 정보를 추적하기 때문에 워크플로는 활동이 실행되는 위치와 상관 없이 활동 예약 및 활동의 데이터 전송을 조정할 수 있습니다.

워크플로 코드 설정

swf_sns_workflow.rb라는 파일을 생성하여 시작합니다. 이 파일에서 SampleWorkflow라는 클래스를 선언합니다. 다음은 클래스 선언 및 해당 클래스의 생성자인 initialize 메서드입니다.

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

보시다시피 여기서는 다음 클래스 인스턴스 데이터를 기록합니다.

  • domain - utils.rbinit_domain에서 검색한 도메인 이름

  • workflowId - initialize로 전달된 작업 목록

  • activity_list - 실행할 활동의 이름 및 버전이 포함된 활동 목록

Amazon SWF는 도메인 이름, 활동 이름 및 활동 버전만 있으면 활동 유형을 충분히 식별할 수 있으므로 활동을 예약하기 위해 활동에 대해 이러한 데이터만 기록하면 됩니다.

작업 목록은 워크플로의 결정자 코드에서 결정 작업을 폴링하고 활동을 예약하는 데 사용합니다.

이 함수의 마지막에서는 아직 정의하지 않은 메서드인 register_workflow를 호출합니다. 이어서 이 메서드를 정의하겠습니다.

워크플로 등록

워크플로 유형을 사용하려면 먼저 등록해야 합니다. 활동 유형과 마찬가지로, 워크플로 유형은 도메인, 이름 및 버전별로 식별됩니다. 또한 도메인 및 활동 유형처럼 기존 워크플로 유형은 다시 등록할 수 없습니다. 워크플로 유형에 대한 일부 정보를 변경해야 하는 경우 워크플로에 새 버전을 제공해야 하는데 즉, 기본적으로 새로운 유형을 만들어야 합니다.

다음은 register_workflow에 대한 코드로, 이전 실행에서 등록한 기존 워크플로 유형을 검색하거나 아직 등록되지 않은 워크플로를 등록하는 데 사용됩니다.

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

먼저 도메인의 workflow_types 모음을 반복해 워크플로우의 이름과 버전이 이미 등록되어 있는지 확인합니다. 일치하는 항목을 찾으면 이미 등록되어 있는 그 워크플로 유형을 사용합니다.

일치하는 항목을 찾지 못하면 워크플로를 검색하던 동일한 workflow_types 컬렉션에서 register를 호출하여 ‘swf-sns-workflow’, 버전 ‘1’ 및 다음 옵션을 사용한 새로운 워크플로 유형이 등록됩니다.

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

등록 중 전달된 옵션을 사용하여 해당 워크플로 유형의 기본 동작을 설정하며, 따라서 새 워크플로 실행을 시작할 때마다 이러한 값을 설정할 필요는 없습니다.

여기서는 작업 시작 시점부터 작업이 닫힌 시점까지 걸릴 수 있는 최대 시간(1시간), 워크플로 실행을 완료하는 데 걸릴 수 있는 최대 시간(24시간) 등 제한 시간 값 몇 개를 설정합니다. 이러한 시간 중 하나가 초과되면 작업 또는 워크플로가 시간 초과됩니다.

제한 시간 값에 대한 자세한 내용은 Amazon SWF 제한 시간 유형 단원을 참조하십시오.

결정 폴링

모든 워크플로 실행의 핵심은 결정자입니다. 결정자는 워크플로 자체의 실행 관리를 담당합니다. 결정자는 결정 작업을 수신한 다음 새 활동을 예약해 활동을 다시 시작하거나 워크플로 실행 상태를 완료, 취소됨 또는 실패로 설정해 결정 작업에 응답합니다.

결정자는 워크플로 실행의 작업 목록 이름을 사용해 응답할 결정 작업을 수신합니다. 결정 작업을 폴링하려면 도메인의 decision_tasks 컬렉션에 대해 폴링을 직접적으로 호출하여 사용 가능한 결정 작업을 반복합니다. 그런 다음 결정 작업의 new_events 모음에 대해 반복해 결정 작업에 새 이벤트가 있는지 확인할 수 있습니다.

반환된 이벤트는 AWS::SimpleWorkflow::HistoryEvent 객체이고 반환된 이벤트의 event_type 멤버를 사용해 이벤트 유형을 가져올 수 있습니다. 기록 이벤트 유형의 목록 및 설명은 Amazon Simple Workflow Service API 참조HistoryEvent를 참조하십시오.

다음은 결정 작업 Poller 로직의 시작 부분으로, 워크플로 클래스 poll_for_decisions의 새 메서드입니다.

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

이제, 수신된 event_type을 기반으로 결정자의 실행을 분기합니다. 처음 수신할 수 있는 이벤트 유형은 WorkflowExecutionStarted입니다. 이 이벤트가 수신되면 Amazon SWF에서 결정자에게 워크플로 실행을 시작해야 한다는 신호를 보낸 것입니다. 폴링 중 수신한 작업에 대해 schedule_activity_task를 호출해 첫 번째 활동을 예약하는 것부터 시작합니다.

활동 목록에서 선언한 첫 번째 활동을 결정자에게 전달합니다. 활동 목록을 스택처럼 사용할 수 있도록 활동 목록을 반전시켰으므로 첫 번째 활동이 목록의 last 위치를 차지합니다. 정의한 "활동"은 이름 및 버전 번호로 구성된 맵인데, 활동이 이미 등록되어 있다고 가정하면 이러한 맵이 Amazon SWF에서 예약을 위해 활동을 식별하는 데 필요한 정보로 충분합니다.

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

활동을 예약하면 Amazon SWF는 활동 예약 중 전달한 활동 작업 목록으로 활동 작업을 보내 해당 작업을 시작하라고 신호를 보냅니다. 구독 워크플로 자습서 파트 3: 활동 구현 단원의 활동 작업을 처리할 예정이지만 여기서는 이 작업을 실행하지는 않습니다. 여기서는 Amazon SWF에 해당 작업을 예약해야 한다고 알리기만 합니다.

처리해야 하는 다음 활동은 ActivityTaskCompleted 이벤트로, Amazon SWF에서 활동 작업의 활동 완료 응답을 수신한 경우 발생합니다.

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

작업을 선형으로 실행하고 활동은 한 번에 하나만 실행되므로 이 기회에 activity_list 스택에서 완료된 작업이 표시되도록 합니다. 그러면 목록이 비게 되고 워크플로가 완료되었음을 알게 됩니다. 이 경우에는 작업에 대해 complete_workflow_execution을 호출하여 워크플로가 완료되었음을 Amazon SWF에 알립니다.

목록에 아직 항목이 있는 경우 목록의 (마지막 위치에 있는) 다음 활동을 예약합니다. 그러나 이 경우에는 완료 시 이전 활동에서 Amazon SWF에 결과 데이터를 반환했는지 살펴볼 것입니다. 이러한 데이터는 이벤트 속성의 선택적 result 키에서 워크플로에 제공됩니다. 활동이 결과를 생성하면 활동 작업 목록과 함께 이러한 결과를 다음 예약된 활동에 input 옵션으로 전달합니다.

완료된 활동의 result 값을 검색하고 예약된 활동의 input 값을 설정하여 하나의 활동에서 다음 활동으로 데이터를 전달하거나 한 활동의 데이터를 사용해 활동의 결과를 바탕으로 결정자의 동작을 변경할 수 있습니다.

이 자습서의 용도에 맞춰 워크플로 동작을 정의할 때 이러한 두 가지 이벤트 유형이 가장 중요합니다. 그러나 활동이 ActivityTaskCompleted 이외의 이벤트를 생성할 수 있습니다. ActivityTaskTimedOutActivityTaskFailed 이벤트와 실행할 활동이 부족할 때 Amazon SWF가 complete_workflow_execution 직접 호출을 처리할 때 생성되는 WorkflowExecutionCompleted 이벤트에 대한 데모 핸들러 코드를 제공하여 결정자 코드를 마무리합니다.

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

워크플로 실행 시작

워크플로에서 폴링할 결정 작업이 생성되기 전에 워크플로 실행을 시작해야 합니다.

워크플로 실행을 시작하려면 등록된 워크플로 유형(AWS::SimpleWorkflow::WorkflowType)에서 start_execution을 직접적으로 호출하십시오. 클래스 생성자에서 검색한 workflow_type 인스턴스 멤버를 사용하기 위해 이러한 호출과 관련된 작은 래퍼를 정의할 것입니다.

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

워크플로우가 실행되면 결정 이벤트가 시작되어 워크플로우의 작업 목록에 나타나는데, 결정 이벤트는 start_execution에서 워크플로우 실행 옵션으로 전달됩니다.

워크플로 유형 등록 시 제공된 옵션과 달리 start_execution에 전달된 옵션은 워크플로 유형의 일부로 간주되지 않습니다. 이러한 옵션은 워크플로 버전을 변경하지 않고 워크플로 실행당 자유롭게 변경할 수 있습니다.

파일을 실행할 때 워크플로가 실행을 시작하도록 하려고 클래스를 인스턴스화한 다음 방금 정의한 start_execution 메서드를 호출하는 코드를 추가합니다.

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

작업 목록 이름이 충돌하지 않도록 SecureRandom.uuid를 사용해 작업 목록 이름으로 사용할 수 있는 랜덤 UUID를 생성하여 워크플로 실행마다 다른 작업 목록 이름이 사용되도록 할 것입니다.

참고

작업 목록은 워크플로 실행에 대한 이벤트를 기록하는 데 사용됩니다. 따라서 같은 워크플로 유형의 여러 실행에 대해 동일한 작업 목록을 사용하면 이전 실행 중 생성된 이벤트를 수신할 수 있습니다. 워크플로 유형을 거의 연속으로 실행하는 경우에는 특히 더 그렇고, 새 코드를 시험해 보거나 테스트를 실행하는 경우가 종종 여기에 해당합니다.

이전 실행의 결과물을 처리해야 하는 문제를 방지하기 위해 각 실행마다 새 작업 목록을 사용할 수 있습니다. 워크플로 실행을 시작할 때 새 작업 목록을 지정하면 됩니다.

또한 여기에는 워크플로를 실행하는 사람(아마도 여러분)에게 지침을 제공하고 작업 목록의 "활동" 버전을 제공하는 코드도 있습니다. 결정자는 작업 목록 이름을 사용해 워크플로에 대한 활동을 예약하고 활동 구현 시 해당 작업 목록 이름에 대한 활동 이벤트를 수신해 예약된 활동이 시작된 시점과 활동 실행에 대한 업데이트가 제공된 시점을 파악합니다.

또한 코드는 자신이 워크플로 실행을 시작하기 전에 사용자가 활동 시작자를 실행하도록 대기합니다. 그러면 활동 시작자는 제공된 작업 목록에 나타나는 활동 작업이 시작할 때 응답할 준비가 됩니다.

다음 단계

이제 워크플로를 구현했습니다. 다음으로 구독 워크플로 자습서 파트 3: 활동 구현 단원에서 활동과 활동 시작자를 정의해 보겠습니다.