Parte 2 del tutorial sul flusso di lavoro di abbonamento implementazione del flusso di lavoro - Amazon Simple Workflow Service

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Parte 2 del tutorial sul flusso di lavoro di abbonamento implementazione del flusso di lavoro

Il codice che abbiamo creato fino a ora è alquanto generico. In questa parte del tutorial cominceremo quindi a definire realmente la funzione del nostro flusso di lavoro e le attività necessarie per implementarla.

Progettazione del flusso di lavoro

L'idea iniziale di questo flusso di lavoro comprendeva le seguenti fasi:

  1. Ricevere un indirizzo di sottoscrizione (e-mail o SMS) dall'utente.

  2. Creare un argomento SNS e sottoscrivervi gli endpoint disponibili.

  3. Attendere che l'utente confermi la sottoscrizione.

  4. In caso di conferma dell'utente, pubblica un messaggio di congratulazioni sull'argomento.

Possiamo considerare ogni fase del nostro flusso di lavoro come un'attività che deve eseguire. Il flusso di lavoro ha la responsabilità di pianificare ogni attività al momento opportuno e di coordinare il trasferimento di dati tra le attività.

Per questo flusso di lavoro, creeremo un'attività distinta per ogni fase, a cui assegneremo i nomi descrittivi seguenti:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

Queste attività saranno eseguite nell'ordine in cui sono elencate e i dati di ogni fase saranno utilizzati nella fase successiva.

Potremmo progettare la nostra applicazione in modo che tutto il codice si trovi in un unico file di origine, ma questo approccio è contrario alla scopo per il quale è stato creato Amazon SWF. ovvero per flussi di lavoro il cui ambito copre l'integralità di Internet. Di conseguenza, suddividiamo l'applicazione in due eseguibili distinti:

  • swf_sns_workflow.rb – Contiene il flusso di lavoro e il relativo starter.

  • swf_sns_activities.rb – Contiene le attività e il relativo starter.

Le implementazioni di flusso di lavoro e attività possono essere eseguite in finestre o computer distinti o addirittura in differenti aree del mondo. Poiché Amazon SWF tiene traccia dei dettagli dei flussi di lavoro e delle attività, il flusso di lavoro può coordinare la pianificazione e il trasferimento dei dati delle attività indipendentemente dal luogo in cui sono eseguite.

Configurazione del codice del flusso di lavoro

Per prima cosa, creeremo un file denominato swf_sns_workflow.rb. In questo file, devi dichiarare una classe denominata SampleWorkflow. Di seguito è riportata la dichiarazione di classe e il relativo costruttore, il metodo initialize.

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

Come puoi vedere, conserviamo i seguenti dati dell'istanza di classe:

  • domain – Il nome di dominio recuperato da init_domain in utils.rb.

  • workflowId – L'elenco di task passato a initialize.

  • activity_list – L'elenco di attività, con i nomi e le versioni delle attività che eseguiremo.

Il nome di dominio, il nome e la versione dell'attività sono sufficienti per consentire ad Amazon SWF di identificare positivamente un tipo di attività. Questi sono quindi i soli dati sulle attività di cui abbiamo bisogno per pianificarle.

L'elenco di task verrà utilizzato dal codice decisore del flusso di lavoro per eseguire il polling dei task di decisione e delle attività di pianificazione.

Al termine di questa funzione, chiamiamo un metodo che non abbiamo ancora definito, ovvero register_workflow. Definiremo questo metodo in seguito.

Registrazione del flusso di lavoro

Per utilizzare un tipo di flusso di lavoro, dobbiamo prima registrarlo. Come un tipo di attività, un tipo di flusso di lavoro è identificato dal relativo dominio, nome e versione. Inoltre, come per i domini e i tipi di attività, non è possibile registrare di nuovo un tipo di flusso di lavoro esistente. Se hai la necessità di modificare un tipo di flusso di lavoro, devi farlo mediante una nuova versione, che in pratica crea un nuovo tipo.

Di seguito è riportato il codice register_workflow, che utilizziamo per recuperare il tipo di flusso di lavoro esistente registrato durante un'esecuzione precedente oppure per registrare il flusso di lavoro se questa operazione non è ancora stata eseguita.

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

Per prima cosa, verifichiamo se il nome e la versione del flusso di lavoro sono già registrati scorrendo la raccolta workflow_types del dominio. Se troviamo una corrispondenza, utilizzeremo il tipo di flusso di lavoro già registrato.

In caso contrario, viene registrato un nuovo tipo di flusso di lavoro (chiamando register sulla stessa raccolta workflow_types in cui abbiamo cercato il flusso di lavoro) con il nome "swf-sns-workflow", version "1" e le opzioni seguenti.

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

Le opzioni passate durante la registrazione sono utilizzate per impostare il comportamento di default del tipo di flusso di lavoro. Di conseguenza, non abbiamo bisogno di impostare questi valori ogni volta che avviamo una nuova esecuzione di flusso di lavoro.

Qui impostiamo soltanto alcuni valori di timeout: il periodo di tempo massimo tra l'avvio di un task e la chiusura dello stesso (un'ora) e la durata massima dell'esecuzione di flusso di lavoro (24 ore). Se uno di questi valori viene superato, si verifica il timeout del task o del flusso di lavoro.

Per ulteriori informazioni sui valori di timeout, consulta Tipi di timeout di Amazon SWF .

Polling delle decisioni

Al centro di ogni esecuzione di flusso di lavoro si trova un decisore. Il ruolo del decisore è di gestire l'esecuzione del flusso di lavoro. Il decisore riceve i task di decisione e risponde agli stessi pianificando nuove attività, annullando e riavviando attività o definendo lo stato dell'esecuzione di flusso di lavoro come completa, annullata o non riuscita.

Il decisore utilizza il nome dell'elenco di task del esecuzione di flusso di lavoro per ricevere task di decisione a cui rispondere. Per eseguire il polling dei task di decisione, chiama poll sulla raccolta decision_tasks del dominio per scorrere i task di decisione disponibili. Successivamente, puoi cercare nuovi eventi nel task di decisione scorrendo la relativa raccolta new_events.

Gli eventi restituiti sono oggetti AWS::SimpleWorkflow::HistoryEvent ed è possibile ottenere il tipo di evento utilizzando il membro event_type dell'evento restituito. Per un elenco e una descrizione dei tipi di evento della cronologia, consultaHistoryEventnellaRiferimento API di Amazon Simple Workflow Service.

Di seguito viene riportato l'inizio della logica del poller dei task di decisione. Un nuovo metodo nella nostra classe di flusso di lavoro denominato poll_for_decisions.

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

Ora creeremo diramazioni dell'esecuzione del decisore in base al valore event_type ricevuto. Il primo tipo di evento che probabilmente riceveremo è WorkflowExecutionStarted. Se è così, significa che Amazon SWF sta segnalando al decisore che deve iniziare l'esecuzione del flusso di lavoro. Cominceremo quindi col pianificare la prima attività chiamando schedule_activity_task sul task ricevuto durante il polling.

A questo metodo passeremo la prima attività dichiarata nel nostro elenco di attività. Questa attività occupa la posizione last nell'elenco, in quanto abbiamo invertito quest'ultimo per poterlo utilizzare come stack. Le «attività» che abbiamo definito sono soltanto mappe composte da un nome e da un numero di versione, ma è tutto ciò di cui necessita Amazon SWF per identificare l'attività da pianificare, supponendo che sia già stata registrata.

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

Quando pianifichiamo un'attività, Amazon SWF invia untask di attivitàall'elenco di task di attività che passiamo durante la pianificazione dello stesso, indicando in questo modo il task da avviare. I task di attività sono descritti nella sezione Terza parte della esercitazione sul flusso di lavoro Implementazione delle attività, ma vale comunque la pena segnalare che non eseguiamo il task in questa fase. Indichiamo solamente ad Amazon SWF che deve esserepianificata.

La prossima attività che dovremo affrontare è laActivityTaskCompletedevento, che si verifica quando Amazon SWF ha ricevuto una risposta completata da un'attività di attività.

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

Poiché eseguiamo i task in modo lineare e una sola attività viene eseguita in un dato momento, è il momento giusto per prelevare il task completato dallo stack activity_list. Se viene restituito un elenco vuoto, significa che il nostro flusso di lavoro è completato. In tal caso, segnaliamo questa condizione ad Amazon SWF che il flusso di lavoro è completo chiamandocomplete_workflow_executionsul compito.

Se invece l'elenco non è vuoto, pianificheremo l'attività successiva (sempre in ultima posizione). Questa volta, tuttavia, verificheremo se l'attività precedente ha restituito dati di risultato ad Amazon SWF al termine, che sono forniti al flusso di lavoro negli attributi dell'evento, nell'opzione facoltativaresultchiave. Se l'attività ha generato un risultato, lo passeremo come opzione input all'attività pianificata successiva, insieme all'elenco dei task di attività.

Recuperando i valori result delle attività completate e impostando i valori input delle attività pianificate, possiamo passare dati da un'attività a quella successiva oppure utilizzare i dati di un'attività per modificare il comportamento del decisore in base ai risultati di un'attività.

In questo tutorial, questi due tipi di evento sono i più importanti per definire il comportamento del flusso di lavoro. Tuttavia, un'attività può generare eventi differenti da ActivityTaskCompleted. Concluderemo il nostro codice decider fornendo il codice del gestore dimostrativo per ilActivityTaskTimedOuteAttività di attività non riuscitaeventi e per ilEsecuzione del flusso di lavoro completataevento, che verrà generato quando Amazon SWF elabora ilcomplete_workflow_executionchiamata che facciamo quando esauriamo le attività da eseguire.

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

Avvio dell'esecuzione di flusso di lavoro

Per consentire al flusso di lavoro di eseguire il polling dei task di decisione, dobbiamo dapprima avviare l'esecuzione di flusso di lavoro.

A questo proposito, chiama start_execution sul tipo di flusso di lavoro registrato (AWS::SimpleWorkflow::WorkflowType). Per utilizzare il membro di istanza workflow_type che abbiamo recuperato nel costruttore della classe, definiremo un piccolo wrapper.

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

Una volta che il flusso di lavoro è in esecuzione, gli eventi di decisione cominceranno a apparire nell'elenco di task corrispondente, che viene passato come opzione dell'esecuzione di flusso di lavoro in start-execution.

A differenza delle opzioni che vengono fornite quando il tipo di flusso di lavoro è registrato, le opzioni passate a start_execution non sono considerate come facenti parte del tipo di flusso di lavoro. Sei libero di modificarle per ogni esecuzione di flusso di lavoro senza dover cambiare la versione del flusso di lavoro.

Poiché vogliamo che il flusso di lavoro venga avviato quando eseguiamo il file, aggiungiamo del codice che crea un'istanza della classe e quindi chiama il metodo start_execution appena definito.

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

Per evitare qualsiasi conflitto di denominazione negli elenchi di task, mediante SecureRandom.uuid genereremo un UUID aleatorio che possiamo utilizzare come nome di elenco di task. In questo modo, garantiremo l'utilizzo di un nome di elenco di task differente per ogni esecuzione di flusso di lavoro.

Nota

Gli elenchi di task sono utilizzati per registrare eventi relativi a un'esecuzione di flusso di lavoro. Se quindi utilizzi lo stesso elenco di task per molteplici esecuzioni dello stesso tipo di flusso di lavoro, puoi ottenere eventi generati durante un'esecuzione precedente, soprattutto se le esegui quasi in successione, condizione piuttosto frequente quando provi nuovo codice o effettui dei test.

Per evitare il problema di dover gestire elementi di esecuzioni precedenti, possiamo utilizzare un nuovo elenco di task per ogni esecuzione, definendolo quando iniziamo l'esecuzione di flusso di lavoro.

Il codice fornisce anche istruzioni alla persona responsabile dell'esecuzione (tu nella maggior parte dei casi) e la versione di "attività" dell'elenco di task. Il decisore utilizzerà queste nome di elenco di task per pianificare attività per il flusso di lavoro, mentre l'implementazione di attività rimarrà in attesa di eventi di attività su questo nome di elenco di task per sapere quando iniziare le attività pianificate e per fornire aggiornamenti sull'esecuzione dell'attività.

Il codice attende inoltre che l'utente inizi l'esecuzione dello starter di attività prima dell'esecuzione di flusso di lavoro, di modo che lo starter sia in grado di rispondere quando i task di attività cominciano a apparire nell'elenco di task fornito.

Fasi successive

L'implementazione del flusso di lavoro è completata. Successivamente, definirai le attività e uno starter di attività nella sezione Terza parte della esercitazione sul flusso di lavoro Implementazione delle attività.