サブスクリプションワークフローのチュートリアルのパート 2: ワークフローの実装 - Amazon Simple Workflow Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

サブスクリプションワークフローのチュートリアルのパート 2: ワークフローの実装

これまでは、コードはかなり汎用的でした。この部分で、ワークフローが何を行うか、またそれを実装するのにどのようなアクティビティが必要になるかを実際に定義し始めます。

ワークフローの設計

思い返して頂くと、このワークフローの最初のアイデアは以下のステップで構成されていました。

  1. ユーザーからサブスクリプションアドレス (E メールまたは SMS) を取得します。

  2. SNS トピックを作成し、トピックに対して提供されたエンドポイントにサブスクライブします。

  3. ユーザーによるサブスクリプションの確認を待機します。

  4. ユーザーが確認した場合、トピックに対して成功のメッセージを発行します。

ワークフローの各ステップのことを、実行する必要のあるアクティビティと考えることができます。ワークフローは、適切なタイミングで各アクティビティをスケジュールし、アクティビティ間のデータ転送を調整します。

このワークフローでは、これらのステップの各々に別個のアクティビティを作成し、それらに記述的に名前を付けます。

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

これらのアクティビティは順番に実行され、各ステップからのデータは、次のステップで使用されます。

コードすべてが 1 つのソースファイルに存在するようにアプリケーションを設計できますが、これは Amazon SWF の設計された方法に反しています。範囲がインターネット全体に及ぶワークフローのために設計されているので、少なくともアプリケーションを 2 つの別個の実行可能ファイルに分割しましょう。

  • swf_sns_workflow.rb - ワークフローおよびワークフロースターターが含まれています。

  • swf_sns_activities.rb - アクティビティおよびアクティビティスターターが含まれています。

ワークフローとアクティビティの実装は、別々のウィンドウ、別々のコンピュータ、または世界の異なる場所でも実行できます。Amazon SWF がワークフローとアクティビティの詳細を追跡しているため、ワークフローは、実行中の場所に関係なくアクティビティのスケジューリングとデータ転送を調整できます。

ワークフローコードの設定

swf_sns_workflow.rb というファイルを作成して開始します。このファイルでは、SampleWorkflow というクラスを宣言します。クラスの宣言とそのコンストラクタである initialize メソッドを次に示します。

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

ご覧のとおり、次のクラスインスタンスデータを保持しています。

  • domain - utils.rbinit_domain から取得したドメイン名。

  • workflowId - initialize に渡されるタスクリスト。

  • activity_list - 実行するアクティビティの名前とバージョンを持つアクティビティリスト。

Amazon SWF がアクティビティタイプを確実に識別するには、ドメイン名、アクティビティ名、アクティビティバージョンで十分であるため、スケジュールするためにアクティビティについて保持する必要があるデータはそれだけです。

タスクリストは、決定タスクをポーリングしアクティビティをスケジュールするために、ワークフローのディサイダーコードによって使用されます。

この関数の最後に、まだ定義していないメソッドである register_workflow を呼び出します。次にこのメソッドを定義します。

ワークフローの登録

ワークフロータイプを使用するには、まずそれを登録する必要があります。アクティビティタイプと同様、ワークフロータイプは、そのドメイン、名前、およびバージョンによって識別されます。また、ドメインもアクティビティタイプも同様ですが、既存のワークフロータイプを再登録することはできません。ワークフロータイプについて何か変更する必要がある場合は、それに新しいバージョンを提供する必要がありますが、それにより本質的に新しいタイプが作成されます。

register_workflow のコードは次のとおりです。以前の実行時に登録した既存のワークフロータイプを取得するため、またはまだ登録されていない場合はそのワークフローを登録するために使用されます。

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

まず、ドメインの workflow_types コレクションを反復処理することにより、ワークフロー名とバージョンが既に登録されているかどうか確認します。一致が検出されると、既に登録されていたワークフロータイプを使用します。

一致するものが見つからない場合、(ワークフローを検索したのと同じ workflow_types コレクションで register を呼び出すことにより)名前を「swf-sns-workflow」、バージョンを「1」、オプションを次のようにした新しいワークフロータイプが登録されます。

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

登録中に渡されるオプションは、ワークフロータイプのデフォルトの動作を設定するために使用されるので、新しいワークフロー実行を開始するたびにこれらの値を設定する必要はありません。

ここでは、タスクの開始から終了までの最大時間 (1 時間)、およびワークフロー実行が完了するまでにかかる最大時間 (24 時間) というタイムアウト値だけを設定します。これらの時間のどちらかを超過すると、タスクまたはワークフローがタイムアウトします。

タイムアウト値の詳細については、「Amazon SWF タイムアウトの種類 」を参照してください。

決定のポーリング

すべてのワークフロー実行の中心には、ディサイダーがあります。ディサイダーには、ワークフロー自体の実行を管理する責任があります。ディサイダーは決定タスクを取得し、新しいアクティビティのスケジューリングやアクティビティのキャンセルや再開を行うか、ワークフロー実行の状態を完了、キャンセル済み、または失敗と設定するかのどちらかでそれに応答します。

ディサイダーはワークフロー実行のタスクリスト名を使用して、応答する決定タスクを取得します。決定タスクをポーリングするには、ドメインの decision_tasks コレクションで poll を呼び出して、使用可能な決定タスクをループします。その後、new_events コレクションに対して反復処理することにより、その決定タスクに新しいイベントがないか確認できます。

返されるイベントは AWS::SimpleWorkflow::HistoryEvent オブジェクトで、その返されたイベントの event_type メンバーを使用することによって、イベントのタイプを取得できます。履歴イベントタイプのリストと説明については、Amazon Simple Workflow Service API リファレンス の「HistoryEvent」を参照してください。

以下に決定タスクポーラーのロジックの冒頭を示します。poll_for_decisions というワークフロークラスの新しいメソッドです。

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

次に、取得する event_type に基づいてディサイダーの実行を分岐します。取得する可能性が高い最初のものは、WorkflowExecutionStarted です。このイベントを取得した場合、それは Amazon SWF がディサイダーにワークフロー実行を開始するよう合図していることを意味します。まず初めに、ポーリング中に取得したタスクで schedule_activity_task を呼び出すことで、最初のアクティビティをスケジューリングします。

それにアクティビティリストで宣言した最初のアクティビティを渡します。これは、スタックのように使用できるようリストを逆にしたために、リストの last 位置にあります。定義した「アクティビティ」は名前とバージョン番号で構成されたマップにすぎませんが、Amazon SWF がアクティビティは登録済みであると想定してスケジューリング用のアクティビティを識別するために必要とするのはこれだけです。

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

アクティビティをスケジュールすると、Amazon SWF はスケジューリング中に渡されるアクティビティタスクリストに アクティビティタスク を送り、開始すべきタスクを合図します。「サブスクリプションワークフローのチュートリアルのパート 3: アクティビティの実装」でアクティビティタスクを扱いますが、ここでそのタスクを実行しないことは注目に値します。Amazon SWF にはそれを scheduled (スケジュールする) ようにとだけ指示します。

取り組む必要のある次のアクティビティは ActivityTaskCompleted イベントで、これは Amazon SWF がアクティビティタスクからアクティビティ完了の応答を取得したときに発生します。

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

タスクを直線的に実行して一度に 1 つのアクティビティのみを実行しているので、この機会を利用して activity_list スタックから完了済みタスクをポップします。この結果が空のリストなら、ワークフローが完了していることが分かります。この場合、タスクで complete_workflow_execution を呼び出すことにより、ワークフローが完了したことを Amazon SWF に通知します。

リストにまだエントリがあるイベントでは、リストで (この場合もやはり last 位置にある) 次のアクティビティをスケジュールします。ただし今回は、完了時に前のアクティビティが Amazon SWF に結果データを返したかどうかを確認します。そのデータは、イベントの属性、オプションの result キーでワークフローに提供されます。アクティビティが結果を生成した場合は、それを input オプションとして、アクティビティタスクリストと共に次のスケジュールされたアクティビティに渡します。

完了済みアクティビティの result 値を取得することにより、またスケジュールされたアクティビティの input 値を設定することにより、1 つのアクティビティから次のアクティビティへとデータを渡したり、アクティビティから取得したデータを使用して、アクティビティの結果に基づいてディサイダー内の動作を変更することができます。

このチュートリアルでは、これら 2 つのイベントタイプがワークフローの動作を定義するのに最も重要です。ただし、アクティビティは ActivityTaskCompleted 以外のイベントを生成することができます。ActivityTaskTimedOut イベントと ActivityTaskFailed イベント、および実行するアクティビティが不足したときに Amazon SWF が行う complete_workflow_execution 呼び出しの処理時に生成される WorkflowExecutionCompleted イベントのデモハンドラコードを提供して、ディサイダーコードをまとめます。

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

ワークフロー実行の開始

ワークフローがポーリングするための決定タスクを生成する前に、ワークフロー実行を開始する必要があります。

ワークフローの実行を開始するには、登録済みのワークフロータイプ (AWS::SimpleWorkflow::WorkflowType) で start_execution を呼び出します。クラスコンストラクタで取得した workflow_type インスタンスメンバーを使用するため、これに小さなラッパーを定義します。

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

ワークフローが実行中になると、決定イベントがワークフローのタスクリスト上に含まれ始め、start_execution のワークフロー実行オプションとして渡されます。

ワークフロータイプが登録されるときに提供されるオプションとは異なり、start_execution に渡されるオプションは、ワークフロータイプの一部とは見なされません。これらは、ワークフローのバージョンを変更せずに、ワークフロー実行ごとに自由に変更できます。

ファイルを実行するときにワークフローが実行を開始するようにしたいので、クラスをインスタンス化してから今定義した start_execution メソッドを呼び出すコードを追加します。

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

タスクリストの名前の競合を避けるため、SecureRandom.uuid を使用してタスクリスト名として使用できるランダムな UUID を生成し、各ワークフロー実行に異なるタスクリスト名が使用されることを保証します。

注記

タスクリストはワークフロー実行に関するイベントを記録するために使用されるので、同じワークフロータイプの複数の実行に同じタスクリストを使用すると、前の実行中に生成されたイベントを取得する可能性があります。新しいコードの試行やテストの実行中によくあるケースですが、特に互いにすぐ連続して実行される場合に、その可能性があります。

前の実行からのアーティファクトに対処しなければならないという問題を回避するには、各実行に新しいタスクリストを使用して、ワークフロー実行の開始時にそれを指定できます。

ここでもまた、それを実行している人 (おそらくお客様) に手順を示し、タスクリストの「アクティビティ」バージョンを提供するコードが少しあります。ディサイダーはこのタスクリスト名を使用してワークフローのアクティビティをスケジュールし、アクティビティ実装は、このタスクリスト名のアクティビティイベントをリッスンして、そのスケジュールされたアクティビティをいつ開始するかを知り、アクティビティ実行に関する更新を提供します。

また、コードはワークフロー実行を開始する前にユーザーがアクティビティスターターの実行を開始するのを待機します。そのため、アクティビティタスクが提供されたタスクリストに表示され始めるときには、アクティビティスターターは応答する準備が整っています。

次のステップ

ワークフローは実装しました。次に、「サブスクリプションワークフローのチュートリアルのパート 3: アクティビティの実装」では、アクティビティとアクティビティスターターを定義します。