Segunda parte del tutorial acerca del flujo de trabajo de suscripción: implementación del flujo de trabajo - Amazon Simple Workflow Service

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Segunda parte del tutorial acerca del flujo de trabajo de suscripción: implementación del flujo de trabajo

Hasta ahora, nuestro código ha sido bastante genérico. Esta es la parte donde empezamos a definir realmente lo que hace nuestro flujo de trabajo y qué actividades necesitará para implementarlo.

Diseño del flujo de trabajo

Como recordará, la idea inicial de este flujo de trabajo consistía en los siguientes pasos:

  1. Obtenga una dirección de suscripción (correo electrónico o SMS) del usuario.

  2. Cree un tema de SNS y suscriba los puntos de conexión proporcionados al tema.

  3. Espere a que el usuario confirme la suscripción.

  4. Si el usuario la confirma, publique un mensaje de felicitación en el tema.

Puede pensar en cada paso del flujo de trabajo como una actividad que debe realizar. El flujo de trabajo es responsable de programar cada actividad en el momento oportuno y de coordinar la transferencia de datos entre actividades.

En este flujo de trabajo, crearemos una actividad independiente para cada uno de estos pasos, y les daremos nombres descriptivos:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

Estas actividades se ejecutarán por orden, y los datos de cada paso se utilizarán en el paso siguiente.

Podríamos diseñar nuestra aplicación de modo que todo el código se encuentre en un archivo de origen, pero esta estrategia iría en contra de la forma en que se ha diseñado Amazon SWF. De hecho, se ha diseñado para flujos de trabajo que abarcan la totalidad de Internet, por eso vamos a desglosar la aplicación en dos ejecutables distintos:

  • swf_sns_workflow.rb: contiene el flujo de trabajo y el iniciador de flujo de trabajo.

  • swf_sns_activities.rb: contiene las actividades y el iniciador de las actividades.

Las implementaciones de flujo de trabajo y actividades pueden ejecutarse en distintas ventanas, distintos equipos o incluso distintas partes del mundo. Puesto que Amazon SWF hace un seguimiento de los detalles de los flujos de trabajo y actividades, el flujo de trabajo puede coordinar las programaciones y transferencias de datos de las actividades sin importar donde se ejecuten.

Configuración del código del flujo de trabajo

Para empezar, cree un archivo llamado swf_sns_workflow.rb. En este archivo, declare una clase llamada SampleWorkflow. Esta es la declaración de clase y su constructor, el método initialize.

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

Como puede ver, mantenemos los siguientes datos de instancia de clase:

  • domain: el nombre del dominio recuperado de init_domain en utils.rb.

  • workflowId: la lista de tareas transmitida a initialize.

  • activity_list: la lista de actividades, que contiene los nombres y las versiones de las actividades que ejecutaremos.

El nombre de dominio, el nombre de la actividad y la versión de la actividad son suficientes para que Amazon SWF identifique positivamente un tipo de actividad, por lo que estos son todos los datos que necesitamos acerca de nuestras actividades para programarlas.

La lista de tareas utilizada por el código del decisor del flujo de trabajo para realizar sondeos para obtener tareas de decisión y actividades de programación.

Al final de esta función, llamamos a un método que aún no hemos definido: register_workflow. Definiremos este método a continuación.

Registro de un flujo de trabajo

Par utilizar un tipo de flujo de trabajo, primero es necesario registrarlo. Al igual que un tipo de actividad, un tipo de flujo de trabajo se identifica por su dominio, nombre y versión. Además, al igual que los dominios y tipos de actividad, no puede volver a registrar un tipo de flujo de trabajo existente. Si necesita cambiar algo de un tipo de flujo de trabajo, debe hacerlo mediante una nueva versión, lo que básicamente crea un nuevo tipo.

Este es el código para register_workflow, que se utiliza para recuperar el tipo de flujo de trabajo existente que hemos registrado en una ejecución anterior o para registrar el flujo de trabajo si aún no se ha registrado.

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

En primer lugar, para comprobar si el nombre y la versión del flujo de trabajo ya están registrados, procedemos a la iteración a través de la colección de workflow_types del dominio. Si se encuentra una coincidencia, utilizaremos el tipo de flujo de trabajo que ya esté registrado.

Si no es así, se registra un nuevo tipo de flujo de trabajo (llamando al registro en la misma colección de workflow_types en la que buscamos el flujo de trabajo) con el nombre 'swf-sns-workflow', versión '1', y las siguientes opciones.

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

Las opciones transmitidas durante el registro se utilizan para configurar el comportamiento predeterminado del tipo de trabajo. De este modo, no es necesario configurar estos valores cada vez que se inicia una nueva ejecución del flujo de trabajo.

Aquí, se definen solo algunos valores de tiempo de espera: la duración máxima desde el momento en que se inicia la tarea hasta que se cierra (una hora) y la duración máxima hasta que se completa el flujo de trabajo (24 horas). Si se supera cualquiera de estos plazos, la tarea o el flujo de trabajo agotarán el tiempo de espera.

Para obtener más información acerca de los valores de tiempo de espera, consulte Tipos de tiempo de espera de Amazon SWF .

Sondeo de decisiones

En el centro de cada ejecución de flujo de trabajo se encuentra un decisor. La responsabilidad del decisor consiste en administrar la ejecución del flujo de trabajo en sí. El decisor recibe tareas de decisión y responde a ellas bien programando nuevas actividades, cancelando y reiniciando actividades, o definiendo el estado de la ejecución del flujo de trabajo como completo, cancelado o erróneo.

El decisor utiliza el nombre de la lista de tareas de la ejecución del flujo de trabajo para recibir tareas de decisión y responder a ellas. Si desea realizar sondeos para obtener tareas de decisión, llame al sondeo de la colección de decision_tasks del dominio para recorrer las tareas de decisión disponibles. A continuación, podrá buscar nuevos eventos en la tarea de decisión realizando una iteración en su colección de new_events.

Los eventos devueltos son objetos AWS::SimpleWorkflow::HistoryEvent. Para obtener el tipo de evento, utilice el miembro event_type correspondiente. Para obtener una lista y una descripción de los tipos de eventos del historial, consulte HistoryEvent en la Referencia de la API de Amazon Simple Workflow Service.

Este es el principio de la lógica del sondeador de tareas de decisión. Un nuevo método de la clase de flujo de trabajo ha llamado poll_for_decisions.

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

Ahora crearemos ramificaciones de la ejecución del decisor según el event_type recibido. El primer tipo que es probable que reciba es WorkflowExecutionStarted. Cuando se recibe este evento, significa que Amazon SWF señala al decisor que debe empezar la ejecución del flujo de trabajo. Para comenzar, programe la primera actividad llamando a schedule_activity_task en la tarea que se recibió durante el sondeo.

Transmitiremos la primera actividad que declaramos en la lista de actividades, que, como invertimos la lista para poder usarla como una pila, ocupa la posición last de la lista. Las “actividades” que definimos son solo mapas compuestos del nombre y el número de versión, pero eso es todo lo que necesita Amazon SWF para identificar la actividad que se va a programar, en el supuesto caso de que la actividad ya se ha registrado.

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

Cuando se programa una actividad, Amazon SWF envía una tarea de actividad a la lista de tareas de actividad que transmitimos durante su programación, lo que indica que la tarea debe empezar. Las tareas de actividad se abordarán en Parte 3 del tutorial acerca del flujo de trabajo de suscripción: implementación de las actividades, pero merece la pena señalar que la tarea no se ejecuta aquí. Solo indicamos a Amazon SWF que esta debe programarse.

La siguiente actividad que es necesario abordar es el evento ActivityTaskCompleted, que ocurre cuando Amazon SWF ha recibido una respuesta de actividad completada de una tarea de actividad.

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

Como las tareas se ejecutan de forma lineal, y solo se ejecuta una actividad a la vez, este es un buen momento para mostrar la tarea completada de la pila activity_list. Si esto resulta en una lista vacía, quiere decir que se ha completado el flujo de trabajo. En ese caso, informamos a Amazon SWF de que se ha completado el flujo de trabajo; para ello, llamamos a complete_workflow_execution en la tarea.

En caso de que la lista aún tenga entradas, programaremos la siguiente actividad de la lista (de nuevo, en la última posición). Sin embargo, esta vez intentaremos comprobar si la actividad anterior devolvió algún dato de resultados a Amazon SWF tras finalizar, dato que se proporciona al flujo de trabajo en los atributos del evento, en la clave result opcional. Si la actividad generó un resultado, lo transmitiremos como la opción input a la siguiente actividad programada, junto con la lista de tareas de actividad.

Al recuperar los valores result de las actividades completadas, y al configurar los valores input de las actividades programadas, es posible transmitir datos de una actividad a la siguiente, o utilizar datos de una actividad para modificar el comportamiento del decisor en función de los resultados de una actividad.

A efectos de este tutorial, estos dos tipos de eventos son los más importantes a la hora de definir el comportamiento del flujo de trabajo. Sin embargo, una actividad puede generar eventos distintos a ActivityTaskCompleted. Para finalizar el código del decisor, proporcionaremos un código de controlador de demostración para los eventos ActivityTaskTimedOut y ActivityTaskFailed, así como para el evento WorkflowExecutionCompleted, que se generarán cuando Amazon SWF procese la llamada a complete_workflow_execution que debemos enviar cuando ya no haya más actividades que ejecutar.

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

Comienzo de la ejecución del flujo de trabajo

Antes de que el flujo de trabajo pueda realizar sondeos para obtener tareas de decisión, es necesario comenzar la ejecución del flujo de trabajo.

Para comenzar la ejecución del flujo de trabajo, llame a start_execution en el tipo de flujo de trabajo registrado (AWS::SimpleWorkflow::WorkflowType). Definiremos un pequeño encapsulador a este nivel a fin de utilizar el miembro de instancia workflow_type que recuperamos en el constructor de clase.

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

Una vez que el flujo de trabajo se esté ejecutando, los eventos de decisión empezarán a aparecer en la lista de tareas del flujo de trabajo, que se transmite como una opción de ejecución del flujo de trabajo de start_execution.

A diferencia de las opciones que se ofrecen cuando se registra el tipo de flujo de trabajo, las opciones que se transmiten a start_execution no se consideran parte del tipo de flujo de trabajo. Puede cambiarlas para cada ejecución de flujo de trabajo sin cambiar la versión del flujo de trabajo.

En la medida que desee empezar la ejecución del flujo de trabajo cuando ejecute el archivo, añada código que instancie la clase y luego llame al método start_execution que acaba de definir.

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

Para evitar conflictos de nombres en la lista de tareas, utilice SecureRandom.uuid para generar UUID aleatorios que se utilizarán como nombre de la lista de tareas, lo que garantiza que se empleará un nombre de tarea distinto para cada ejecución del flujo de trabajo.

nota

Las listas de tareas se utilizan para registrar eventos en torno a la ejecución del flujo de trabajo, por lo que si utiliza la misma lista de tareas para varias ejecuciones del mismo tipo de flujo de trabajo, podría obtener eventos que se generaron durante una ejecución anterior, en especial si las ejecuta de forma casi consecutiva, lo que es a menudo el caso cuando se prueba nuevo código o se realizan otras pruebas.

Para evitar el problema de tener que tratar con elementos de ejecuciones anteriores, es posible utilizar una nueva lista de tareas para cada ejecución, especificándola al empezar la ejecución del flujo de trabajo.

También es necesario algo de código para proporcionar instrucciones a la persona a cargo de la ejecución (probablemente usted), y proporcionar la versión de la "actividad" de la lista de tareas. El decisor utilizará el nombre de esta lista de tareas para programar actividades para el flujo de trabajo, en tanto que la implementación de actividades prestará atención a los eventos de actividad en el nombre de esta lista de tareas para saber cuándo empezar las actividades programadas y proporcionar actualizaciones sobre la ejecución de la actividad.

El código también espera a que el usuario comience a ejecutar el iniciador de actividades antes de comenzar la ejecución del flujo de trabajo. El iniciador de actividades estará entonces listo para responder cuando las tareas de actividad empiecen a aparecer en la lista de tareas proporcionada.

Pasos siguientes

Ha implementado el flujo de trabajo. A continuación, definirá las actividades y un iniciador de actividades en Parte 3 del tutorial acerca del flujo de trabajo de suscripción: implementación de las actividades.