Verarbeiten von Nachrichten mit hohem Volumen von Amazon SQS (Express-Workflows) - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeiten von Nachrichten mit hohem Volumen von Amazon SQS (Express-Workflows)

In diesem Beispielprojekt wird veranschaulicht, wie Sie eineAWS Step FunctionsExpress-Workflow, um Nachrichten oder Daten aus einer hochvolumigen Ereignisquelle zu verarbeiten, z. B. Amazon Simple Queue Service (Amazon SQS). Da Express-Workflows mit sehr hoher Rate gestartet werden können, eignen sie sich ideal für die Verarbeitung von hochvolumigen Ereignissen oder für Streamingdaten-Workloads.

Im Folgenden finden Sie zwei häufig verwendete Methoden für die Ausführung Ihres Zustandsautomaten über eine Ereignisquelle:

  • Konfigurieren Sie eine Amazon CloudWatch Events-Regel, um die Ausführung eines Zustandsautomaten immer dann zu starten, wenn die Ereignisquelle ein Ereignis ausgibt. Weitere Informationen finden Sie im Abschnitt Erstellen einer CloudWatch Events-Regel, die bei einem Ereignis ausgelöst wird.

  • Ordnen Sie die Ereignisquelle einer Lambda-Funktion zu und schreiben Sie Funktionscode zur Ausführung Ihres Zustandsautomaten. Die AWS Lambda-Funktion wird jedes Mal aufgerufen, wenn Ihre Ereignisquelle ein Ereignis ausgibt, und startet dann die Ausführung eines Zustandsautomaten. Weitere Informationen finden Sie unterbenutzenAWS Lambdamit Amazon SQSaus.

In diesem Beispielprojekt wird die zweite Methode verwendet, um jedes Mal, wenn die Amazon SQS SQS-Warteschlange eine Nachricht sendet, eine Ausführung zu starten. Sie können eine ähnliche Konfiguration verwenden, um die Ausführung von Express-Workflows aus anderen Ereignisquellen wie Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB und Amazon Kinesis auszulösen.

Weitere Informationen zu -Step-Functions--Serviceintegrationen finden Sie im Folgenden:

Erstellen des Zustandsautomaten und Bereitstellen von Ressourcen

  1. Öffnen SieStep Functions Konsoleund wähleErstellen eines Zustandsautomatenaus.

  2. Klicken Sie aufFühren Sie ein Beispielprojekt ausWählen Sie und anschließend ausVerarbeiten von Nachrichten mit hohem Volumen von Amazon SQSaus.

    Der Code und der Visual Workflow des Zustandsautomaten werden angezeigt.

    
            Express-SQS-Workflow mit hohem Volumen.
  3. Wählen Sie Next (Weiter).

    Die Seite Deploy resources (Ressourcen bereitstellen) wird angezeigt und listet die Ressourcen auf, die erstellt werden. Die Ressourcen für dieses Beispielprojekt:

    • Zustandsautomaten Step Functions-

    • Eine Amazon SQS SQS-Warteschlange

    • Eine Lambda-Funktion

  4. Wählen Sie Deploy Resources (Ressourcen bereitstellen) aus.

    Anmerkung

    Es kann bis zu 10 Minuten dauern, bis diese Ressourcen und dazugehörende IAM-Berechtigungen erstellt werden. Wenn die Seite Deploy resources (Ressourcen bereitstellen) anzeigt wird, können Sie den Link Stack ID (Stack-ID) öffnen, um zu sehen, welche Ressourcen bereitgestellt werden.

Triggerausführung

  1. Öffnen Sie die Amazon-SQS-Konsole.

  2. Wählen Sie die Warteschlange aus, die vom Beispielprojekt erstellt wurde.

    Der Name lautet in etwa Example-SQSQueue-wJalrXUtnFEMI.

  3. Wählen Sie in der Liste Queue Actions (Warteschlangenaktionen) die Option Send a Message (Eine Nachricht senden) aus.

  4. Verwenden Sie die Schaltfläche zum Kopieren, um die folgende Nachricht zu kopieren. Geben Sie sie dann im Fenster Send a Message (Eine Nachricht senden) ein und wählen Sie Send Message (Nachricht senden) aus.

    Anmerkung

    In dieser Beispielmeldung wurde die Zeile input: mit Zeilenumbrüchen formatiert, um sie an die Seite anzupassen. Verwenden Sie die Schaltfläche „Copy (Kopieren)“ oder stellen Sie anderweitig sicher, dass sie als einzelne Zeile ohne Umbrüche eingegeben wird.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Klicken Sie auf Close (Schließen).

  6. Öffnen SieStep Functions Konsoleaus.

  7. Wechseln Sie zu IhremAmazon CloudWatch Logs Logs-Protokollgruppeund überprüfen Sie die Protokolle. Der Name der Protokollgruppe lautet example-ExpressLogGroup-wJalrXUtnFEMI.

Beispiel für Lambda-Funktionscode

Im Folgenden finden Sie Lambda-Funktionscode, der anzeigt, wie die initiierende Lambda-Funktion die Ausführung eines Zustandsautomaten mit demAWSSDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Code des Zustandsautomaten aus diesem Beispiel

Der Express-Workflow in diesem Beispielprojekt besteht aus einer Reihe von Lambda-Funktionen für die Textverarbeitung.

Weitere Informationen darüber, wie AWS Step Functions andere AWS-Services steuern kann, finden Sie unter Verwenden von AWS Step Functions mit sonstigen Services.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM-Beispiel

In diesem BeispielAWS Identity and Access ManagementDie (IAM) -Beispielprojekt generierte Richtlinie enthält die geringstmöglichen Privilegien, die für die Ausführung des Zustandsautomaten und der verwandten Ressourcen erforderlich sind. Wir empfehlen, nur die erforderlichen Berechtigungen in Ihre IAM-Richtlinien aufzunehmen.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

Die folgende Richtlinie stellt sicher, dass ausreichende Berechtigungen für CloudWatch Logs vorhanden sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Weitere Informationen zum Konfigurieren von IAM bei Verwendung von Step Functions mit anderenAWSDienstleistungen, sieheIAM-Richtlinien für integrierte Diensteaus.