Partie du didacticiel sur le flux de travail d'abonnement Implémentation du flux de - Amazon Simple Workflow Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Partie du didacticiel sur le flux de travail d'abonnement Implémentation du flux de

Jusqu'à présent, notre code est assez générique. Nous entrons à présent dans la partie où l'on commence à définir réellement ce que fait notre flux de travail, ainsi que les activités nécessaires pour le mettre en œuvre.

Conception du flux de travail

Pour rappel, l'idée initiale de ce flux de travail comprenait les étapes suivantes :

  1. Obtenez une adresse d'abonnement (e-mail ou SMS) de la part de l'utilisateur.

  2. Créez une rubrique SNS et abonnez-y les points de terminaison fournis.

  3. Attendez que l'utilisateur confirme l'abonnement.

  4. Si l'utilisateur le confirme, publiez un message de félicitations dans la rubrique.

Nous pouvons considérer chaque étape de notre flux de travail comme activité qu'il doit exécuter. Le flux de travail est responsable de la planification de chaque activité à l'heure appropriée et de la coordination du transfert des données entre les activités.

Pour ce flux de travail, nous créerons une activité distincte pour chacune de ces étapes, en leur donnant le nom descriptif suivant :

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

Ces activités seront exécutées dans l'ordre, et les données de chaque étape seront utilisées à l'étape suivante.

Nous pourrions concevoir notre application pour que tout le code se trouve dans un fichier source, mais cette approche va à l'encontre de la manière dont a été conçu Amazon SWF. En effet, il convient particulièrement aux flux de travail qui couvrent l'intégralité d'Internet. Dès lors, nous allons répartir l'application en deux fichiers exécutables distincts :

  • swf_sns_workflow.rb : contient le flux de travail et le démarreur.

  • swf_sns_activities.rb : contient les activités et leur démarreur.

Les implémentations du flux de travail et des activités peuvent être exécutées dans des fenêtres distinctes, sur des ordinateurs séparés, voire dans différentes régions du monde. Comme Amazon SWF assure le suivi des détails de vos flux de travail et de vos activités, votre flux de travail peut coordonner la planification et le transfert des données de vos activités, quel que soit l'endroit où elles sont exécutées.

Configuration du code de flux de travail

Nous commencerons par créer un fichier nommé swf_sns_workflow.rb. Dans ce fichier, déclarez une classe appelée SampleWorkflow. Voici la déclaration de classe et son constructeur, la méthode initialize.

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

Comme vous pouvez le voir, nous conservons les données d'instance de classe suivantes :

  • domain : nom de domaine extrait d'init_domain dans utils.rb.

  • workflowId : liste des tâches transmises à initialize.

  • activity_list : liste d'activités, avec le nom et la version des activités que nous allons exécuter.

Le nom de domaine, ainsi que le nom et la version de l'activité suffisent à pour qu'Amazon SWF identifie positivement un type d'activité. Dès lors, ce sont les seules données dont nous avons besoin concernant les activités afin de les planifier.

La liste des tâches est utilisée par le code de décideur du flux de travail pour rechercher les tâches de décision et les activités de planification.

A la fin de cette fonction, nous appelons une méthode que nous n'avons pas encore définie : register_workflow. Nous définirons cette méthode ensuite.

Enregistrement du flux de travail

Pour utiliser un type de flux de travail, nous devons tout d'abord l'enregistrer. Comme un type d'activité, un type de flux de travail est identifié par son domaine, son nom et sa version. En outre, comme les domaines et les types d'activités, vous ne peut pas ré-enregistrer un type de flux de travail existant. Si vous avez besoin de modifier un type de flux de travail, vous devez le faire via une nouvelle version, ce qui crée un autre type.

Voici le code du flux de travail register_workflow, qui est utilisé pour récupérer le type de flux de travail existant que nous avons enregistré lors d'une exécution précédente ou pour l'enregistrer si cela n'est pas déjà fait.

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

Tout d'abord, pour vérifier si le nom et la version du flux de travail sont déjà enregistrés, nous procédons à l'itération via la collection workflow_types du domaine. Si nous constatons une correspondance, nous utiliserons le type de flux de travail qui a déjà été enregistré.

Si nous ne trouvons pas de correspondance, un nouveau type de flux de travail est enregistré (en appelant register au niveau de la même collection workflow_types que celle dans laquelle nous avons recherché le flux de travail) avec le nom « swf-sns-workflow », « version 1 » et les options suivantes.

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

Les options transmises lors de l'enregistrement sont utilisées pour définir le comportement par défaut de notre type de flux de travail. Dès lors, nous n'avons pas besoin de définir ces valeurs chaque fois nous que nous débutons une nouvelle exécution de flux de travail.

Ici, nous définissons uniquement certaines valeurs de délai : la durée maximale entre le début d'une tâche et sa fin (une heure), et la durée maximale d'exécution du flux de travail (24 heures). Si un de ces délais est dépassé, la tâche ou le flux de travail expire.

Pour plus d'informations sur les valeurs de ces délais, consultez la section Types Amazon SWF Timeout .

Recherche de décisions

Un décideur se trouve au cœur de l'exécution de chaque flux de travail. La responsabilité du décideur consiste à gérer l'exécution du flux de travail lui-même. Il reçoit les tâches de décision et y répond soit en planifiant de nouvelles activités, en annulant des activités et en les redémarrant, soit en définissant l'état de l'exécution du flux de travail comme terminé, comme annulé ou comme ayant échoué.

Le décideur utilise le nom de la liste des tâches de l'exécution du flux de travail pour recevoir des tâches de décision et y répondre. Pour rechercher des tâches de décision, appelez pool au niveau de la collection decision_tasks du domaine afin de parcourir les tâches de décision disponibles. Vous pouvez ensuite rechercher les nouveaux événements dans la tâche de la décision en procédant à une itération avec sa collection new_events.

Les événements renvoyés sont les objets AWS::SimpleWorkflow::HistoryEvent. Pour obtenir le type d'événement, utilisez le membre event_type correspondant. Pour obtenir une liste et une description des types d'événements d'historique, consultez la sectionHistoryEventdans leRéférence de l'API Amazon Simple Workflow Service.

Voici le début de la logique de l'observateur de tâches de décision. Une nouvelle méthode dans notre classe de flux de travail a appelé poll_for_decisions.

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

Nous allons maintenant connecter l'exécution de notre décideur en fonction de l'event_type reçu. Le premier type d'événement que nous sommes susceptibles de recevoir est WorkflowExecutionStarted. En cas de réception de cet événement, signale à votre décideur qu'Amazon SWF signale à votre décideur qu'il doit commencer l'exécution du flux de travail. Nous allons commencer par planifier la première activité. Pour ce faire, nous appelons schedule_activity_task au niveau de la tâche que nous avons reçu lors de la recherche.

Nous lui transmettons la première activité que nous avons déclarée dans notre liste d'activités, qui occupe la position last dans la liste, car nous avons inversé cette dernière pour l'utiliser telle une pile. Les « activités » que nous avons définies sont seulement des cartes composées d'un nom et d'un numéro de version, mais c'est tout ce dont a besoin Amazon SWF pour identifier l'activité à planifier, en supposant qu'elle ait déjà été enregistrée.

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

Lorsque nous planifions une activité, Amazon SWF envoie untâche d'activitéà la liste des tâches d'activité que nous transmettons lors de sa planification, indiquant ainsi à la tâche de commencer. Nous nous intéresserons aux tâches d'activité dans la section Troisième partie du didacticiel sur le flux de travail Implémentation des activités, mais il est important de noter que nous n'exécutons pas la tâche ici. Nous indiquons seulement à Amazon SWF qu'elle doit êtreprévu.

La prochaine activité que nous devrons aborder est laActivityTaskCompleted, qui se produit lorsqu'Amazon SWF a reçu une réponse d'activité terminée d'une tâche d'activité.

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

Dans la mesure où nous exécutons nos tâches de façon linéaire et qu'une seule activité est exécutée à la fois, nous profitons de cette occasion pour afficher la tâche terminée à partir de la pile activity_list. Si une liste vide apparaît, nous savons que le flux de travail est terminé. Dans ce cas, nous signalons à Amazon SWF que notre flux de travail est terminé en appelantcomplete_workflow_exécutionsur la tâche.

Dans le cas où la liste contiendrait encore des entrées, nous allons planifier l'activité suivante qui s'y trouve (une fois encore, en dernière position). Cette fois-ci, cependant, nous allons voir si l'activité précédente a renvoyé des données de résultat à Amazon SWF à la fin, ce qui est fourni au flux de travail dans les attributs de l'événement, dans le manuel facultatifresultclé. Si l'activité a généré un résultat, nous le transmettrons comme option input à la prochaine activité planifiée, avec la liste des tâches d'activité.

En récupérant les valeurs result des activités terminées et en définissant les valeurs input des activités planifiées, nous pouvons transférer les données d'une activité à l'autre, ou nous pouvons utiliser les données d'une activité pour modifier le comportement du décideur en fonction des résultats d'une activité.

Pour les besoins de ce didacticiel, ces deux types d'événements sont les plus importants pour définir le comportement de notre flux de travail. Toutefois, une activité peut générer des événements autres qu'ActivityTaskCompleted. Nous terminerons notre code décideur en fournissant un code de gestionnaire de démonstration pour leActivityTaskTimedOutetÉchec de la tâche d'activitéévénements, et pour leExécution du flux de travail terminée, qui sera généré lorsque Amazon SWF traitera lecomplete_workflow_executionappel que nous faisons lorsque nous manquons d'activités à exécuter.

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

Lancement de l'exécution du flux de travail

Avant que le flux de travail puisse rechercher des tâches de décision, nous devons lancer l'exécution du flux de travail.

Pour démarrer l'exécution de flux de travail, appelez start_execution au niveau du type de flux de travail enregistré (AWS::SimpleWorkflow::WorkflowType). Nous allons définir un petit wrapper à ce niveau afin d'exploiter le membre d'instance workflow_type que nous avons récupéré dans le constructeur de classe.

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

Une fois que le flux de travail s'exécute, les événements décision commencent à apparaître dans la liste des tâches correspondante, qui est transmise comme une option d'exécution du flux de travail dans start_execution.

Contrairement aux options qui sont fournies lorsque le type de flux de travail est enregistré, celles qui sont transmises à start_execution ne sont pas considérées comme faisant partie du type de flux de travail. Vous êtes libre de les modifier pour chaque exécution du flux de travail sans avoir à changer de version du flux de travail.

Dans la mesure où nous souhaiterions que le flux de travail se lance lorsque nous exécutons le fichier, ajoutez le code qui instancie la classe, puis qui appelle la méthode start_execution que nous venons de définir.

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

Pour éviter tout conflit de nom dans la liste des tâches, nous utiliserons SecureRandom.uuid pour générer un UUID aléatoire que nous utiliserons comme nom de la liste de tâches. Nous garantissons ainsi qu'un nom de liste de tâches différent s'applique à chaque exécution de flux de travail.

Note

Les listes de tâches permettent d'enregistrer les événements concernant une exécution de flux de travail. Donc, si vous utilisez la même liste des tâches pour plusieurs exécutions du même type de flux de travail, vous pouvez obtenir des événements qui ont été générés au cours d'une exécution précédente, surtout si vous avez procédé de manière quasi consécutive (ce qui est souvent le cas lorsque vous effectuez des tests, comme celui d'un nouveau code).

Pour éviter d'avoir à gérer les éléments issues des précédentes exécutions, nous pouvons utiliser une nouvelle liste des tâches pour chaque exécution, en le spécifiant lorsque nous commençons l'exécution du flux de travail.

Un peu de code est également nécessaire ici pour fournir des instructions à la personne chargée de l'exécution (vous, dans la plupart des cas) et pour fournir la version d'« activité » de la liste des tâches. Le décideur utilisera le nom de cette liste de tâches pour planifier les activités du flux de travail, tandis que la mise en œuvre des activités écoutera les événements d'activité correspondant à cette liste pour savoir quand commencer les activités planifiées et pour fournir des informations sur l'exécution de l'activité.

Le code attend également que l'utilisateur commence à exécuter le démarreur d'activités avant de lancer l'exécution du flux de travail. Le démarreur d'activités sera donc en mesure de réagir lorsque les tâches d'activité commenceront à apparaître dans la liste des tâches fournie.

Étapes suivantes

Vous avez implémenté le flux de travail. Vous définirez ensuite les activités et un démarreur, dans la section Troisième partie du didacticiel sur le flux de travail Implémentation des activités.