Abonnement-Workflow, Anleitung Teil 2: Implementieren des Workflows - Amazon Simple Workflow Service

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Abonnement-Workflow, Anleitung Teil 2: Implementieren des Workflows

Bisher war unser Code sehr allgemein. In diesem Teil beginnen wir mit dem Definieren der Vorgehensweise des Workflows und der zu implementierenden Aktivitäten.

Entwerfen des Workflows

Wie Sie sich sicherlich erinnern, bestand das ursprüngliche Konzept des Workflows aus den folgenden Schritten:

  1. Abrufen einer Abonnementadresse (E-Mail oder SMS) des Benutzers.

  2. Erstellen Sie ein SNS-Thema und abonnieren Sie es für die bereitgestellten Endpunkte.

  3. Warten Sie darauf, dass der Benutzer das Abonnement bestätigt.

  4. Veröffentlichen Sie eine Glückwunschnachricht unter dem Thema, sobald der Benutzer die Bestätigung ausgeführt hat.

Jeder Schritt in unserem Workflow stellt eine Aktivität dar, die ausgeführt werden muss. Der Workflow dient zum Planen der einzelnen Aktivitäten zu den entsprechenden Zeitpunkten und dem Koordinieren des Datentransfers zwischen den Aktivitäten.

Für diesen Workflow generieren wir für die einzelnen Schritte jeweils eine separate Aktivität und benennen sie entsprechend:

  1. get_contact_activity

  2. subscribe_topic_activity

  3. wait_for_confirmation_activity

  4. send_result_activity

Diese Aktivitäten werden der Reihenfolge nach ausgeführt. Dabei nutzt der jeweils nachfolgende Schritt die Daten des vorherigen Schritts.

Wir könnten unsere Anwendung so entwerfen, dass der gesamte Code in einer Quelldatei enthalten ist. Das würde aber nicht der Konstruktion von Amazon SWF entsprechen. Amazon SWF wurde für Workflows entworfen, die den gesamten Internetbereich umfassen. Deshalb sollte die Anwendung in mindestens zwei separate ausführbare Programme unterteilt werden:

  • swf_sns_workflow.rb – Enthält den Workflow und den Workflow-Starter.

  • swf_sns_activities.rb – Enthält die Aktivitäten und den Aktivitäten-Starter.

Der Workflow und das Implementieren der Aktivitäten können in unterschiedlichen Fenstern, auf unterschiedlichen Computern und sogar in unterschiedlichen Teilen der Welt ausgeführt werden. Da Amazon SWF die Details der Workflows und Aktivitäten verfolgt, kann Ihr Workflow die Planung und den Datentransfer Ihrer Aktivitäten unabhängig vom Ausführungsort koordinieren.

Einrichten des Workflow-Codes

Als Erstes erstellen wir eine Datei namens swf_sns_workflow.rb. In dieser deklarieren wir eine Klasse mit dem Namen SampleWorkflow. Hier sehen Sie die Klassendeklaration und den dazugehörigen Konstruktor, die initialize-Methode.

require_relative 'utils.rb' # SampleWorkflow - the main workflow for the SWF/SNS Sample # # See the file called `README.md` for a description of what this file does. class SampleWorkflow attr_accessor :name def initialize(workflowId) # the domain to look for decision tasks in. @domain = init_domain # the task list is used to poll for decision tasks. @workflowId = workflowId # The list of activities to run, in order. These name/version hashes can be # passed directly to AWS::SimpleWorkflow::DecisionTask#schedule_activity_task. @activity_list = [ { :name => 'get_contact_activity', :version => 'v1' }, { :name => 'subscribe_topic_activity', :version => 'v1' }, { :name => 'wait_for_confirmation_activity', :version => 'v1' }, { :name => 'send_result_activity', :version => 'v1' }, ].reverse! # reverse the order... we're treating this like a stack. register_workflow end

Wie Sie sehen, behalten wir die folgenden Daten der Klassen-Instance bei:

  • domain – Den von init_domain in utils.rb abgerufenen Domänennamen.

  • workflowId – Die an initialize übergebene Aufgabenliste.

  • activity_list – Die Aktivitätenliste mit den Namen und Versionen der Aktivitäten, die wir ausführen werden.

Der Domänennamen, der Aktivitätsname und die Aktivitätsversion reichen aus, um Amazon SWF einen Aktivitätstyp zu identifizieren. Somit stehen alle Daten zur Verfügung, die für das Einplanen der Aktivitäten erforderlich sind.

Die Aufgabenliste wird vom Entscheider-Code des Workflows zum Abfragen von Entscheidungsaufgaben und Planen der Aktivitäten verwendet.

Am Ende dieser Funktion rufen wir eine Methode auf, die wir noch nicht definiert haben: register_workflowaus. Wir definieren diese Methode später.

Registrieren des Workflows

Wir müssen einen Workflow-Typ erst registrieren, damit wir ihn verwenden können. Genau wir ein Aktivitätstyp wird auch ein Workflow-Typ anhand seiner Domäne, seines Namens und seiner Version identifiziert. Und genau wie Domänen- und Aktivitätstypen können Sie einen vorhandenen Workflow-Typ nicht erneut registrieren. Wenn Sie den Workflow-Typ ändern müssen, müssen Sie ihn mit einer neuen Version zur Verfügung stellen, wodurch ein neuer Typ generiert wird.

Hier ist der Code für register_workflow. Dieser wird entweder zum Abrufen des vorhandenen Workflow-Typs genutzt, den wir während einer vorherigen Ausführung registriert haben, oder zum Registrieren des Workflows, sofern dies noch nicht geschehen ist.

# Registers the workflow def register_workflow workflow_name = 'swf-sns-workflow' @workflow_type = nil # a default value... workflow_version = '1' # Check to see if this workflow type already exists. If so, use it. @domain.workflow_types.each do | a | if (a.name == workflow_name) && (a.version == workflow_version) @workflow_type = a end end if @workflow_type.nil? options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 } puts "registering workflow: #{workflow_name}, #{workflow_version}, #{options.inspect}" @workflow_type = @domain.workflow_types.register(workflow_name, workflow_version, options) end puts "** registered workflow: #{workflow_name}" end

Zunächst prüfen wir, ob der Workflow-Name und die Version bereits registriert sind. Dazu iterieren wir durch die workflow_types-Sammlung der Domäne. Bei einer Übereinstimmung nutzen wir den bereits registrierten Workflow-Typ.

Wird keine Übereinstimmung gefunden, wird ein neuer Workflow-Typ registriert (durch Aufruf von register in derselben workflow_types-Sammlung, in der wir den Workflow gesucht haben). Diesem werden der Name „swf-sns-workflow“, Version „1“ und die folgenden Optionen zugewiesen.

options = { :default_child_policy => :terminate, :default_task_start_to_close_timeout => 3600, :default_execution_start_to_close_timeout => 24 * 3600 }

Die während der Registrierung übergebenen Optionen werden zum Festlegen des Standardverhaltens unseres Workflow-Typs genutzt, sodass wir diese Werte nicht bei jeder neuen Workflow-Ausführung einstellen müssen.

In diesem Beispiel legen wir nur einige Timeout-Werte fest: Die maximale Dauer zwischen dem Start und der Beendigung einer Aufgabe (1 Stunde) und die Zeit, die es maximal dauert, bis die Workflow-Ausführung abgeschlossen ist (24 Stunden). Wird eine dieser Zeitgrenzen überschritten, kommt es zu einem Aufgaben- oder Workflow-Timeout.

Weitere Informationen zur Timeout-Werten finden Sie unter Amazon SWF-Timeout-Typen .

Abrufen von Entscheidungen

Das Herzstück einer jeden Workflow-Ausführung stellt der Entscheider dar. Der Entscheider ist für die Verwaltung der Ausführung des Workflows selbst verantwortlich. Er empfängt Entscheidungsaufgaben und reagiert auf diese, indem er entweder neue Aktivitäten plant, Aktivitäten abricht oder neu startet oder den Status der Workflow-Ausführung auf abgeschlossen, abgebrochen oder fehlgeschlagen setzt.

Der Entscheider nutzt den Namen der Aufgabenliste der Workflow-Ausführung, um Entscheidungsaufgaben abzurufen, auf die er reagieren muss. Zum Abrufen von Entscheidungsaufgaben rufen Sie die Abfrage https://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/DecisionTaskCollection.html#poll-instance_method in der decision_tasks-Sammlung der Domäne auf, um nach verfügbaren Entscheidungsaufgaben zu suchen. Sie können neue Ereignisse in der Entscheidungsaufgabe finden, indem Sie eine Iteration über der new_events-Sammlung starten.

Bei den zurückgegebenen Ereignissen handelt es sich um AWS::SimpleWorkflow::HistoryEvent-Objekte. Den Ereignistyp erhalten Sie über das zurückgegebene event_type-Mitglied des Ereignisses. Eine Liste sowie eine Beschreibung der Verlaufsereignistypen finden Sie unterhistoryEventimAmazon Simple Workflow Service API Referenzaus.

Nachfolgend sehen Sie den Anfang der Logik des Entscheidungsaufgaben-Pollers. Eine neue Methode in unserer Workflow-Klasse namens poll_for_decisions.

def poll_for_decisions # first, poll for decision tasks... @domain.decision_tasks.poll(@workflowId) do | task | task.new_events.each do | event | case event.event_type

Wir verzweigen nun die Ausführung unseres Entscheiders basierend auf dem empfangenen event_type. Zunächst erhalten wir vermutlich WorkflowExecutionStarted. Wurde das Ereignis empfangen, signalisiert Amazon SWF Entscheider, dass er mit der Workflow-Ausführung beginnen soll. Wir beginnen mit dem Planen der ersten Aktivität, indem wir schedule_activity_task für die Aufgabe aufrufen, die wir während des Abrufens empfangen haben.

Wir übergeben sie als erste in unserer Aktivitätsliste deklarierten Aktivität, die, da wir die Listenreihenfolge umgedreht haben, sodass sie als Stack verwendet werden kann, die last Position in der Liste einnimmt. Die von uns definierten „Aktivitäten“ sind lediglich Maps und bestehen aus einem Namen und einer Versionsnummer. Das ist jedoch alles, was Amazon SWF benötigt, um die Aktivität für die Planung zu identifizieren, sofern die Aktivität bereits registriert wurde.

when 'WorkflowExecutionStarted' # schedule the last activity on the (reversed, remember?) list to # begin the workflow. puts "** scheduling activity task: #{@activity_list.last[:name]}" task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } )

Wenn wir eine Aktivität planen, sendet Amazon SWF eineAktivitäts-Aufgabezur Aktivitätsaufgabenliste, die wir beim Planen übergeben, um den Beginn der Aufgabe zu signalisieren. Wir beschäftigen uns in Abonnement-Workflow, Tutorial Teil 3: Implementieren der Aktivitäten mit Aktivitätsaufgaben, an dieser Stelle sollte aber erwähnt werden, dass wir hier die Aufgabe nicht ausführen. Wir weisen Amazon SWF nur an, dass es so sein solltegeplantaus.

Die nächste Aktivität, die wir ansprechen müssen, ist dieActivityTaskCompleted-Ereignis, das auftritt, wenn Amazon SWF von einer Aktivitätsaufgabe erhalten hat.

when 'ActivityTaskCompleted' # we are running the activities in strict sequential order, and # using the results of the previous activity as input for the next # activity. last_activity = @activity_list.pop if(@activity_list.empty?) puts "!! All activities complete! Sending complete_workflow_execution..." task.complete_workflow_execution return true; else # schedule the next activity, passing any results from the # previous activity. Results will be received in the activity # task. puts "** scheduling activity task: #{@activity_list.last[:name]}" if event.attributes.has_key?('result') task.schedule_activity_task( @activity_list.last, { :input => event.attributes[:result], :workflowId => "#{@workflowId}-activities" } ) else task.schedule_activity_task( @activity_list.last, { :workflowId => "#{@workflowId}-activities" } ) end end

Da wir unsere Aufgaben linear ausführen und nur jeweils eine Aktivität zurzeit ausgeführt wird, nutzen wir die Gelegenheit, die gesamte Aufgabe aus dem activity_list-Stack abzurufen. Führt dies zu einer leeren Liste, wissen wir, dass unser Workflow abgeschlossen ist. In diesem Fall signalisieren wir Amazon SWF, dass unser Workflow durch Aufrufen abgeschlossen istcomplete_workflow_executionüber die Aufgabe.

Enthält die Liste noch Einträge, planten wir die nächste Aktivität auf der Liste (auch hier an letzter Stelle). Dieses Mal prüfen wir jedoch, ob die vorherige Aktivität nach dem Abschluss Ergebnisdaten an Amazon SWF zurückgegeben hat, die dem Workflow in den Attributen des Ereignisses zur Verfügung gestellt werden, im optionalenresultkey. Wurde durch die Aktivität ein Ergebnis generiert, übergeben wir dieses als input-Option zusammen mit der Aktivitätsaufgabenliste an die nächste geplante Aktivität.

Durch Abruf der result-Werte abgeschlossener Aktivitäten und Festlegen derinput-Werte geplanter Aktivitäten, können wir Daten von einer Aktivität an die nächste übergeben oder Daten aus einer Aktivität nutzen, um das Verhalten unseres Entscheiders basierend auf den Ergebnissen einer Aktivität zu ändern.

Im Rahmen dieser Anleitung stellen diese beiden Ereignistypen die wichtigsten Typen in Bezug auf das Definieren des Workflow-Verhaltens dar. Dennoch kann eine Aktivität andere Ereignisse als ActivityTaskCompleted generieren. Wir werden unseren Entscheidercode abschließen, indem wir den Demo-Handler-Code für dieActivityTaskTimedOutundActivityTaskFailedEvents und für dieWorkflowexEcutionCompleted-Ereignis, das generiert wird, wenn Amazon SWFcomplete_workflow_executionrufen Sie an, die wir machen, wenn uns die Aktivitäten ausgehen.

when 'ActivityTaskTimedOut' puts "!! Failing workflow execution! (timed out activity)" task.fail_workflow_execution return false when 'ActivityTaskFailed' puts "!! Failing workflow execution! (failed activity)" task.fail_workflow_execution return false when 'WorkflowExecutionCompleted' puts "## Yesss, workflow execution completed!" task.workflow_execution.terminate return false end end end end

Starten der Workflow-Ausführung

Ehe Entscheidungsaufgaben für den Workflow generiert werden, die abgerufen werden können, müssen wir die Workflow-Ausführung starten.

Zum Starten der Workflow-Ausführung rufen Sie start_execution für Ihren registrierten Workflow-Typ (AWS::SimpleWorkflow::WorkflowType) auf. Wir definieren einen kleinen Wrapper darum herum, um das workflow_type-Instance-Mitglied zu nutzen, das wir im Klassen-Konstruktor abgerufen haben.

def start_execution workflow_execution = @workflow_type.start_execution( { :workflowId => @workflowId } ) poll_for_decisions end end

Sobald der Workflow ausgeführt wird, erscheinen Entscheidungsereignisse in der Aufgabenliste des Workflows, die als Workflow-Ausführungsoption in start_execution übergeben wird.

Im Gegensatz zu Optionen, die bereitgestellt werden, wenn der Workflow-Typ registriert wird, gelten Optionen, die an start_execution übergeben werden, nicht als Teil des Workflow-Typs. Sie können dies pro Workflow-Ausführung ändern, ohne die Workflow-Version ändern zu müssen.

Wir möchten, dass die Workflow-Ausführung startet, wenn wir die Datei ausführen. Fügen Sie deshalb einen Code hinzu, der die Klasse instanziiert und dann die von uns soeben definierte start_execution-Methode aufruft.

if __FILE__ == $0 require 'securerandom' # Use a different task list name every time we start a new workflow execution. # # This avoids issues if our pollers re-start before SWF considers them closed, # causing the pollers to get events from previously-run executions. workflowId = SecureRandom.uuid # Let the user start the activity worker first... puts "" puts "Amazon SWF Example" puts "------------------" puts "" puts "Start the activity worker, preferably in a separate command-line window, with" puts "the following command:" puts "" puts "> ruby swf_sns_activities.rb #{workflowId}-activities" puts "" puts "You can copy & paste it if you like, just don't copy the '>' character." puts "" puts "Press return when you're ready..." i = gets # Now, start the workflow. puts "Starting workflow execution." sample_workflow = SampleWorkflow.new(workflowId) sample_workflow.start_execution end

Um zu vermeiden, dass Konflikte mit Aufgabenlistennamen auftreten, verwenden wir SecureRandom.uuid, um eine zufällig UUID zu generieren, die wir als Aufgabenlistennamen verwenden können. So wird sichergestellt, dass für jede Workflow-Ausführung ein anderer Aufgabenlistenname genutzt wird.

Anmerkung

Aufgabenlisten zeichnen Ereignisse zur Workflow-Ausführung auf. Wenn Sie also dieselbe Aufgabenliste für mehrere Ausführungen desselben Workflow-Typs nutzen, erhalten Sie möglicherweise Ereignisse, die während der vorherigen Ausführung generiert wurden. Das gilt insbesondere dann, wenn Sie diese dicht nacheinander ausführen. Dies ist oftmals der Fall, wenn neuer Code ausprobiert oder Tests durchgeführt werden.

Zur Vermeidung des Problems mit Artefakten aus vorherigen Ausführungen kann für jede Ausführung eine neue Aufgabenliste verwendet werden, die angegeben wird, wenn mit der Workflow-Ausführung begonnen wird.

Hier ist auch etwas Code vorhanden, der Anleitungen für den ausführenden Benutzer (möglicherweise Sie) sowie die Aktivitätsversion der Aufgabenliste bereitstellt. Der Entscheider nutzt den Aufgabenlistennamen zum Planen der Aktivitäten für den Workflow. Durch die Aktivitätenimplementierung wird nach Aktivitätsereignissen in diesem Aufgabenlistennamen gesucht, damit die Anwendung weiß, wann die geplante Aktivität beginnen soll und um Aktualisierungen zur Aktivitätsausführung bereitzustellen.

Der Code wartet zudem darauf, dass der Benutzer mit der Ausführung des Aktivitäten-Starters beginnt, bevor er die Workflow-Ausführung startet. Somit ist der Aktivitäten-Starter bereit, sobald Aktivitätsaufgaben auf der bereitgestellten Aufgabenliste erscheinen.

Nächste Schritte

Die Implementierung des Workflows ist abgeschlossen. Als Nächstes definieren Sie die Aktivitäten sowie einen Aktivitäten-Starter in Abonnement-Workflow, Tutorial Teil 3: Implementieren der Aktivitäten.