在 Step Functions 中使用分布式地图复制大规模CSV数据 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Step Functions 中使用分布式地图复制大规模CSV数据

本教程将帮助您开始在分布式模式下使用 Map 状态。设置为分布式Map 状态被称为分布式 Map 状态。您可以在工作流中使用分布式 Map 状态来迭代大规模 Amazon S3 数据来源。Map 状态将每次迭代作为子工作流执行来运行,从而实现高并发数。有关分布式模式的更多信息,请参阅分布式模式下的 Map 状态

在本教程中,您将使用分布式地图状态来迭代 Amazon S3 存储桶中的CSV文件。然后,您可以将其内容以及子工作流程执行ARN的内容返回到另一个 Amazon S3 存储桶中。首先,在 Workflow Studio 中创建一个工作流原型。接下来,将Map状态的处理模式设置为 “分布式”,将CSV文件指定为数据集,然后将其位置提供给该Map状态。您还可以为子工作流执行指定工作流类型,分布式 Map 状态快速方式启动。

除了这些设置外,您还可以为本教程中使用的示例工作流指定其他配置,例如并发子工作流执行的最大数量和导出 Map 结果的位置。

先决条件

  • 将CSV文件上传到 Amazon S3 存储桶。您必须在CSV文件中定义标题行。有关对CSV文件施加的大小限制以及如何指定标题行的信息,请参见CSV在 Amazon S3 存储桶中存入文件

  • 创建另一个 Amazon S3 存储桶,并在其中创建的一个文件夹,以便将 Map 状态结果导出到该存储桶中。

重要

确保您的 Amazon S3 存储桶处于相同位置 AWS 账户 以及 AWS 区域 作为你的状态机。

第 1 步:创建工作流原型

在此步骤中,您将使用 Workflow Studio 为工作流创建原型。Workflow Studio 是一款可视化工作流设计器,可在 Step Functions 控制台中使用。您可以分别从 “流程” 和 “操作API” 选项卡中选择所需的状态和操作。您将使用 Workflow Studio 的拖放特征来创建工作流原型。

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 选择模板对话框中,选择空白

  3. 选择 “选择” 以在中打开 “工作流工作室” 设计模式

  4. 选项卡中,将 Map 状态拖放到标有将第一个状态拖至此处的空白状态处。

  5. 配置选项卡下,在状态名称中输入 Process data

  6. 从 “操作” 选项卡中,拖动 AWS Lambda 调用API操作并将其放入流程数据状态中。

  7. 重命名 AWS Lambda 调用状态Process CSV data

第 2 步:配置 Map 状态的必填字段

在此步骤中,您将配置分布式 Map 状态的以下必填字段:

  • ItemReader— 指定数据集及其位置,该Map州可以从中读取输入。

  • ItemProcessor – 指定以下值:

    • ProcessorConfig – 将 ModeExecutionType 分别设置为 DISTRIBUTEDEXPRESS。这将为分布式 Map 状态启动的子工作流执行设置 Map 状态的处理模式和工作流类型。

    • StartAt – Map 工作流中的第一个状态。

    • States – 定义 Map 工作流程,这是在每个子工作流执行中要重复的一组步骤。

  • ResultWriter— 指定 Step Functions 写入分布式地图状态结果的 Amazon S3 位置。

    重要

    确保用于导出 Map Run 结果的 Amazon S3 存储桶处于相同位置 AWS 账户 以及 AWS 区域 作为你的状态机。否则,您的状态机执行将因 States.ResultWriterFailed 错误而失败。

要配置必填句字段,请执行以下操作:
  1. 选择 Process data 状态,然后在配置选项卡中执行以下操作:

    1. 对于处理模式,选择分布式

    2. 对于项目来源,选择 Amazon S3,然后从 S3 项目来源下拉列表中选择 S3 中的CSV文件

    3. 执行以下操作来指定CSV文件的 Amazon S3 位置:

      1. 对于 S3 对象,请从下拉列表中选择输入存储桶和密钥

      2. 对于存储桶,输入包含CSV文件的 Amazon S3 存储桶的名称。例如,amzn-s3-demo-source-bucket

      3. 对于密钥,输入您保存CSV文件的 Amazon S3 对象的名称。您还必须在此字段中指定CSV文件名。例如,csvDataset/ratings.csv

    4. 对于CSV文件,还必须指定列标题的位置。为此,请选择 “其他配置”,如果CSV文件的第一行是标题,则对于CSV标题位置,请保留默认的 “第一行” 选择。否则,请选择给定以在状态机定义中指定标题。有关更多信息,请参阅 ReaderConfig

    5. 对于子执行类型,请选择快速

  2. 导出位置中,要将 Map Run 结果导出到特定的 Amazon S3 位置,请选择将 Map 状态的输出导出到 Amazon S3

  3. 执行以下操作:

    1. 对于 S3 存储桶,请从下拉列表中选择输入存储桶名称和前缀

    2. 对于存储桶,输入要将结果导出到的 Amazon S3 存储桶的名称。例如,mapOutputs

    3. 对于前缀,输入要将结果保存到的文件夹名称。例如,resultData

第 3 步:配置其他选项

除了分布式 Map 状态 所需的设置外,您还可以指定其他选项。这些选项可以包括子工作流并发执行的最大数量和导出 Map 状态结果的位置。

  1. 选择 Process data 状态。然后,在项目来源中,选择其他配置

  2. 执行以下操作:

    1. 选择 “修改项目” ItemSelector,为每个子工作流程执行指定自定义JSON输入。

    2. 输入以下JSON输入:

      { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" }

      有关如何创建自定义输入的信息,请参阅 ItemSelector (地图)

  3. 运行时设置中,对于并发限制,指定分布式 Map 状态 可以启动的并发子工作流执行数量。例如,输入 100

  4. 在浏览器中打开一个新窗口或选项卡,完成您将在此工作流中使用的 Lambda 函数的配置,如第 4 步:配置 Lambda 函数中所述。

第 4 步:配置 Lambda 函数

重要

确保您的 Lambda 函数处于相同状态 AWS 区域 作为你的状态机。

  1. 打开 Lambda 控制台,然后选择创建函数

  2. 创建函数页面上,选择从头开始创作

  3. 基本信息部分中,配置您的 Lambda 函数:

    1. 对于函数名称,请输入 distributedMapLambda

    2. 对于 Runtime (运行时),选择 Node.js

    3. 保留所有默认选项,然后选择创建函数

    4. 创建 Lambda 函数后,复制页面右上角显示的函数的 Amazon 资源名称 (ARN)。您需要在工作流原型中提供此信息。以下是一个示例ARN:

      arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda
  4. 复制以下 Lambda 函数的代码,然后将其粘贴到页面的distributedMapLambda代码源部分。

    exports.handler = async function(event, context) { console.log("Received Input:\n", event); return { 'statusCode' : 200, 'inputReceived' : event //returns the input that it received } };
  5. 选择部署。函数部署后,选择测试,查看您的 Lambda 函数的输出。

第 5 步:更新工作流原型

在 Step Functions 控制台中,您将更新工作流程以添加 Lambda 函数。ARN

  1. 返回到创建工作流原型的选项卡或窗口。

  2. 选择 “处理CSV数据” 步骤,然后在 “配置” 选项卡中执行以下操作:

    1. 对于集成类型,请选择已优化

    2. 对于函数名称,输入 Lambda 函数名称。从出现的下拉列表中选择函数,或者选择输入函数名称并提供 Lambda 函数。ARN

第 6 步:查看自动生成的 Amazon States Language 定义并保存工作流

当您将状态从操作选项卡拖放到画布上时,Workflow Studio 会自动实时撰写工作流的 Amazon States Language 定义。您可以根据需要编辑此定义。

  1. (可选)在 Inspector 面板上选择定义,然后查看状态机定义。

    提示

    您也可以在 Workflow Studio 中查看ASL定义。代码编辑器在代码编辑器中,您还可以编辑工作流程的ASL定义。

    以下示例代码显示了为您的工作流自动生成的 Amazon States Language 定义。

    { "Comment": "Using Map state in Distributed mode", "StartAt": "Process data", "States": { "Process data": { "Type": "Map", "MaxConcurrency": 100, "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-source-bucket", "Key": "csvDataset/ratings.csv" } }, "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "Process CSV data", "States": { "Process CSV data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-2:123456789012:function:distributedMapLambda" }, "End": true } } }, "Label": "Processdata", "End": true, "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": { "Bucket": "mapOutputs", "Prefix": "resultData" } }, "ItemSelector": { "index.$": "$$.Map.Item.Index", "value.$": "$$.Map.Item.Value" } } } }
  2. 为状态机指定一个名称。为此,请选择默认状态机名称旁边的编辑图标MyStateMachine。然后,找到状态机配置,在状态机名称框中指定一个名称。

    对于本教程,请输入名称 DistributedMapDemo

  3. (可选)在状态机配置中,指定其他工作流设置,例如状态机类型及其执行角色。

    在本教程中,请保留状态机配置中的所有默认选项。

  4. 确认角色创建对话框中,选择确认继续。

    您也可以选择查看角色设置,返回至状态机配置

    注意

    如果你删除 Step Functions 创建的IAM角色,Step Functions 以后将无法重新创建它。同样,如果您修改角色(例如,通过从IAM策略的主体中删除 Step Functions),Step Functions 以后将无法恢复其原始设置。

第 7 步:运行状态机

执行是状态机的一个实例,您可以在其中运行工作流来执行任务。

  1. DistributedMapDemo页面上,选择开始执行

  2. 启动执行对话框中,执行以下操作:

    1. (可选)输入自定义执行名称以覆盖生成的默认值。

      非ASCII姓名和日志

      Step Functions 接受状态机、执行、活动和包含非ASCII字符的标签的名称。由于此类字符不适用于亚马逊 CloudWatch,因此我们建议您仅使用ASCII字符,以便您可以跟踪中的指标 CloudWatch。

    2. (可选)在 “输入” 框中,按JSON格式输入输入值以运行您的工作流程。

    3. 选择启动执行

    4. Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。

      要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅执行详情概述

    例如,选择 Map 状态,然后选择 Map Run,打开 Map Run 详细信息页面。在此页面上,您可以查看分布式 Map 状态的所有执行细节及其启动的子工作流执行。有关该页面的信息,请参阅查看地图运行情况