Procesar mensajes de un volumen elevado desde Amazon SQS (flujos de trabajo rápidos) - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesar mensajes de un volumen elevado desde Amazon SQS (flujos de trabajo rápidos)

Este proyecto de ejemplo demuestra cómo utilizar un flujo de AWS Step Functions trabajo rápido para procesar mensajes o datos de una fuente de eventos de gran volumen, como Amazon Simple Queue Service (Amazon SQS). Dado que los flujos de trabajo rápidos se pueden iniciar a una velocidad muy elevada, son ideales para las cargas de trabajo de datos de streaming o procesamiento de eventos de un volumen elevado.

A continuación se muestran dos métodos utilizados con frecuencia para ejecutar la máquina de estado desde un origen de eventos:

  • Configure una regla de Amazon CloudWatch Events para iniciar la ejecución de una máquina de estados siempre que la fuente del evento emita un evento. Para obtener más información, consulte Creación de una regla de CloudWatch eventos que se active en un evento.

  • Asigne el origen de eventos a una función Lambda y escriba el código de función para ejecutar la máquina de estado. La AWS Lambda función se invoca cada vez que la fuente de eventos emite un evento, lo que a su vez inicia una ejecución en una máquina de estados. Para obtener más información, consulte Uso de AWS Lambda con Amazon SQS.

Este proyecto de ejemplo utiliza el segundo método para iniciar una ejecución cada vez que la cola de Amazon SQS envía un mensaje. Puede usar una configuración similar para activar la ejecución de flujos de trabajo rápidos desde otros orígenes de eventos, como Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB y Amazon Kinesis.

Para obtener más información acerca de los flujos de trabajo rápidos y las integraciones de servicios de Step Functions, consulte los siguientes temas:

Paso 1: Crear la máquina de estado y aprovisionar recursos

  1. Abra la consola de Step Functions y seleccione Crear máquina de estado.

  2. Escriba Process high-volume messages from SQS en el cuadro de búsqueda y, a continuación, seleccione Procesar mensajes de un volumen elevado desde SQS en los resultados de búsqueda que aparecen.

  3. Elija Siguiente para continuar.

  4. Step Functions muestra una lista de las Servicios de AWS utilizadas en el proyecto de muestra que ha seleccionado. También muestra un gráfico del flujo de trabajo para el proyecto de muestra. Implemente este proyecto en su empresa Cuenta de AWS o utilícelo como punto de partida para crear sus propios proyectos. En función de cómo desee continuar, elija Ejecutar una demostración o Crear a partir de ella.

    En este proyecto de muestra se implementan los siguientes recursos:

    • Cuatro funciones de Lambda

    • Una cola de Amazon SQS

    • ¿Una máquina de AWS Step Functions estados

    • Funciones relacionadas AWS Identity and Access Management (IAM)

    En la siguiente imagen se ilustra el proyecto de muestra Procesar mensajes de un volumen elevado desde SQS:

    Gráfico del flujo de trabajo del proyecto de muestra Procesar mensajes de un volumen elevado desde SQS.
  5. Elija Utilizar plantilla para continuar con la selección.

  6. Realice una de las acciones siguientes:

    • Si se ha seleccionado Crear a partir de ella, Step Functions crea el prototipo de flujo de trabajo para el proyecto de muestra que ha seleccionado. Step Functions no implementa los recursos que se enumeran en la definición del flujo de trabajo.

      En Modo Diseño de Workflow Studio, arrastre y suelte los estados desde el Navegador de estados para seguir creando su prototipo de flujo de trabajo. Del mismo modo, cambie al Modo Código que proporciona un editor de código integrado similar a VS Code para actualizar la definición (ASL) de Lenguaje de estados de Amazon de su máquina de estado en la consola de Step Functions. Para obtener más información acerca del uso de Workflow Studio para crear máquinas de estados, consulte Usar Workflow Studio.

      importante

      No olvide actualizar el marcador de posición del nombre de recurso de Amazon (ARN) para los recursos que se utilizan en el proyecto de muestra antes de ejecutar el flujo de trabajo.

    • Si seleccionó Ejecutar una demostración, Step Functions crea un proyecto de ejemplo de solo lectura que utiliza una AWS CloudFormation plantilla para implementar los AWS recursos que figuran en esa plantilla en su empresa. Cuenta de AWS

      sugerencia

      Seleccione Código para ver la definición de máquina de estados del proyecto de muestra.

      Cuando esté listo, elija Implementar y ejecutar para implementar el proyecto de muestra y crear los recursos.

      El proceso de creación de estos recursos y los permisos de IAM relacionados puede tardar hasta 10 minutos. Mientras se despliegan sus recursos, puede abrir el enlace CloudFormation Stack ID para ver qué recursos se están aprovisionando.

      Una vez que se creen todos los recursos del proyecto de muestra, podrá ver el nuevo proyecto de muestra en la página Máquinas de estado.

      importante

      Es posible que se apliquen cargos estándar por cada servicio utilizado en la CloudFormation plantilla.

Paso 2: Activar la ejecución de la máquina de estado

  1. Abra la consola de Amazon SQS.

  2. Seleccione la cola que ha creado el proyecto de ejemplo.

    El nombre será similar a Example-SQSQueue-wJalrXUtnFEMI.

  3. En la lista Acciones de cola, seleccione Enviar un mensaje.

  4. Utilice el botón de copiar para copiar el siguiente mensaje y, en la ventana Enviar un mensaje, escríbalo y seleccione el botón Enviar mensaje.

    nota

    En este mensaje de ejemplo, la línea de input: tiene un formato con saltos de línea para ajustarse a la página. Utilice el botón de copiar o asegúrese de que se introduzca como una línea única sin saltos de línea.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Elija Close.

  6. Abra la consola de Step Functions.

  7. Ve a tu grupo de CloudWatch registros de Amazon Logs e inspecciona los registros. El nombre del grupo de registros tendrá el siguiente aspecto: ExpressLogGroup -wjalRxUtnFemi.

Código de función de Lambda de ejemplo

El siguiente es el código de la función Lambda que muestra cómo la función Lambda iniciadora inicia la ejecución de una máquina de estados mediante el SDK. AWS

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Código de la máquina de estado de ejemplo

El flujo de trabajo rápido de este proyecto de ejemplo está compuesto por un conjunto de funciones Lambda para procesar textos.

Para obtener más información sobre cómo AWS Step Functions puede controlar otros AWS servicios, consulte. Uso AWS Step Functions con otros servicios

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

Ejemplo de IAM

Esta política de ejemplo AWS Identity and Access Management (IAM) generada por el proyecto de muestra incluye los privilegios mínimos necesarios para ejecutar la máquina de estados y los recursos relacionados. Le recomendamos que incluya únicamente los permisos necesarios en las políticas de IAM.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

La siguiente política garantiza que haya permisos suficientes para CloudWatch los registros.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Para obtener información sobre cómo configurar IAM al utilizar Step Functions con otros AWS servicios, consultePolíticas de IAM para servicios integrados.