Verarbeiten von Nachrichten mit hohem Volumen aus Amazon SQS (Express Workflows) - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeiten von Nachrichten mit hohem Volumen aus Amazon SQS (Express Workflows)

Dieses Beispielprojekt zeigt, wie Sie einen AWS Step Functions Express-Workflow verwenden, um Nachrichten oder Daten aus einer Ereignisquelle mit hohem Volumen zu verarbeiten, z. B. Amazon Simple Queue Service (Amazon SQS). Da Express-Workflows mit sehr hoher Rate gestartet werden können, eignen sie sich ideal für die Verarbeitung von hochvolumigen Ereignissen oder für Streamingdaten-Workloads.

Im Folgenden finden Sie zwei häufig verwendete Methoden für die Ausführung Ihres Zustandsautomaten über eine Ereignisquelle:

  • Konfigurieren Sie eine Amazon- CloudWatch Events-Regel, um eine Ausführung des Zustandsautomaten zu starten, wenn die Ereignisquelle ein Ereignis ausgibt. Weitere Informationen finden Sie unter Erstellen einer CloudWatch Ereignisregel, die bei einem Ereignis ausgelöst wird.

  • Ordnen Sie die Ereignisquelle einer Lambda-Funktion zu und schreiben Sie Funktionscode zur Ausführung Ihres Zustandsautomaten. Die AWS Lambda Funktion wird jedes Mal aufgerufen, wenn Ihre Ereignisquelle ein Ereignis ausgibt, und startet dann eine Zustandsautomaten-Ausführung. Weitere Informationen finden Sie unter Verwenden von AWS Lambda mit Amazon SQS.

Dieses Beispielprojekt verwendet die zweite Methode, um eine Ausführung jedes Mal zu starten, wenn die Amazon SQS-Warteschlange eine Nachricht sendet. Sie können eine ähnliche Konfiguration verwenden, um die Ausführung von Express Workflows aus anderen Ereignisquellen wie Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB und Amazon Kinesis auszulösen.

Weitere Informationen zu Express-Workflows- und Step-Functions-Serviceintegrationen finden Sie im Folgenden:

Schritt 1: Erstellen des Zustandsautomaten und Bereitstellen von Ressourcen

  1. Öffnen Sie die Step-Functions-Konsole und wählen Sie Zustandsautomaten erstellen aus.

  2. Geben Sie Process high-volume messages from SQS in das Suchfeld ein und wählen Sie dann in den zurückgegebenen Suchergebnissen die Option Nachrichten mit hohem Volumen aus SQS verarbeiten aus.

  3. Wählen Sie Next (Weiter), um fortzufahren.

  4. Step Functions listet die im ausgewählten Beispielprojekt AWS-Services verwendeten auf. Es zeigt auch ein Workflow-Diagramm für das Beispielprojekt. Stellen Sie dieses Projekt in Ihrem bereit AWS-Konto oder verwenden Sie es als Ausgangspunkt für die Erstellung Ihrer eigenen Projekte. Wählen Sie je nachdem, wie Sie fortfahren möchten, Demo ausführen oder darauf aufbauen aus.

    Dieses Beispielprojekt stellt die folgenden Ressourcen bereit:

    • Vier Lambda-Funktionen

    • Eine Amazon-SQS-Warteschlange

    • Ein - AWS Step Functions Zustandsautomat

    • Verwandte AWS Identity and Access Management (IAM)-Rollen

    Die folgende Abbildung zeigt das Workflow-Diagramm für das Beispielprojekt Verarbeiten von Nachrichten mit hohem Volumen aus SQS:

    
            Workflow-Diagramm des Beispielprojekts Verarbeiten von Nachrichten mit hohem Volumen aus SQS.
  5. Wählen Sie Vorlage verwenden, um mit Ihrer Auswahl fortzufahren.

  6. Führen Sie eine der folgenden Aktionen aus:

    • Wenn Sie darauf aufbauen ausgewählt haben, erstellt Step Functions den Workflow-Prototyp für das ausgewählte Beispielprojekt. Step Functions stellt die in der Workflow-Definition aufgeführten Ressourcen nicht bereit.

      Ziehen Sie in Workflow Studio den Status aus dem Entwurfsmodus, Bundesstaaten-Browser um mit der Erstellung Ihres Workflow-Projekts fortzufahren. Oder wechseln Sie zu Codemodus, das einen integrierten Code-Editor ähnlich VS Code zum Aktualisieren der Amazon States Language (ASL)-Definition Ihres Zustandsautomaten in der Step Functions-Konsole bietet. Weitere Informationen zur Verwendung von Workflow Studio zum Erstellen Ihrer Zustandsautomaten finden Sie unter Verwenden von Workflow Studio.

      Wichtig

      Denken Sie daran, den Amazon-Ressourcennamen (ARN) des Platzhalters für die im Beispielprojekt verwendeten Ressourcen zu aktualisieren, bevor Sie Ihren Workflow ausführen.

    • Wenn Sie Demo ausführen ausgewählt haben, erstellt Step Functions ein schreibgeschütztes Beispielprojekt, das eine - AWS CloudFormation Vorlage verwendet, um die in dieser Vorlage aufgeführten AWS Ressourcen in Ihrem bereitzustellen AWS-Konto.

      Tipp

      Um die Definition des Zustandsautomaten des Beispielprojekts anzuzeigen, wählen Sie Code .

      Wenn Sie bereit sind, wählen Sie Bereitstellen und Ausführen aus, um das Beispielprojekt bereitzustellen und die Ressourcen zu erstellen.

      Es kann bis zu 10 Minuten dauern, bis diese Ressourcen und zugehörige IAM-Berechtigungen erstellt werden. Während Ihre Ressourcen bereitgestellt werden, können Sie den Link CloudFormation Stack-ID öffnen, um zu sehen, welche Ressourcen bereitgestellt werden.

      Nachdem alle Ressourcen im Beispielprojekt erstellt wurden, wird das neue Beispielprojekt auf der Seite Zustandsautomaten aufgeführt.

      Wichtig

      Für jeden in der CloudFormation Vorlage verwendeten Service können Standardgebühren anfallen.

Schritt 2: Auslösen der Ausführung des Zustandsautomaten

  1. Öffnen Sie die Amazon-SQS-Konsole.

  2. Wählen Sie die Warteschlange aus, die vom Beispielprojekt erstellt wurde.

    Der Name lautet in etwa Example-SQSQueue-wJalrXUtnFEMI.

  3. Wählen Sie in der Liste Queue Actions (Warteschlangenaktionen) die Option Send a Message (Eine Nachricht senden) aus.

  4. Verwenden Sie die Schaltfläche zum Kopieren, um die folgende Nachricht zu kopieren. Geben Sie sie dann im Fenster Send a Message (Eine Nachricht senden) ein und wählen Sie Send Message (Nachricht senden) aus.

    Anmerkung

    In dieser Beispielmeldung wurde die Zeile input: mit Zeilenumbrüchen formatiert, um sie an die Seite anzupassen. Verwenden Sie die Schaltfläche „Copy (Kopieren)“ oder stellen Sie anderweitig sicher, dass sie als einzelne Zeile ohne Umbrüche eingegeben wird.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Klicken Sie auf Schließen.

  6. Öffnen Sie die Step-Functions-Konsole .

  7. Rufen Sie Ihre Amazon- CloudWatch Logs-Protokollgruppe auf und überprüfen Sie die Protokolle. Der Name der Protokollgruppe sieht wie example-ExpressLogGroup-wJalrXUtnFEMI aus.

Beispiel für Lambda-Funktionscode

Im Folgenden finden Sie Lambda-Funktionscode, der zeigt, wie die initiierende Lambda-Funktion eine Zustandsautomaten-Ausführung mit dem AWS SDK startet.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Code des Zustandsautomaten aus diesem Beispiel

Der Express-Workflow in diesem Beispielprojekt besteht aus einer Reihe von Lambda-Funktionen für die Textverarbeitung.

Weitere Informationen darüber, wie andere - AWS Services steuern AWS Step Functions kann, finden Sie unter Verwendung AWS Step Functions mit anderen Diensten.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM-Beispiel

Diese vom Beispielprojekt generierte AWS Identity and Access Management (IAM)-Richtlinie enthält die geringsten Berechtigungen, die zum Ausführen des Zustandsautomaten und der zugehörigen Ressourcen erforderlich sind. Wir empfehlen Ihnen, nur die Berechtigungen einzubeziehen, die in Ihren IAM-Richtlinien erforderlich sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

Die folgende Richtlinie stellt sicher, dass ausreichende Berechtigungen für CloudWatch Protokolle vorhanden sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Informationen zum Konfigurieren von IAM bei der Verwendung von Step Functions mit anderen - AWS Services finden Sie unter IAM-Richtlinien für integrierte Dienste.