本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
订阅工作流程教程第 4 部分:实现活动任务轮询器
在 Amazon SWF 中,运行工作流执行的活动任务会显示在活动任务列表中,该列表在您计划工作流中的活动时提供。
我们将实施一个基本的活动轮询器来处理工作流的这些任务,并在 Amazon SWF 将任务放入活动任务列表以启动活动时使用它来启动我们的活动。
首先,新建一个名为 swf_sns_activities.rb
的文件。我们将使用它:
-
将我们创建的活动类实例化。
-
将每个活动注册到 Amazon SWF。
-
轮询活动,并在其名称出现在活动任务列表上时对每个活动调用
do_activity
。
在 swf_sns_activities.rb
中,添加以下语句以需要我们定义的每个活动类。
require_relative 'get_contact_activity.rb' require_relative 'subscribe_topic_activity.rb' require_relative 'wait_for_confirmation_activity.rb' require_relative 'send_result_activity.rb'
现在,我们将创建类并提供一些初始化代码。
class ActivitiesPoller def initialize(domain, workflowId) @domain = domain @workflowId = workflowId @activities = {} # These are the activities we'll run activity_list = [ GetContactActivity, SubscribeTopicActivity, WaitForConfirmationActivity, SendResultActivity ] activity_list.each do | activity_class | activity_obj = activity_class.new puts "** initialized and registered activity: #{activity_obj.name}" # add it to the hash @activities[activity_obj.name.to_sym] = activity_obj end end
除了保存传入的域 和任务列表 以外,此代码还将我们创建的每个活动类实例化。由于每个类都会注册与其关联的活动(如需查看代码,请参阅 basic_activity.rb
),这足以让 Amazon SWF 知道我们将运行的所有活动。
对于每个实例化的活动,我们都将其存储在使用活动名称(如 get_contact_activity
)作为键的映射上,因此我们可在活动轮询器代码(接下来我们将定义这段代码)中轻松查找这些活动。
创建一个名为 poll_for_activities
的新方法,并调用域托管的 activity_tasks 上的 poll 来获取活动任务。
def poll_for_activities @domain.activity_tasks.poll(@workflowId) do | task | activity_name = task.activity_type.name
我们可从任务的 activity_type 成员获取活动名称。接下来,我们将使用与此任务关联的活动名称查找要对其运行 do_activity
的类,并向其传递此任务(其中包括应传输到活动的任何输入数据)。
# find the task on the activities list, and run it. if @activities.key?(activity_name.to_sym) activity = @activities[activity_name.to_sym] puts "** Starting activity task: #{activity_name}" if activity.do_activity(task) puts "++ Activity task completed: #{activity_name}" task.complete!({ :result => activity.results }) # if this is the final activity, stop polling. if activity_name == 'send_result_activity' return true end else puts "-- Activity task failed: #{activity_name}" task.fail!( { :reason => activity.results[:reason], :details => activity.results[:detail] } ) end else puts "couldn't find key in @activities list: #{activity_name}" puts "contents: #{@activities.keys}" end end end end
这段代码仅等待 do_activity
的完成,然后根据返回代码对此任务调用 complete! 或 fail!。
注意
启动最后一个活动后,此代码即退出轮询器,因为它已完成其任务并已启动所有活动。在您自己的 Amazon SWF 代码中,如果您的活动可能再次运行,您可能需要使活动轮询器无限期运行。
至此,我们的 ActivitiesPoller 类代码结束,但我们将在文件结尾再添加一些代码,以使用户可在命令行下运行它。
if __FILE__ == $0 if ARGV.count < 1 puts "You must supply a task-list name to use!" exit end poller = ActivitiesPoller.new(init_domain, ARGV[0]) poller.poll_for_activities puts "All done!" end
如果用户在命令行下运行该文件(向其传递活动任务列表作为第一个参数),则此代码将轮询器类实例化,并启动它,轮询活动。轮询器结束后(在它启动最后一个活动后),我们打印一条消息即退出。
活动轮询器就此结束。接下来,您只需运行代码,观察其如何发挥作用,如订阅工作流程教程:运行工作流程所述。