Procesamiento de elementos de datos individuales con una función de Lambda - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesamiento de elementos de datos individuales con una función de Lambda

En este tutorial, utilizará el campo ItemBatcher de estado Distributed Map para recorrer en iteración los elementos individuales presentes en un lote mediante una función de Lambda. El estado Distributed Map inicia cuatro ejecuciones de flujos de trabajo secundarios. Cada uno de estos flujos de trabajo secundarios ejecuta un estado Inline Map. Para cada iteración, el estado Inline Map invoca una función de Lambda y pasa un único elemento del lote a la función. A continuación, la función de Lambda procesa el elemento y devuelve el resultado.

Creará una máquina de estado que realiza la multiplicación de una matriz de números enteros. Supongamos que la matriz de enteros que proporciona como entrada es [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] y el factor de multiplicación es 7. Entonces, la matriz resultante formada después de multiplicar estos enteros por un factor de 7, será [7, 14, 21, 28, 35, 42, 49, 56, 63, 70].

Paso 1: Crear la máquina de estado

En este paso, creará el prototipo de flujo de trabajo de la máquina de estado que pasa un único elemento de un lote de elementos a cada invocación de la función de Lambda que cree en el Paso 2.

  • Utilice la siguiente definición para crear una máquina de estado mediante la consola de Step Functions. Para obtener información sobre cómo crear una máquina de estado, consulte Paso 1: Crear el prototipo de flujo de trabajo en el tutorial Cómo empezar a usar el estado Distributed Map.

    En esta máquina de estado, se define un estado Distributed Map que acepta una matriz de 10 enteros como entrada y pasa estos elementos de la matriz a las ejecuciones del flujo de trabajo secundario en lotes. Cada ejecución del flujo de trabajo secundario recibe un lote de tres elementos como entrada y ejecuta un estado Inline Map. Cada iteración del estado Inline Map invoca una función de Lambda y pasa un elemento del lote a la función. A continuación, esta función multiplica el elemento por un factor de 7 y devuelve el resultado.

    El resultado de la ejecución de cada flujo de trabajo secundario es una matriz JSON que contiene el resultado de la multiplicación de cada uno de los elementos pasados.

    importante

    Asegúrese de sustituir el nombre de recurso de Amazon (ARN) de la función de Lambda en el siguiente código por el ARN de la función que va a crear en el Paso 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "InnerMap", "States": { "InnerMap": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "ItemsPath": "$.Items", "ItemSelector": { "MyMultiplicationFactor.$": "$.BatchInput.MyMultiplicationFactor", "MyItem.$": "$$.Map.Item.Value" } } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemsPath": "$.MyItems", "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } } } } }

Paso 2: Crear la función de Lambda

En este paso, se crea la función de Lambda que procesa cada elemento que se pasa del lote.

importante

Asegúrese de que su función Lambda esté en la Región de AWS misma posición que su máquina de estados.

Para crear la función de Lambda
  1. Use la consola de Lambda para crear una función de Lambda de Python 3.9 denominada ProcessSingleItem. Para obtener información sobre la creación de una función de Lambda, consulte el paso 4: Configurar la función de Lambda en el tutorial Cómo empezar a usar el estado Distributed Map.

  2. Copie el siguiente código para la función de Lambda y péguelo en la sección Código fuente de la función de Lambda.

    import json def lambda_handler(event, context): multiplication_factor = event['MyMultiplicationFactor'] item = event['MyItem'] result = multiplication_factor * item return { 'statusCode': 200, 'multiplied': result }
  3. Una vez que haya creado la función de Lambda, copie el ARN de la función que se muestra en la esquina superior derecha de la página. Para copiar el ARN, haga clic en icon to copy the Lambda function's Amazon Resource Name . El siguiente es un ejemplo de ARN, donde function-name es el nombre de la función de Lambda (en este caso, ProcessSingleItem):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Deberá proporcionar la función ARN en la máquina de estado creada en el Paso 1.

  4. Elija Implementar para implementar estos cambios.

Paso 3: Ejecutar la máquina de estado

Al ejecutar la máquina de estado, el estado Distributed Map inicia cuatro ejecuciones de flujo de trabajo secundarias, en las que cada ejecución procesa tres elementos, mientras que una ejecución procesa un solo elemento.

En el siguiente ejemplo se muestran los datos que se pasan a una de las invocaciones de funciones ProcessSingleItem dentro de la ejecución de un flujo de trabajo secundario.

{ "MyMultiplicationFactor": 7, "MyItem": 1 }

Con esta entrada, el siguiente ejemplo muestra la salida que devuelve la función de Lambda.

{ "statusCode": 200, "multiplied": 7 }

A continuación se muestra la matriz JSON de salida para una de las ejecuciones del flujo de trabajo secundario.

[ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ]

La máquina de estado devuelve el siguiente resultado, que contiene cuatro matrices para las cuatro ejecuciones del flujo de trabajo secundario. Estas matrices contienen los resultados de la multiplicación de los elementos de entrada individuales.

Por último, la salida de la máquina de estado es una matriz denominada multiplied que combina todos los resultados de multiplicación devueltos para las cuatro ejecuciones del flujo de trabajo secundario.

[ [ { "statusCode": 200, "multiplied": 7 }, { "statusCode": 200, "multiplied": 14 }, { "statusCode": 200, "multiplied": 21 } ], [ { "statusCode": 200, "multiplied": 28 }, { "statusCode": 200, "multiplied": 35 }, { "statusCode": 200, "multiplied": 42 } ], [ { "statusCode": 200, "multiplied": 49 }, { "statusCode": 200, "multiplied": 56 }, { "statusCode": 200, "multiplied": 63 } ], [ { "statusCode": 200, "multiplied": 70 } ] ]

Para combinar todos los resultados de multiplicación devueltos por las ejecuciones del flujo de trabajo secundario en una única matriz de salida, puede utilizar el campo ResultSelector. Defina este campo dentro del estado Distributed Map para buscar todos los resultados, extraer los resultados individuales y, a continuación, combinarlos en una única matriz de salida denominada multiplied.

Para utilizar el campo ResultSelector, actualice la definición de la máquina de estado como se muestra en el siguiente ejemplo.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied" } } } }

La máquina de estado actualizado devuelve una matriz de salida consolidada, como se muestra en el siguiente ejemplo.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }