Copiar datos CSV a gran escala mediante Distributed Map - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Copiar datos CSV a gran escala mediante Distributed Map

Este tutorial le ayuda a empezar a utilizar el estado Map en modo distribuido. Un estado Map establecido en Distributed se conoce como estado Map Distributed. El estado Distributed Map se utiliza en los flujos de trabajo para iterar sobre orígenes de datos de Amazon S3 a gran escala. El estado Map ejecuta cada iteración como una ejecución de flujo de trabajo secundario, lo que permite una alta simultaneidad. Para obtener más información sobre el modo distribuido, consulte Estado Map en modo distribuido.

En este tutorial, utilice el estado Distributed Map para recorrer en iteración un archivo CSV en un bucket de Amazon S3. A continuación, devuelve su contenido, junto con el ARN de la ejecución de un flujo de trabajo secundario, en otro bucket de Amazon S3. Comience por crear un prototipo de flujo de trabajo en Workflow Studio. A continuación, establezca el modo de procesamiento del estado Map en Distribuido, especifique el archivo CSV como conjunto de datos y proporcione su ubicación al estado Map. También debe especificar el tipo de flujo de trabajo para las ejecuciones del flujo de trabajo secundario en las que el estado Distributed Map comienza como Rápido.

Además de estos ajustes, también debe especificar otras configuraciones, como el número máximo de ejecuciones simultáneas de flujos de trabajo secundarios y la ubicación para exportar el resultado de Map, para el flujo de trabajo de ejemplo utilizado en este tutorial.

Requisitos previos

  • Cargar un archivo CSV en un bucket de Amazon S3. Debe definir una fila de encabezado en el archivo CSV. Para obtener información sobre los límites de tamaño impuestos al archivo CSV y cómo especificar la fila de encabezado, consulte Archivo CSV en un bucket de Amazon S3.

  • Cree otro bucket de Amazon S3 y una carpeta dentro del mismo para exportar el resultado del estado Map.

importante

Asegúrese de que los buckets de Amazon S3 estén en la misma máquina de estado Cuenta de AWS y en la misma posición Región de AWS que ella.

Paso 1: Crear el prototipo de flujo de trabajo

En este paso, creará el prototipo para el flujo de trabajo de utilizando Workflow Studio. Workflow Studio es un diseñador visual de flujos de trabajo disponible en la consola de Step Functions. Puede elegir el estado y la acción de API necesarios en las pestañas Flujo y Acciones, respectivamente. Utilizará la característica de arrastrar y soltar de Workflow Studio para crear el prototipo del flujo de trabajo.

  1. Abra la consola de Step Functions y seleccione Crear máquina de estado.

  2. En el cuadro de diálogo Elegir una plantilla, seleccione En blanco.

  3. Elija Seleccionar. Se abrirá Workflow Studio en Modo Diseño.

  4. Desde la pestaña Flujo, arrastre un estado Map al estado vacío con la etiqueta Arrastrar la primera acción aquí.

  5. En la pestaña Configuración, en Nombre de estado, escriba Process data.

  6. En la pestaña Acciones, arrastre una acción de la API Invocación de AWS Lambda y suéltela dentro del estado Procesar datos.

  7. Cambie el nombre del estado Invoke AWS Lambda a Process CSV data.

Paso 2: Configurar los campos necesarios para el estado Map

En este paso, configurará los siguientes campos obligatorios del estado Distributed Map:

  • ItemReader— Especifica el conjunto de datos y su ubicación desde la que el Map estado puede leer la entrada.

  • ItemProcessor: especifica los siguientes valores:

    • ProcessorConfig, defina los valores Mode y ExecutionType en DISTRIBUTED yEXPRESS, respectivamente. Esto establece el modo de procesamiento del estado Map y el tipo de flujo de trabajo para las ejecuciones de flujos de trabajo secundarios en las que se inicia el estado Distributed Map.

    • StartAt: el primer estado del flujo de trabajo de Map.

    • States: define el flujo de trabajo de Map, que consiste en un conjunto de pasos que se deben repetir en cada ejecución del flujo de trabajo secundario.

  • ResultWriter— Especifica la ubicación de Amazon S3 en la que Step Functions escribe los resultados del estado del mapa distribuido.

    importante

    Asegúrese de que el depósito de Amazon S3 que utiliza para exportar los resultados de una ejecución de mapas esté en la misma máquina de estados Cuenta de AWS y Región de AWS que esté en su máquina de estados. De lo contrario, la ejecución de la máquina de estado fallará y se producirá el error States.ResultWriterFailed.

Para configurar los campos necesarios:
  1. Elija el estado Procesar datos y haga lo siguiente en la pestaña Configuración; haga lo siguiente:

    1. Para Modo de procesamiento, elija Distribuido.

    2. En Origen del elemento, elija Amazon S3 y, a continuación, elija el Archivo CSV en S3) de la lista desplegable Origen del elemento de S3.

    3. Haga lo siguiente para especificar la ubicación en Amazon S3 de su archivo CSV:

      1. Para Objeto S3, seleccione Introducir bucket y clave en la lista desplegable.

      2. En Bucket, introduzca el nombre del bucket de Amazon S3, que contiene el archivo CSV. Por ejemplo, sourceBucket.

      3. En Clave, introduzca el nombre del objeto de Amazon S3 en el que guardó el archivo CSV. También debe especificar el nombre del archivo CSV en este campo. Por ejemplo, csvDataset/ratings.csv.

    4. Para los archivos CSV, también debe especificar la ubicación del encabezado de la columna. Para ello, elija Configuración adicional y, a continuación, para la Ubicación del encabezado CSV, mantenga la selección predeterminada de Primera fila si la primera fila del archivo CSV es el encabezado. De lo contrario, elija Dado para especificar el encabezado dentro de la definición de la máquina de estado. Para obtener más información, consulte ReaderConfig.

    5. Para Tipo de ejecución secundaria, elija Express.

  2. En Exportar ubicación, para exportar los resultados de Map Run a una ubicación específica de Amazon S3, elija Exportar la salida del estado Map a Amazon S3.

  3. Haga lo siguiente:

    1. Para Bucket de S3, seleccione Introducir el nombre y el prefijo del bucket en la lista desplegable.

    2. En bucket, escriba el nombre del bucket de Amazon S3 en el que desea exportar los resultados. Por ejemplo, mapOutputs.

    3. En Prefijo, introduzca el nombre de la carpeta en la que desee guardar los resultados. Por ejemplo, resultData.

Paso 3: Configurar opciones adicionales

Además de los ajustes necesarios para un estado Distributed Map, también puede especificar otras opciones. Estas pueden incluir el número máximo de ejecuciones simultáneas de flujos de trabajo secundarios y la ubicación a la que se debe exportar el resultado del estado Map.

  1. Seleccione el estado Procesar datos. A continuación, en Fuente de elemento, elija Configuración adicional.

  2. Haga lo siguiente:

    1. Seleccione Modificar elementos con ItemSelector para especificar una entrada JSON personalizada para cada ejecución de flujo de trabajo secundario.

    2. Introduzca la entrada de JSON siguiente:

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      Para obtener información sobre cómo crear una entrada personalizada, consulte ItemSelector.

  3. En la Configuración de versión ejecutable, en Límite de simultaneidad, especifique el número de ejecuciones simultáneas del flujo de trabajo secundario que puede iniciar el estado Distributed Map. Por ejemplo, escriba 100.

  4. Abra una nueva ventana o pestaña en el navegador y complete la configuración de la función de Lambda que utilizará en este flujo de trabajo, tal y como se explica en Paso 4: Configurar la función de Lambda.

Paso 4: Configurar la función de Lambda

importante

Asegúrese de que su función Lambda esté en la Región de AWS misma posición que su máquina de estados.

  1. Abra la consola de Lambda; y elija Crear función.

  2. En la página Crear función, elija Diseñar desde cero.

  3. En la sección Información básica, configure la función de Lambda:

    1. En Nombre de la función, introduzca distributedMapLambda.

    2. En Tiempo de ejecución, elija Node.js 16.x.

    3. Mantenga todas las selecciones predeterminadas y elija Crear función.

    4. Tras crear la función de Lambda, copie el Nombre de recurso de Amazon (ARN) de la función que aparece en la esquina superior derecha de la página. Deberá proporcionar esto en su prototipo de flujo de trabajo. Para copiar el ARN, haga clic en copy Amazon Resource Name . A continuación se muestra un ejemplo de ARN:

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. Copie el siguiente código para la función Lambda y péguelo en la sección Código fuente de la distributedMapLambdapágina.

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. Elija Implementar. Una vez implementada la función, elija Probar para ver el resultado de la función de Lambda.

Paso 5: Actualizar el prototipo de flujo de trabajo

En la consola de Step Functions, actualizará su flujo de trabajo para añadir el ARN de la función de Lambda.

  1. Vuelva a la pestaña o ventana en la que creó el prototipo del flujo de trabajo.

  2. Elija el estado Procesar datos de CSV y haga lo siguiente en la pestaña Configuración; haga lo siguiente:

    1. En Tipo de integración, elija Optimizado.

    2. En Función de Lambda, empiece a introducir el nombre de la función de Lambda. Elija la función en la lista desplegable que aparece o elija Introducir nombre de función e introduzca el ARN de la función de Lambda.

Paso 6: Revisar la definición de Amazon States Language generada automáticamente y guardar el flujo de trabajo

A medida que arrastra y suelta los estados de las pestañas Acción y Flujo al lienzo, Workflow Studio redacta automáticamente la definición de su flujo de trabajo en Amazon States Language en tiempo real. Puede editar esta definición según sea necesario.

  1. (Opcional) Seleccione Definición en el Inspector panel y visualice la definición de la máquina de estado.

    sugerencia

    También puede ver la definición de ASL en el Editor de código de Workflow Studio. En el editor de código también puede editar la definición de ASL del flujo de trabajo.

    El siguiente código de ejemplo muestra la definición de Amazon States Language generada automáticamente para su flujo de trabajo.

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "sourceBucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. Especifique un nombre para la máquina de estado. Para ello, seleccione el icono de edición situado junto al nombre de la máquina de estado predeterminada de MyStateMachine. A continuación, en Configuración de máquina de estado, especifique un nombre en el cuadro Nombre de la máquina de estado.

    En este tutorial, ingrese el nombre DistributedMapDemo.

  3. (Opcional) En Configuración de máquina de estado, especifique otros ajustes del flujo de trabajo, como el tipo de máquina de estado y su función de ejecución.

    Para este tutorial, mantenga todas las selecciones predeterminadas en Configuración de máquina de estado.

  4. En el cuadro de diálogo Confirmar creación de rol, elija Confirmar para continuar.

    También puede seleccionar Ver configuración de rol para volver a Configuración de máquina de estado.

    nota

    Si se elimina el rol de IAM que crea Step Functions, no se podrá volver a crear más adelante. Asimismo, si se modifica el rol (por ejemplo, eliminando Step Functions de las entidades principales de la política de IAM), Step Functions no podrá restablecer la configuración original más adelante.

Paso 7: Ejecutar la máquina de estado

Una ejecución es una instancia de su máquina de estado en la que ejecuta su flujo de trabajo para realizar tareas.

  1. En la DistributedMapDemopágina, elija Iniciar ejecución.

  2. En el cuadro de diálogo Iniciar ejecución, haga lo siguiente:

    1. (Opcional) Para identificar la ejecución, puede especificar un nombre en el cuadro Nombre. De forma predeterminada, Step Functions genera automáticamente un nombre de ejecución único.

      nota

      Step Functions permite crear nombres para máquinas de estado, ejecuciones, actividades y etiquetas que contengan caracteres no ASCII. Estos nombres que no son ASCII no funcionan con Amazon. CloudWatch Para asegurarse de que puede realizar un seguimiento de CloudWatch las métricas, elija un nombre que utilice únicamente caracteres ASCII.

    2. (Opcional) En el cuadro Entrada, introduzca los valores de entrada en formato JSON para ejecutar el flujo de trabajo.

    3. Seleccione Iniciar ejecución.

    4. La consola de Step Functions le dirige a una página cuyo título es su ID de ejecución. Esta página se conoce como Detalles de la ejecución. En esta página, puede revisar los resultados de la ejecución a medida que avanza la ejecución o una vez finalizada.

      Para revisar los resultados de la ejecución, elija los estados individuales en la Vista de gráfico y, a continuación, elija las pestañas individuales del panel Detalles del paso para ver los detalles de cada estado, incluidas la entrada, la salida y la definición, respectivamente. Para obtener más información sobre la ejecución que puede ver en la página Detalles de la ejecución, consulte Página de detalles de ejecución: información general de la interfaz.

    Por ejemplo, elija el estado Map y, a continuación, elija Map Run para abrir la página de Detalles de Map Run. En esta página, puede ver todos los detalles de ejecución de estado Map Distributed y las ejecuciones del flujo de trabajo secundario que inició. Para obtener información acerca de esta página, consulte Examen de Map Run.