Traitement d'éléments de données individuels à l'aide d'une fonction Lambda - AWS Step Functions

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitement d'éléments de données individuels à l'aide d'une fonction Lambda

Dans ce didacticiel, vous allez utiliser le ItemBatcher champ d'état de la carte distribuée pour itérer sur les éléments individuels présents dans un lot à l'aide d'une fonction Lambda. L'état de la carte distribuée lance quatre exécutions de flux de travail secondaires. Chacun de ces flux de travail enfants exécute un état de carte intégrée. À chaque itération, l'état Inline Map invoque une fonction Lambda et transmet un seul élément du lot à la fonction. La fonction Lambda traite ensuite l'élément et renvoie le résultat.

Vous allez créer une machine à états qui effectue la multiplication sur un tableau d'entiers. Supposons que le tableau d'entiers que vous fournissez en entrée est [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] et que le facteur de multiplication est7. Ensuite, le tableau résultant formé après avoir multiplié ces entiers par un facteur de 7 sera. [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

Étape 1 : Création de la machine à états

Au cours de cette étape, vous créez le prototype de flux de travail de la machine à états qui transmet un seul élément d'un lot d'éléments à chaque appel de la fonction Lambda que vous allez créer à l'étape 2.

  • Utilisez la définition suivante pour créer une machine à états à l'aide de la console Step Functions. Pour plus d'informations sur la création d'une machine à états, consultez Étape 1 : Création du prototype de flux de travail le didacticiel Getting started with using Distributed Map state.

    Dans cette machine à états, vous définissez un état de carte distribuée qui accepte un tableau de 10 entiers en entrée et transmet ces éléments du tableau aux flux de travail enfants exécutés par lots. Chaque exécution d'un flux de travail enfant reçoit un lot de trois éléments en entrée et exécute un état Inline Map. Chaque itération de l'état Inline Map appelle une fonction Lambda et transmet un élément du lot à la fonction. Cette fonction multiplie ensuite l'élément par un facteur de 7 et renvoie le résultat.

    La sortie de chaque exécution de flux de travail enfant est un tableau JSON qui contient le résultat de la multiplication pour chacun des éléments transmis.

    Important

    Assurez-vous de remplacer l'Amazon Resource Name (ARN) de la fonction Lambda dans le code suivant par l'ARN de la fonction que vous allez créer à l'étape 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

Étape 2 : Création de la fonction Lambda

Au cours de cette étape, vous créez la fonction Lambda qui traite chaque article transmis par le lot.

Important

Assurez-vous que votre fonction Lambda est identique à celle de votre Région AWS machine à états.

Pour créer la fonction Lambda
  1. Utilisez la console Lambda pour créer une fonction Lambda Python 3.9 nommée. ProcessSingleItem Pour plus d'informations sur la création d'une fonction Lambda, voir Étape 4 : Configuration de la fonction Lambda dans le didacticiel Getting started with using Distributed Map state.

  2. Copiez le code suivant pour la fonction Lambda et collez-le dans la section Code source de votre fonction Lambda.

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. Après avoir créé votre fonction Lambda, copiez l'ARN de la fonction affiché dans le coin supérieur droit de la page. Pour copier l'ARN, cliquez sur le icon to copy the Lambda function's Amazon Resource Name . Voici un exemple d'ARN, où function-nameest le nom de la fonction Lambda (dans ce cas,ProcessSingleItem) :

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Vous devrez fournir la fonction ARN dans la machine à états que vous avez créée à l'étape 1.

  4. Choisissez Déployer pour déployer les modifications.

Étape 3 : Exécutez la machine d'état

Lorsque vous exécutez la machine à états, l'état de la carte distribuée lance quatre exécutions de flux de travail secondaires, chaque exécution traitant trois éléments, tandis qu'une exécution traite un seul élément.

L'exemple suivant montre les données transmises à l'une des invocations de ProcessSingleItemfonction dans le cadre de l'exécution d'un flux de travail enfant.

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

Compte tenu de cette entrée, l'exemple suivant montre la sortie renvoyée par la fonction Lambda.

{ "statusCode": 200, "multiplied": 7 }

L'exemple suivant montre le tableau JSON de sortie pour l'une des exécutions du flux de travail enfant.

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

La machine d'état renvoie la sortie suivante qui contient quatre tableaux pour les quatre exécutions de flux de travail secondaires. Ces tableaux contiennent les résultats de multiplication des éléments d'entrée individuels.

Enfin, la sortie de la machine à états est un tableau nommé multiplied qui combine tous les résultats de multiplication renvoyés pour les quatre exécutions de flux de travail secondaires.

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

Pour combiner tous les résultats de multiplication renvoyés par les exécutions du flux de travail enfant dans un seul tableau de sortie, vous pouvez utiliser le ResultSelector champ. Définissez ce champ dans l'état de la carte distribuée pour rechercher tous les résultats, extraire les résultats individuels, puis les combiner dans un seul tableau de sortie nommémultiplied.

Pour utiliser le ResultSelector champ, mettez à jour la définition de votre machine à états comme indiqué dans l'exemple suivant.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

La machine d'état mise à jour renvoie un tableau de sortie consolidé, comme indiqué dans l'exemple suivant.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }