使用 Step Functions 创建活动状态机 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Step Functions 创建活动状态机

本教程向您展示如何使用 Java 创建基于活动的状态机以及 AWS Step Functions。 Activity 允许您控制在状态机其他地方运行的工作代码。有关概述,请参阅 在 Step Functions 中了解状态机中的了解 Step Functions 中的活动

要完成本教程,您需要:

  • SDK适用于 Java 的。本教程中的示例活动是一个 Java 应用程序,它使用 AWS SDK for Java 与之交流 AWS.

  • AWS 环境或标准中的凭证 AWS 配置文件。有关更多信息,请参阅设置您的 AWS中的凭证 AWS SDK for Java 开发者指南

第 1 步:创建活动

您必须让 Step Functions 知道某项活动,您希望创建该活动的工作线程(一个程序)。Step Functions 使用亚马逊资源名称 (ARN) 进行响应,该名称为活动建立了身份。可以使用此身份协调状态机与工作线程之间传递的信息。

重要

确保您的活动任务处于相同状态 AWS 账户作为你的状态机。

  1. Step Functions 控制台左侧的导航窗格中,选择活动

  2. 选择创建活动

  3. 输入活动的名称,例如 get-greeting,然后选择创建活动

  4. 创建活动任务后,请记下其内容ARN,如以下示例所示。

    arn:aws:states:us-east-1:123456789012:activity:get-greeting

第 2 步:创建状态机

创建一个状态机,该状态机确定什么时候调用活动以及什么时候工作线程执行其主要工作,收集其结果并返回这些结果。要创建状态机,您需要使用 Workflow Studio 的代码编辑器

  1. Step Functions 控制台左侧的导航窗格中,选择状态机

  2. 状态机页面上,选择创建状态机

  3. 选择模板对话框中,选择空白

  4. 选择 “选择” 以在中打开 “工作流工作室” 设计模式

  5. 在本教程中,你将在代码编辑器中编写状态机的 Amazon States Language (ASL) 定义。要执行此操作,请选择代码

  6. 删除现有的样板代码并粘贴以下代码。请记住将此代码ARN中的示例替换为您之前在该Resource字段中创建ARN的活动任务的示例。

    { "Comment": "An example using a Task state.", "StartAt": "getGreeting", "Version": "1.0", "TimeoutSeconds": 300, "States": { "getGreeting": { "Type": "Task", "Resource": "arn:aws:states:us-east-1:123456789012:activity:get-greeting", "End": true } } }

    这是使用 Amazon States Language (ASL) 对状态机的描述。它定义了名为 getGreeting 的单个 Task 状态。有关更多信息,请参阅状态机结构

  7. 在上图形可视化,确保您添加的ASL定义的工作流程图表与下图相似。

    带有 RunActivity 任务状态的状态机的图形可视化。
  8. 为状态机指定一个名称。为此,请选择默认状态机名称旁边的编辑图标MyStateMachine。然后,找到状态机配置,在状态机名称框中指定一个名称。

    对于本教程,请输入名称 ActivityStateMachine

  9. (可选)在状态机配置中,指定其他工作流设置,例如状态机类型及其执行角色。

    在本教程中,请保留状态机设置中的所有默认选项。

    如果您之前为状态机创建了一个具有正确权限的IAM角色并想使用该角色,请在权限选择 “选择现有角色”,然后从列表中选择一个角色。或者选择 “输入角色”,ARN然后ARN为该IAM角色提供。

  10. 确认角色创建对话框中,选择确认继续。

    您也可以选择查看角色设置,返回至状态机配置

    注意

    如果你删除 Step Functions 创建的IAM角色,Step Functions 以后将无法重新创建它。同样,如果您修改角色(例如,通过从IAM策略的主体中删除 Step Functions),Step Functions 以后将无法恢复其原始设置。

第 3 步:实现工作线程

创建工作线程。工作线程是负责以下事务的程序:

  • 使用 Step Functions 轮询是否存在使用该GetActivityTaskAPI操作的活动。

  • 使用您的代码执行活动的工作(例如,以下代码中的 getGreeting() 方法)。

  • 使用SendTaskSuccessSendTaskFailure、和SendTaskHeartbeatAPI操作返回结果。

注意

有关活动工作线程的更完整示例,请参阅示例:Ruby 中的活动工作器。该示例提供了一个基于最佳实操的实现,您可以参考它创建自己的活动工作线程。这段代码实现了消费者/生产者模型,并且允许对轮询器和活动工作线程的线程数进行配置。

实施工作线程

  1. 创建一个名为 GreeterActivities.java的文件。

  2. 将以下代码添加到其中。

    import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.regions.Regions; import com.amazonaws.services.stepfunctions.AWSStepFunctions; import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder; import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest; import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult; import com.amazonaws.services.stepfunctions.model.SendTaskFailureRequest; import com.amazonaws.services.stepfunctions.model.SendTaskSuccessRequest; import com.amazonaws.util.json.Jackson; import com.fasterxml.jackson.databind.JsonNode; import java.util.concurrent.TimeUnit; public class GreeterActivities { public String getGreeting(String who) throws Exception { return "{\"Hello\": \"" + who + "\"}"; } public static void main(final String[] args) throws Exception { GreeterActivities greeterActivities = new GreeterActivities(); ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setSocketTimeout((int)TimeUnit.SECONDS.toMillis(70)); AWSStepFunctions client = AWSStepFunctionsClientBuilder.standard() .withRegion(Regions.US_EAST_1) .withCredentials(new EnvironmentVariableCredentialsProvider()) .withClientConfiguration(clientConfiguration) .build(); while (true) { GetActivityTaskResult getActivityTaskResult = client.getActivityTask( new GetActivityTaskRequest().withActivityArn(ACTIVITY_ARN)); if (getActivityTaskResult.getTaskToken() != null) { try { JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput()); String greetingResult = greeterActivities.getGreeting(json.get("who").textValue()); client.sendTaskSuccess( new SendTaskSuccessRequest().withOutput( greetingResult).withTaskToken(getActivityTaskResult.getTaskToken())); } catch (Exception e) { client.sendTaskFailure(new SendTaskFailureRequest().withTaskToken( getActivityTaskResult.getTaskToken())); } } else { Thread.sleep(1000); } } } }
    注意

    该示例中的 EnvironmentVariableCredentialsProvider 类假定已设置了 AWS_ACCESS_KEY_ID(或 AWS_ACCESS_KEY)和 AWS_SECRET_KEY(或 AWS_SECRET_ACCESS_KEY)环境变量。有关向工厂提供所需凭证的更多信息,请参阅AWSCredentialsProvider中的 AWS SDK for Java API参考设置 AWS 该领域的资质和发展区域 AWS SDK for Java 开发者指南

    默认情况下 AWS SDK将等待最多 50 秒钟才能从服务器接收任何操作的数据。GetActivityTask 操作是一个长时间运行的轮询操作,对于下一个可用任务将等待多达 60 秒。为防止收到SocketTimeoutException错误,请设置 SocketTimeout 为 70 秒。

  3. GetActivityTaskRequest().withActivityArn()构造函数的参数列表中,将该ACTIVITY_ARN值替换为您之前创建ARN的活动任务的值。

第 4 步:运行状态机

开始执行状态机时,您的工作线程在 Step Functions 中轮询活动,执行其工作(使用您提供的输入)并返回其结果。

  1. 在存储库的 ActivityStateMachine页面上,选择开始执行

    随即显示启动执行对话框。

  2. 启动执行对话框中,执行以下操作:

    1. (可选)输入自定义执行名称以覆盖生成的默认执行名称。

      非ASCII姓名和日志

      Step Functions 接受状态机、执行、活动和包含非ASCII字符的标签的名称。由于此类字符不适用于亚马逊 CloudWatch,因此我们建议您仅使用ASCII字符,以便您可以跟踪中的指标 CloudWatch。

    2. 在 “输入” 框中,输入以下JSON输入以运行您的工作流程。

      { "who": "AWS Step Functions" }
    3. 选择启动执行

    4. Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。

      要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅执行详情概述

第 5 步:运行和停止工作线程

要让工作线程在您的状态机中轮询活动,必须运行工作线程。

  1. 在命令行上,导航到您在其中创建 GreeterActivities.java 的目录。

  2. 要再次使用 AWS SDK,将libthird-party目录的完整路径添加到生成文件的依赖项和 Java 中CLASSPATH。有关更多信息,请参阅SDK中的 “下载和解压缩” AWS SDK for Java 开发者指南

  3. 编译文件。

    $ javac GreeterActivities.java
  4. 运行文件。

    $ java GreeterActivities
  5. Step Functions 控制台中,导航到执行详细信息页面。

  6. 执行完成后,检查执行结果。

  7. 停止工作线程。