訂購工作流程教學第 2 部分:實作工作流程 - Amazon Simple Workflow Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

訂購工作流程教學第 2 部分:實作工作流程

到目前為止,我們的程式碼顯得較通用。因此,我們需要開始實際定義工作流程的運作方式,以及實作工作流程所需的活動。

設計工作流程

回想一下,此工作流程最初的構想包含了下列步驟:

  1. 從使用者取得訂閱地址 (電子郵件或簡訊)。

  2. 建立 SNS 主題並將提供的端點訂閱到主題。

  3. 等待使用者確認訂閱。

  4. 如果使用者確認,將發佈賀辭到主題。

我們可以將工作流程中的每個步驟視為必須執行的「活動」。「工作流程」負責排程每個活動以於適當的時間執行,以及協調活動之間的資料傳輸。

針對此工作流程,我們將為所有這些步驟建立不同的活動,並以描述性的名稱將活動命名為:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

這些活動將會依序執行,且每個步驟中的資料將用於後續步驟。

我們可以設計應用程式,讓所有程式碼都存在於一個原始檔案,但這樣會與 Amazon SWF 的設計用意相反。後者是針對可跨整個網際網路規模的工作流程所設計,因此讓我們將應用程式至少分為兩個不同的執行檔:

  • swf_sns_workflow.rb - 包含工作流程和工作流程啟動者。

  • swf_sns_activities.rb - 包含活動和活動啟動者。

工作流程和活動實作可以在不同的視窗、不同的電腦,甚至在世界上不同的區域中執行。因為 Amazon SWF 會記錄您工作流程和活動的詳細資訊,所以您的工作流程可以協調您活動的排程和資料傳輸,而不論其運行位置為何。

設定工作流程程式碼

我們將從建立稱為 swf_sns_workflow.rb 的檔案開始。在此檔案中,宣告稱為 SampleWorkflow 的類別。以下是類別宣告和其建構函數:initialize 方法。

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

如您所見,我們會保留下列類別執行個體資料:

  • domain - utils.rb 中擷取自 init_domain 的網域名稱。

  • workflowId - 傳入 initialize 的任務清單。

  • activity_list - 活動清單,其具有我們將執行之活動的名稱和版本。

Amazon SWF 可以使用域名稱、活動名稱和活動版本,讓 Amazon SWF 積極識別活動類型,因此這些資料就是我們需要保留的所有資料,讓 Amazon SWF 排程活動。

工作流程「決策者」程式碼將會使用任務清單來輪詢決策任務及排程活動。

在此函數結束時,我們呼叫尚未定義的方法:register_workflow。 我們接下來將定義此方法。

註冊工作流程

若要使用工作流程類型,我們必須先予以註冊。如同活動類型,工作流程類型是以其網域、名稱和版本進行識別。另外,如同網域和活動類型,您無法重新註冊現有的工作流程類型。如果您需要變更有關工作流程類型的任何資訊,則必須提供新版本,基本上即會建立新的類型。

下列 register_workflow 程式碼用來擷取我們在先前執行上註冊的現有工作流程類型,或者註冊尚未註冊的工作流程。

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

首先要藉由逐一查看網域的 workflow_types 集合,確認是否已註冊工作流程名稱和版本。如果我們找到相符項目,就會使用已註冊的工作流程類型。

如果我們找不到匹配項,則註冊一個新的工作流類型(通過調用註冊在相同的workflow_types集合,我們正在搜索的工作流)名稱為 'swf-sns-Workflow',版本 '1',以及以下選項。

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

在註冊期間傳遞的選項會用來設定工作流程類型的「預設行為」,因此不需要在每次開始新的工作流程執行時設定這些值。

我們在這裡只會設定一些逾時值:從任務開始到結束所需的最長時間 (一小時),以及工作流程執行完成所需的最長時間 (24 小時)。如果超過這兩個時間中的其中一個,則任務或工作流程將會逾時。

如需逾時值的詳細資訊,請參閱「Amazon SWF 超時類型 」。

輪詢決策

每個工作流程執行的核心即為「決策者」。決策者的責任在於管理工作流程本身的執行。決策者會收到並回應「決策任務」,方法是排程新活動、取消並重新啟動活動,或是將工作流程執行的狀態設定為完成、已取消或失敗。

決策者會使用工作流程執行的「任務清單」名稱來接收要回應的決策任務。若要輪詢決策任務,請呼叫民意調查在域的決策任務集合來循環可用的決策任務。您接著可以逐一查看 new_events 集合,來檢查決策任務中的新事件。

傳回的事件為 AWS::SimpleWorkflow::HistoryEvent 物件,而使用所傳回事件的 event_type 成員即可取得事件類型。如需歷史記錄事件類型的清單和描述,請參HistoryEvent 佈中的Amazon Simple Workflow Service API 參考

以下是決策任務輪詢器邏輯的開始。工作流程類別中的新方法稱為 poll_for_decisions

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

我們現在將根據收到的 event_type 來分支處理決策者的執行。我們可能會收到的第一個類型為 WorkflowExecutionStarted。收到此事件時,表示 Amazon SWF 會對您的決策者發出訊號,通知決策者應開始工作流程執行。首先,對輪詢時收到的任務呼叫 schedule_activity_task 以排程第一個活動。

我們會將活動清單中宣告的第一個活動傳遞給它,但因為我們會反轉清單如堆疊一般使用,所以第一個活動會佔用清單上的 last 位置。我們定義的「活動」就只是包含名稱和版本編號的對應,但這些足以讓 Amazon SWF 識別排程活動,並假設活動已註冊。

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

當我們安排活動時,Amazon SWF 會發送活動任務傳入的活動任務清單,並通知排程任務時傳入的動作任務清單,並通知任務開始。我們將在「訂工作流程教學第 3 部分:實作活動」中處理活動任務,但需注意的是我們並不會在此執行任務。我們只會告知 Amazon SWF 應計劃

我們需要解決的下一個活動是ActivityTaskCompleted事件,而這會在 Amazon SWF 收到來自活動任務的活動完成回應時會發生。

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

由於我們會以線性形式執行任務,且一次僅執行一個活動,因此我們可藉機從 activity_list 堆疊中跳出已完成的任務。如果此結果為空白清單,則表示我們的工作流程已完成。在這種情況下,我們向 Amazon SWF 通過調用完成工作流程執行在任務上。

如果清單上仍有項目,我們會排程清單上的下一個活動 (一樣會在最後一個位置)。不過,這次我們會透過選用的索引鍵,確認前一個活動在完成後是否有任何結果資料傳回,而這會以事件屬性提供給工作流程。result金鑰。如果活動有結果產生,我們會將之做為 input 選項連同活動任務清單一起傳遞給下一個排程的活動。

透過擷取已完成活動的 result 值,及設定已排程活動的 input 值,我們可以根據活動的結果,將資料從某個活動傳遞給下一個活動,或使用某個活動的資料來變更決策者的行為。

基於本教學的用途,這兩個事件類型在定義工作流程的行為時最為重要。不過,活動可能產生 ActivityTaskCompleted 以外的事件。我們將通過提供演示處理程序代碼來結束我們的決策者代碼ActivityTaskTimedOut活動任務失敗事件,以及工作流程執行已完成事件,該事件將在 Amazon SWF 處理complete_workflow_execution當我們運行的活動用完時,我們進行的調用。

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

啟動工作流程執行

在產生工作流程要輪詢的任何決策任務以前,我們得先啟動工作流程執行。

要開始工作流程執行,請調用啟動執行註冊的工作流程類型(AWS:: SimpleWorkflow:: Workflow Simple Workflow。我們將對此定義小型包裝函式,以利用我們在類別建構函數中擷取的 workflow_type 執行個體成員。

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

工作流程執行之時,決策事件會開始出現在工作流程的任務清單上,而任務清單在 start_execution 中會以工作流程執行選項傳遞。

與註冊工作流程類型時提供的選項不同,傳遞給 start_execution 的選項不會視為工作流程類型的一部分。您可以自由地針對個別的工作流程執行變更這些項目,而不需要變更工作流程版本。

因為我們想要在執行檔案時開始執行工作流程,所以請新增可實例化類別的程式碼,然後呼叫剛才定義的 start_execution 方法。

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

為了避免任何任務清單命名衝突,我們將使用 SecureRandom.uuid 產生可用做任務清單名稱的隨機 UUID,藉以保證每個工作流程執行用的是不同的任務清單名稱。

注意

任務清單會用來記錄工作流程執行的事件,因此如果您在相同工作流程類型的多次執行當中使用相同的任務清單,則可能會取得上一次執行所產生的事件,尤其會發生在近乎連續執行工作流程的時候,大多在試用新程式碼或執行測試的情況下。

為了避免必須處理先前執行之成品的問題,我們可以針對每次執行都使用新的任務清單,在我們開始工作流程執行時予以指定。

這裡也有一些程式碼可提供說明給執行它的人員 (可能是您),以及提供任務清單的「活動」版本。決策者使用此任務清單名稱來排程工作流程的活動,而活動實作將會接聽此任務清單名稱的活動事件,知道何時開始排程的活動,以及提供活動執行的更新。

程式碼也會在啟動工作流程執行「之前」,等待使用者開始執行活動啟動者,因此活動任務開始出現於提供的任務清單時,活動啟動者就已準備好回應。

後續步驟

您已實作工作流程。接下來,您會在「訂工作流程教學第 3 部分:實作活動」中定義活動和啟動者程式碼。