通过以下方式学习 Step Functions 的基础知识 AWS SDK - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过以下方式学习 Step Functions 的基础知识 AWS SDK

以下代码示例演示了如何:

  • 创建活动。

  • 根据包含先前创建的活动作为步骤的 Amazon States Language 定义创建状态机。

  • 运行状态机并使用用户输入响应活动。

  • 运行完成后获取最终状态和输出,然后清理资源。

.NET
AWS SDK for .NET
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

在命令提示符中运行交互式场景。

global using System.Text.Json; global using Amazon.StepFunctions; global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging.Console; global using Microsoft.Extensions.Logging.Debug; global using StepFunctionsActions; global using LogLevel = Microsoft.Extensions.Logging.LogLevel; using Amazon.IdentityManagement; using Amazon.IdentityManagement.Model; using Amazon.StepFunctions.Model; namespace StepFunctionsBasics; public class StepFunctionsBasics { private static ILogger _logger = null!; private static IConfigurationRoot _configuration = null!; private static IAmazonIdentityManagementService _iamService = null!; static async Task Main(string[] args) { // Set up dependency injection for AWS Step Functions. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonStepFunctions>() .AddAWSService<IAmazonIdentityManagementService>() .AddTransient<StepFunctionsWrapper>() ) .Build(); _logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<StepFunctionsBasics>(); // Load configuration settings. _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load test settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally load local settings. .Build(); var activityName = _configuration["ActivityName"]; var stateMachineName = _configuration["StateMachineName"]; var roleName = _configuration["RoleName"]; var repoBaseDir = _configuration["RepoBaseDir"]; var jsonFilePath = _configuration["JsonFilePath"]; var jsonFileName = _configuration["JsonFileName"]; var uiMethods = new UiMethods(); var stepFunctionsWrapper = host.Services.GetRequiredService<StepFunctionsWrapper>(); _iamService = host.Services.GetRequiredService<IAmazonIdentityManagementService>(); // Load definition for the state machine from a JSON file. var stateDefinitionJson = File.ReadAllText($"{repoBaseDir}{jsonFilePath}{jsonFileName}"); Console.Clear(); uiMethods.DisplayOverview(); uiMethods.PressEnter(); uiMethods.DisplayTitle("Create activity"); Console.WriteLine("Let's start by creating an activity."); string activityArn; string stateMachineArn; // Check to see if the activity already exists. var activityList = await stepFunctionsWrapper.ListActivitiesAsync(); var existingActivity = activityList.FirstOrDefault(activity => activity.Name == activityName); if (existingActivity is not null) { activityArn = existingActivity.ActivityArn; Console.WriteLine($"Activity, {activityName}, already exists."); } else { activityArn = await stepFunctionsWrapper.CreateActivity(activityName); } // Swap the placeholder in the JSON file with the Amazon Resource Name (ARN) // of the recently created activity. var stateDefinition = stateDefinitionJson.Replace("{{DOC_EXAMPLE_ACTIVITY_ARN}}", activityArn); uiMethods.DisplayTitle("Create state machine"); Console.WriteLine("Now we'll create a state machine."); // Find or create an IAM role that can be assumed by Step Functions. var role = await GetOrCreateStateMachineRole(roleName); // See if the state machine already exists. var stateMachineList = await stepFunctionsWrapper.ListStateMachinesAsync(); var existingStateMachine = stateMachineList.FirstOrDefault(stateMachine => stateMachine.Name == stateMachineName); if (existingStateMachine is not null) { Console.WriteLine($"State machine, {stateMachineName}, already exists."); stateMachineArn = existingStateMachine.StateMachineArn; } else { // Create the state machine. stateMachineArn = await stepFunctionsWrapper.CreateStateMachine(stateMachineName, stateDefinition, role.Arn); uiMethods.PressEnter(); } Console.WriteLine("The state machine has been created."); var describeStateMachineResponse = await stepFunctionsWrapper.DescribeStateMachineAsync(stateMachineArn); Console.WriteLine($"{describeStateMachineResponse.Name}\t{describeStateMachineResponse.StateMachineArn}"); Console.WriteLine($"Current status: {describeStateMachineResponse.Status}"); Console.WriteLine($"Amazon Resource Name (ARN) of the role assumed by the state machine: {describeStateMachineResponse.RoleArn}"); var userName = string.Empty; Console.Write("Before we start the state machine, tell me what should ChatSFN call you? "); userName = Console.ReadLine(); // Keep asking until the user enters a string value. while (string.IsNullOrEmpty(userName)) { Console.Write("Enter your name: "); userName = Console.ReadLine(); } var executionJson = @"{""name"": """ + userName + @"""}"; // Start the state machine execution. Console.WriteLine("Now we'll start execution of the state machine."); var executionArn = await stepFunctionsWrapper.StartExecutionAsync(executionJson, stateMachineArn); Console.WriteLine("State machine started."); Console.WriteLine($"Thank you, {userName}. Now let's get started..."); uiMethods.PressEnter(); uiMethods.DisplayTitle("ChatSFN"); var isDone = false; var response = new GetActivityTaskResponse(); var taskToken = string.Empty; var userChoice = string.Empty; while (!isDone) { response = await stepFunctionsWrapper.GetActivityTaskAsync(activityArn, "MvpWorker"); taskToken = response.TaskToken; // Parse the returned JSON string. var taskJsonResponse = JsonDocument.Parse(response.Input); var taskJsonObject = taskJsonResponse.RootElement; var message = taskJsonObject.GetProperty("message").GetString(); var actions = taskJsonObject.GetProperty("actions").EnumerateArray().Select(x => x.ToString()).ToList(); Console.WriteLine($"\n{message}\n"); // Prompt the user for another choice. Console.WriteLine("ChatSFN: What would you like me to do?"); actions.ForEach(action => Console.WriteLine($"\t{action}")); Console.Write($"\n{userName}, tell me your choice: "); userChoice = Console.ReadLine(); if (userChoice?.ToLower() == "done") { isDone = true; } Console.WriteLine($"You have selected: {userChoice}"); var jsonResponse = @"{""action"": """ + userChoice + @"""}"; await stepFunctionsWrapper.SendTaskSuccessAsync(taskToken, jsonResponse); } await stepFunctionsWrapper.StopExecution(executionArn); Console.WriteLine("Now we will wait for the execution to stop."); DescribeExecutionResponse executionResponse; do { executionResponse = await stepFunctionsWrapper.DescribeExecutionAsync(executionArn); } while (executionResponse.Status == ExecutionStatus.RUNNING); Console.WriteLine("State machine stopped."); uiMethods.PressEnter(); uiMethods.DisplayTitle("State machine executions"); Console.WriteLine("Now let's take a look at the execution values for the state machine."); // List the executions. var executions = await stepFunctionsWrapper.ListExecutionsAsync(stateMachineArn); uiMethods.DisplayTitle("Step function execution values"); executions.ForEach(execution => { Console.WriteLine($"{execution.Name}\t{execution.StartDate} to {execution.StopDate}"); }); uiMethods.PressEnter(); // Now delete the state machine and the activity. uiMethods.DisplayTitle("Clean up resources"); Console.WriteLine("Deleting the state machine..."); await stepFunctionsWrapper.DeleteStateMachine(stateMachineArn); Console.WriteLine("State machine deleted."); Console.WriteLine("Deleting the activity..."); await stepFunctionsWrapper.DeleteActivity(activityArn); Console.WriteLine("Activity deleted."); Console.WriteLine("The Amazon Step Functions scenario is now complete."); } static async Task<Role> GetOrCreateStateMachineRole(string roleName) { // Define the policy document for the role. var stateMachineRolePolicy = @"{ ""Version"": ""2012-10-17"", ""Statement"": [{ ""Sid"": """", ""Effect"": ""Allow"", ""Principal"": { ""Service"": ""states.amazonaws.com""}, ""Action"": ""sts:AssumeRole""}]}"; var role = new Role(); var roleExists = false; try { var getRoleResponse = await _iamService.GetRoleAsync(new GetRoleRequest { RoleName = roleName }); roleExists = true; role = getRoleResponse.Role; } catch (NoSuchEntityException) { // The role doesn't exist. Create it. Console.WriteLine($"Role, {roleName} doesn't exist. Creating it..."); } if (!roleExists) { var request = new CreateRoleRequest { RoleName = roleName, AssumeRolePolicyDocument = stateMachineRolePolicy, }; var createRoleResponse = await _iamService.CreateRoleAsync(request); role = createRoleResponse.Role; } return role; } } namespace StepFunctionsBasics; /// <summary> /// Some useful methods to make screen display easier. /// </summary> public class UiMethods { private readonly string _sepBar = new('-', Console.WindowWidth); /// <summary> /// Show information about the scenario. /// </summary> public void DisplayOverview() { Console.Clear(); DisplayTitle("Welcome to the AWS Step Functions Demo"); Console.WriteLine("This example application will do the following:"); Console.WriteLine("\t 1. Create an activity."); Console.WriteLine("\t 2. Create a state machine."); Console.WriteLine("\t 3. Start an execution."); Console.WriteLine("\t 4. Run the worker, then stop it."); Console.WriteLine("\t 5. List executions."); Console.WriteLine("\t 6. Clean up the resources created for the example."); } /// <summary> /// Display a message and wait until the user presses enter. /// </summary> public void PressEnter() { Console.Write("\nPress <Enter> to continue."); _ = Console.ReadLine(); } /// <summary> /// Pad a string with spaces to center it on the console display. /// </summary> /// <param name="strToCenter"></param> /// <returns></returns> private string CenterString(string strToCenter) { var padAmount = (Console.WindowWidth - strToCenter.Length) / 2; var leftPad = new string(' ', padAmount); return $"{leftPad}{strToCenter}"; } /// <summary> /// Display a line of hyphens, the centered text of the title, and another /// line of hyphens. /// </summary> /// <param name="strTitle">The string to be displayed.</param> public void DisplayTitle(string strTitle) { Console.WriteLine(_sepBar); Console.WriteLine(CenterString(strTitle)); Console.WriteLine(_sepBar); } }

定义一个包装状态机和活动操作的类。

namespace StepFunctionsActions; using Amazon.StepFunctions; using Amazon.StepFunctions.Model; /// <summary> /// Wrapper that performs AWS Step Functions actions. /// </summary> public class StepFunctionsWrapper { private readonly IAmazonStepFunctions _amazonStepFunctions; /// <summary> /// The constructor for the StepFunctionsWrapper. Initializes the /// client object passed to it. /// </summary> /// <param name="amazonStepFunctions">An initialized Step Functions client object.</param> public StepFunctionsWrapper(IAmazonStepFunctions amazonStepFunctions) { _amazonStepFunctions = amazonStepFunctions; } /// <summary> /// Create a Step Functions activity using the supplied name. /// </summary> /// <param name="activityName">The name for the new Step Functions activity.</param> /// <returns>The Amazon Resource Name (ARN) for the new activity.</returns> public async Task<string> CreateActivity(string activityName) { var response = await _amazonStepFunctions.CreateActivityAsync(new CreateActivityRequest { Name = activityName }); return response.ActivityArn; } /// <summary> /// Create a Step Functions state machine. /// </summary> /// <param name="stateMachineName">Name for the new Step Functions state /// machine.</param> /// <param name="definition">A JSON string that defines the Step Functions /// state machine.</param> /// <param name="roleArn">The Amazon Resource Name (ARN) of the role.</param> /// <returns></returns> public async Task<string> CreateStateMachine(string stateMachineName, string definition, string roleArn) { var request = new CreateStateMachineRequest { Name = stateMachineName, Definition = definition, RoleArn = roleArn }; var response = await _amazonStepFunctions.CreateStateMachineAsync(request); return response.StateMachineArn; } /// <summary> /// Delete a Step Machine activity. /// </summary> /// <param name="activityArn">The Amazon Resource Name (ARN) of /// the activity.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteActivity(string activityArn) { var response = await _amazonStepFunctions.DeleteActivityAsync(new DeleteActivityRequest { ActivityArn = activityArn }); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Delete a Step Functions state machine. /// </summary> /// <param name="stateMachineArn">The Amazon Resource Name (ARN) of the /// state machine.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteStateMachine(string stateMachineArn) { var response = await _amazonStepFunctions.DeleteStateMachineAsync(new DeleteStateMachineRequest { StateMachineArn = stateMachineArn }); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Retrieve information about the specified Step Functions execution. /// </summary> /// <param name="executionArn">The Amazon Resource Name (ARN) of the /// Step Functions execution.</param> /// <returns>The API response returned by the API.</returns> public async Task<DescribeExecutionResponse> DescribeExecutionAsync(string executionArn) { var response = await _amazonStepFunctions.DescribeExecutionAsync(new DescribeExecutionRequest { ExecutionArn = executionArn }); return response; } /// <summary> /// Retrieve information about the specified Step Functions state machine. /// </summary> /// <param name="StateMachineArn">The Amazon Resource Name (ARN) of the /// Step Functions state machine to retrieve.</param> /// <returns>Information about the specified Step Functions state machine.</returns> public async Task<DescribeStateMachineResponse> DescribeStateMachineAsync(string StateMachineArn) { var response = await _amazonStepFunctions.DescribeStateMachineAsync(new DescribeStateMachineRequest { StateMachineArn = StateMachineArn }); return response; } /// <summary> /// Retrieve a task with the specified Step Functions activity /// with the specified Amazon Resource Name (ARN). /// </summary> /// <param name="activityArn">The Amazon Resource Name (ARN) of /// the Step Functions activity.</param> /// <param name="workerName">The name of the Step Functions worker.</param> /// <returns>The response from the Step Functions activity.</returns> public async Task<GetActivityTaskResponse> GetActivityTaskAsync(string activityArn, string workerName) { var response = await _amazonStepFunctions.GetActivityTaskAsync(new GetActivityTaskRequest { ActivityArn = activityArn, WorkerName = workerName }); return response; } /// <summary> /// List the Step Functions activities for the current account. /// </summary> /// <returns>A list of ActivityListItems.</returns> public async Task<List<ActivityListItem>> ListActivitiesAsync() { var request = new ListActivitiesRequest(); var activities = new List<ActivityListItem>(); do { var response = await _amazonStepFunctions.ListActivitiesAsync(request); if (response.NextToken is not null) { request.NextToken = response.NextToken; } activities.AddRange(response.Activities); } while (request.NextToken is not null); return activities; } /// <summary> /// Retrieve information about executions of a Step Functions /// state machine. /// </summary> /// <param name="stateMachineArn">The Amazon Resource Name (ARN) of the /// Step Functions state machine.</param> /// <returns>A list of ExecutionListItem objects.</returns> public async Task<List<ExecutionListItem>> ListExecutionsAsync(string stateMachineArn) { var executions = new List<ExecutionListItem>(); ListExecutionsResponse response; var request = new ListExecutionsRequest { StateMachineArn = stateMachineArn }; do { response = await _amazonStepFunctions.ListExecutionsAsync(request); executions.AddRange(response.Executions); if (response.NextToken is not null) { request.NextToken = response.NextToken; } } while (response.NextToken is not null); return executions; } /// <summary> /// Retrieve a list of Step Functions state machines. /// </summary> /// <returns>A list of StateMachineListItem objects.</returns> public async Task<List<StateMachineListItem>> ListStateMachinesAsync() { var stateMachines = new List<StateMachineListItem>(); var listStateMachinesPaginator = _amazonStepFunctions.Paginators.ListStateMachines(new ListStateMachinesRequest()); await foreach (var response in listStateMachinesPaginator.Responses) { stateMachines.AddRange(response.StateMachines); } return stateMachines; } /// <summary> /// Indicate that the Step Functions task, indicated by the /// task token, has completed successfully. /// </summary> /// <param name="taskToken">Identifies the task.</param> /// <param name="taskResponse">The response received from executing the task.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> SendTaskSuccessAsync(string taskToken, string taskResponse) { var response = await _amazonStepFunctions.SendTaskSuccessAsync(new SendTaskSuccessRequest { TaskToken = taskToken, Output = taskResponse }); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Start execution of an AWS Step Functions state machine. /// </summary> /// <param name="executionName">The name to use for the execution.</param> /// <param name="executionJson">The JSON string to pass for execution.</param> /// <param name="stateMachineArn">The Amazon Resource Name (ARN) of the /// Step Functions state machine.</param> /// <returns>The Amazon Resource Name (ARN) of the AWS Step Functions /// execution.</returns> public async Task<string> StartExecutionAsync(string executionJson, string stateMachineArn) { var executionRequest = new StartExecutionRequest { Input = executionJson, StateMachineArn = stateMachineArn }; var response = await _amazonStepFunctions.StartExecutionAsync(executionRequest); return response.ExecutionArn; } /// <summary> /// Stop execution of a Step Functions workflow. /// </summary> /// <param name="executionArn">The Amazon Resource Name (ARN) of /// the Step Functions execution to stop.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> StopExecution(string executionArn) { var response = await _amazonStepFunctions.StopExecutionAsync(new StopExecutionRequest { ExecutionArn = executionArn }); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }
Java
SDK适用于 Java 2.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/** * You can obtain the JSON file to create a state machine in the following * GitHub location. * * https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/resources/sample_files * * To run this code example, place the chat_sfn_state_machine.json file into * your project's resources folder. * * Also, set up your development environment, including your credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * This Java code example performs the following tasks: * * 1. Creates an activity. * 2. Creates a state machine. * 3. Describes the state machine. * 4. Starts execution of the state machine and interacts with it. * 5. Describes the execution. * 6. Delete the activity. * 7. Deletes the state machine. */ public class StepFunctionsScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws Exception { final String usage = """ Usage: <roleARN> <activityName> <stateMachineName> Where: roleName - The name of the IAM role to create for this state machine. activityName - The name of an activity to create. stateMachineName - The name of the state machine to create. """; if (args.length != 3) { System.out.println(usage); System.exit(1); } String roleName = args[0]; String activityName = args[1]; String stateMachineName = args[2]; String polJSON = "{\n" + " \"Version\": \"2012-10-17\",\n" + " \"Statement\": [\n" + " {\n" + " \"Sid\": \"\",\n" + " \"Effect\": \"Allow\",\n" + " \"Principal\": {\n" + " \"Service\": \"states.amazonaws.com\"\n" + " },\n" + " \"Action\": \"sts:AssumeRole\"\n" + " }\n" + " ]\n" + "}"; Scanner sc = new Scanner(System.in); boolean action = false; Region region = Region.US_EAST_1; SfnClient sfnClient = SfnClient.builder() .region(region) .build(); Region regionGl = Region.AWS_GLOBAL; IamClient iam = IamClient.builder() .region(regionGl) .build(); System.out.println(DASHES); System.out.println("Welcome to the AWS Step Functions example scenario."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create an activity."); String activityArn = createActivity(sfnClient, activityName); System.out.println("The ARN of the activity is " + activityArn); System.out.println(DASHES); // Get JSON to use for the state machine and place the activityArn value into // it. InputStream input = StepFunctionsScenario.class.getClassLoader() .getResourceAsStream("chat_sfn_state_machine.json"); ObjectMapper mapper = new ObjectMapper(); JsonNode jsonNode = mapper.readValue(input, JsonNode.class); String jsonString = mapper.writeValueAsString(jsonNode); // Modify the Resource node. ObjectMapper objectMapper = new ObjectMapper(); JsonNode root = objectMapper.readTree(jsonString); ((ObjectNode) root.path("States").path("GetInput")).put("Resource", activityArn); // Convert the modified Java object back to a JSON string. String stateDefinition = objectMapper.writeValueAsString(root); System.out.println(stateDefinition); System.out.println(DASHES); System.out.println("2. Create a state machine."); String roleARN = createIAMRole(iam, roleName, polJSON); String stateMachineArn = createMachine(sfnClient, roleARN, stateMachineName, stateDefinition); System.out.println("The ARN of the state machine is " + stateMachineArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Describe the state machine."); describeStateMachine(sfnClient, stateMachineArn); System.out.println("What should ChatSFN call you?"); String userName = sc.nextLine(); System.out.println("Hello " + userName); System.out.println(DASHES); System.out.println(DASHES); // The JSON to pass to the StartExecution call. String executionJson = "{ \"name\" : \"" + userName + "\" }"; System.out.println(executionJson); System.out.println("4. Start execution of the state machine and interact with it."); String runArn = startWorkflow(sfnClient, stateMachineArn, executionJson); System.out.println("The ARN of the state machine execution is " + runArn); List<String> myList; while (!action) { myList = getActivityTask(sfnClient, activityArn); System.out.println("ChatSFN: " + myList.get(1)); System.out.println(userName + " please specify a value."); String myAction = sc.nextLine(); if (myAction.compareTo("done") == 0) action = true; System.out.println("You have selected " + myAction); String taskJson = "{ \"action\" : \"" + myAction + "\" }"; System.out.println(taskJson); sendTaskSuccess(sfnClient, myList.get(0), taskJson); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Describe the execution."); describeExe(sfnClient, runArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("6. Delete the activity."); deleteActivity(sfnClient, activityArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Delete the state machines."); deleteMachine(sfnClient, stateMachineArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("The AWS Step Functions example scenario is complete."); System.out.println(DASHES); } public static String createIAMRole(IamClient iam, String rolename, String polJSON) { try { CreateRoleRequest request = CreateRoleRequest.builder() .roleName(rolename) .assumeRolePolicyDocument(polJSON) .description("Created using the AWS SDK for Java") .build(); CreateRoleResponse response = iam.createRole(request); return response.role().arn(); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void describeExe(SfnClient sfnClient, String executionArn) { try { DescribeExecutionRequest executionRequest = DescribeExecutionRequest.builder() .executionArn(executionArn) .build(); String status = ""; boolean hasSucceeded = false; while (!hasSucceeded) { DescribeExecutionResponse response = sfnClient.describeExecution(executionRequest); status = response.statusAsString(); if (status.compareTo("RUNNING") == 0) { System.out.println("The state machine is still running, let's wait for it to finish."); Thread.sleep(2000); } else if (status.compareTo("SUCCEEDED") == 0) { System.out.println("The Step Function workflow has succeeded"); hasSucceeded = true; } else { System.out.println("The Status is neither running or succeeded"); } } System.out.println("The Status is " + status); } catch (SfnException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void sendTaskSuccess(SfnClient sfnClient, String token, String json) { try { SendTaskSuccessRequest successRequest = SendTaskSuccessRequest.builder() .taskToken(token) .output(json) .build(); sfnClient.sendTaskSuccess(successRequest); } catch (SfnException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static List<String> getActivityTask(SfnClient sfnClient, String actArn) { List<String> myList = new ArrayList<>(); GetActivityTaskRequest getActivityTaskRequest = GetActivityTaskRequest.builder() .activityArn(actArn) .build(); GetActivityTaskResponse response = sfnClient.getActivityTask(getActivityTaskRequest); myList.add(response.taskToken()); myList.add(response.input()); return myList; } public static void deleteActivity(SfnClient sfnClient, String actArn) { try { DeleteActivityRequest activityRequest = DeleteActivityRequest.builder() .activityArn(actArn) .build(); sfnClient.deleteActivity(activityRequest); System.out.println("You have deleted " + actArn); } catch (SfnException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void describeStateMachine(SfnClient sfnClient, String stateMachineArn) { try { DescribeStateMachineRequest stateMachineRequest = DescribeStateMachineRequest.builder() .stateMachineArn(stateMachineArn) .build(); DescribeStateMachineResponse response = sfnClient.describeStateMachine(stateMachineRequest); System.out.println("The name of the State machine is " + response.name()); System.out.println("The status of the State machine is " + response.status()); System.out.println("The ARN value of the State machine is " + response.stateMachineArn()); System.out.println("The role ARN value is " + response.roleArn()); } catch (SfnException e) { System.err.println(e.getMessage()); } } public static void deleteMachine(SfnClient sfnClient, String stateMachineArn) { try { DeleteStateMachineRequest deleteStateMachineRequest = DeleteStateMachineRequest.builder() .stateMachineArn(stateMachineArn) .build(); sfnClient.deleteStateMachine(deleteStateMachineRequest); DescribeStateMachineRequest describeStateMachine = DescribeStateMachineRequest.builder() .stateMachineArn(stateMachineArn) .build(); while (true) { DescribeStateMachineResponse response = sfnClient.describeStateMachine(describeStateMachine); System.out.println("The state machine is not deleted yet. The status is " + response.status()); Thread.sleep(3000); } } catch (SfnException | InterruptedException e) { System.err.println(e.getMessage()); } System.out.println(stateMachineArn + " was successfully deleted."); } public static String startWorkflow(SfnClient sfnClient, String stateMachineArn, String jsonEx) { UUID uuid = UUID.randomUUID(); String uuidValue = uuid.toString(); try { StartExecutionRequest executionRequest = StartExecutionRequest.builder() .input(jsonEx) .stateMachineArn(stateMachineArn) .name(uuidValue) .build(); StartExecutionResponse response = sfnClient.startExecution(executionRequest); return response.executionArn(); } catch (SfnException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createMachine(SfnClient sfnClient, String roleARN, String stateMachineName, String json) { try { CreateStateMachineRequest machineRequest = CreateStateMachineRequest.builder() .definition(json) .name(stateMachineName) .roleArn(roleARN) .type(StateMachineType.STANDARD) .build(); CreateStateMachineResponse response = sfnClient.createStateMachine(machineRequest); return response.stateMachineArn(); } catch (SfnException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createActivity(SfnClient sfnClient, String activityName) { try { CreateActivityRequest activityRequest = CreateActivityRequest.builder() .name(activityName) .build(); CreateActivityResponse response = sfnClient.createActivity(activityRequest); return response.activityArn(); } catch (SfnException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } }
Kotlin
SDK对于 Kotlin 来说
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

import aws.sdk.kotlin.services.iam.IamClient import aws.sdk.kotlin.services.iam.model.CreateRoleRequest import aws.sdk.kotlin.services.sfn.SfnClient import aws.sdk.kotlin.services.sfn.model.CreateActivityRequest import aws.sdk.kotlin.services.sfn.model.CreateStateMachineRequest import aws.sdk.kotlin.services.sfn.model.DeleteActivityRequest import aws.sdk.kotlin.services.sfn.model.DeleteStateMachineRequest import aws.sdk.kotlin.services.sfn.model.DescribeExecutionRequest import aws.sdk.kotlin.services.sfn.model.DescribeStateMachineRequest import aws.sdk.kotlin.services.sfn.model.GetActivityTaskRequest import aws.sdk.kotlin.services.sfn.model.ListActivitiesRequest import aws.sdk.kotlin.services.sfn.model.ListStateMachinesRequest import aws.sdk.kotlin.services.sfn.model.SendTaskSuccessRequest import aws.sdk.kotlin.services.sfn.model.StartExecutionRequest import aws.sdk.kotlin.services.sfn.model.StateMachineType import aws.sdk.kotlin.services.sfn.paginators.listActivitiesPaginated import aws.sdk.kotlin.services.sfn.paginators.listStateMachinesPaginated import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import kotlinx.coroutines.flow.transform import java.util.Scanner import java.util.UUID import kotlin.collections.ArrayList import kotlin.system.exitProcess /** To run this code example, place the chat_sfn_state_machine.json file into your project's resources folder. You can obtain the JSON file to create a state machine in the following GitHub location: https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/resources/sample_files Before running this Kotlin code example, set up your development environment, including your credentials. For more information, see the following documentation topic: https://docs.aws.amazon.com/sdk-for-kotlin/latest/developer-guide/setup.html This Kotlin code example performs the following tasks: 1. List activities using a paginator. 2. List state machines using a paginator. 3. Creates an activity. 4. Creates a state machine. 5. Describes the state machine. 6. Starts execution of the state machine and interacts with it. 7. Describes the execution. 8. Deletes the activity. 9. Deletes the state machine. */ val DASHES: String = String(CharArray(80)).replace("\u0000", "-") suspend fun main(args: Array<String>) { val usage = """ Usage: <roleARN> <activityName> <stateMachineName> Where: roleName - The name of the IAM role to create for this state machine. activityName - The name of an activity to create. stateMachineName - The name of the state machine to create. """ if (args.size != 3) { println(usage) exitProcess(0) } val roleName = args[0] val activityName = args[1] val stateMachineName = args[2] val sc = Scanner(System.`in`) var action = false val polJSON = """{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "states.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }""" println(DASHES) println("Welcome to the AWS Step Functions example scenario.") println(DASHES) println(DASHES) println("1. List activities using a Paginator.") listActivitesPagnator() println(DASHES) println(DASHES) println("2. List state machines using a paginator.") listStatemachinesPagnator() println(DASHES) println(DASHES) println("3. Create a new activity.") val activityArn = createActivity(activityName) println("The ARN of the Activity is $activityArn") println(DASHES) // Get JSON to use for the state machine and place the activityArn value into it. val stream = GetStream() val jsonString = stream.getStream() // Modify the Resource node. val objectMapper = ObjectMapper() val root: JsonNode = objectMapper.readTree(jsonString) (root.path("States").path("GetInput") as ObjectNode).put("Resource", activityArn) // Convert the modified Java object back to a JSON string. val stateDefinition = objectMapper.writeValueAsString(root) println(stateDefinition) println(DASHES) println("4. Create a state machine.") val roleARN = createIAMRole(roleName, polJSON) val stateMachineArn = createMachine(roleARN, stateMachineName, stateDefinition) println("The ARN of the state machine is $stateMachineArn") println(DASHES) println(DASHES) println("5. Describe the state machine.") describeStateMachine(stateMachineArn) println("What should ChatSFN call you?") val userName = sc.nextLine() println("Hello $userName") println(DASHES) println(DASHES) // The JSON to pass to the StartExecution call. val executionJson = "{ \"name\" : \"$userName\" }" println(executionJson) println("6. Start execution of the state machine and interact with it.") val runArn = startWorkflow(stateMachineArn, executionJson) println("The ARN of the state machine execution is $runArn") var myList: List<String> while (!action) { myList = getActivityTask(activityArn) println("ChatSFN: " + myList[1]) println("$userName please specify a value.") val myAction = sc.nextLine() if (myAction.compareTo("done") == 0) { action = true } println("You have selected $myAction") val taskJson = "{ \"action\" : \"$myAction\" }" println(taskJson) sendTaskSuccess(myList[0], taskJson) } println(DASHES) println(DASHES) println("7. Describe the execution.") describeExe(runArn) println(DASHES) println(DASHES) println("8. Delete the activity.") deleteActivity(activityArn) println(DASHES) println(DASHES) println("9. Delete the state machines.") deleteMachine(stateMachineArn) println(DASHES) println(DASHES) println("The AWS Step Functions example scenario is complete.") println(DASHES) } suspend fun listStatemachinesPagnator() { val machineRequest = ListStateMachinesRequest { maxResults = 10 } SfnClient { region = "us-east-1" }.use { sfnClient -> sfnClient .listStateMachinesPaginated(machineRequest) .transform { it.stateMachines?.forEach { obj -> emit(obj) } } .collect { obj -> println(" The state machine ARN is ${obj.stateMachineArn}") } } } suspend fun listActivitesPagnator() { val activitiesRequest = ListActivitiesRequest { maxResults = 10 } SfnClient { region = "us-east-1" }.use { sfnClient -> sfnClient .listActivitiesPaginated(activitiesRequest) .transform { it.activities?.forEach { obj -> emit(obj) } } .collect { obj -> println(" The activity ARN is ${obj.activityArn}") } } } suspend fun deleteMachine(stateMachineArnVal: String?) { val deleteStateMachineRequest = DeleteStateMachineRequest { stateMachineArn = stateMachineArnVal } SfnClient { region = "us-east-1" }.use { sfnClient -> sfnClient.deleteStateMachine(deleteStateMachineRequest) println("$stateMachineArnVal was successfully deleted.") } } suspend fun deleteActivity(actArn: String?) { val activityRequest = DeleteActivityRequest { activityArn = actArn } SfnClient { region = "us-east-1" }.use { sfnClient -> sfnClient.deleteActivity(activityRequest) println("You have deleted $actArn") } } suspend fun describeExe(executionArnVal: String?) { val executionRequest = DescribeExecutionRequest { executionArn = executionArnVal } var status = "" var hasSucceeded = false while (!hasSucceeded) { SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.describeExecution(executionRequest) status = response.status.toString() if (status.compareTo("RUNNING") == 0) { println("The state machine is still running, let's wait for it to finish.") Thread.sleep(2000) } else if (status.compareTo("SUCCEEDED") == 0) { println("The Step Function workflow has succeeded") hasSucceeded = true } else { println("The Status is neither running or succeeded") } } } println("The Status is $status") } suspend fun sendTaskSuccess( token: String?, json: String?, ) { val successRequest = SendTaskSuccessRequest { taskToken = token output = json } SfnClient { region = "us-east-1" }.use { sfnClient -> sfnClient.sendTaskSuccess(successRequest) } } suspend fun getActivityTask(actArn: String?): List<String> { val myList: MutableList<String> = ArrayList() val getActivityTaskRequest = GetActivityTaskRequest { activityArn = actArn } SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.getActivityTask(getActivityTaskRequest) myList.add(response.taskToken.toString()) myList.add(response.input.toString()) return myList } } suspend fun startWorkflow( stateMachineArnVal: String?, jsonEx: String?, ): String? { val uuid = UUID.randomUUID() val uuidValue = uuid.toString() val executionRequest = StartExecutionRequest { input = jsonEx stateMachineArn = stateMachineArnVal name = uuidValue } SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.startExecution(executionRequest) return response.executionArn } } suspend fun describeStateMachine(stateMachineArnVal: String?) { val stateMachineRequest = DescribeStateMachineRequest { stateMachineArn = stateMachineArnVal } SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.describeStateMachine(stateMachineRequest) println("The name of the State machine is ${response.name}") println("The status of the State machine is ${response.status}") println("The ARN value of the State machine is ${response.stateMachineArn}") println("The role ARN value is ${response.roleArn}") } } suspend fun createMachine( roleARNVal: String?, stateMachineName: String?, jsonVal: String?, ): String? { val machineRequest = CreateStateMachineRequest { definition = jsonVal name = stateMachineName roleArn = roleARNVal type = StateMachineType.Standard } SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.createStateMachine(machineRequest) return response.stateMachineArn } } suspend fun createIAMRole( roleNameVal: String?, polJSON: String?, ): String? { val request = CreateRoleRequest { roleName = roleNameVal assumeRolePolicyDocument = polJSON description = "Created using the AWS SDK for Kotlin" } IamClient { region = "AWS_GLOBAL" }.use { iamClient -> val response = iamClient.createRole(request) return response.role?.arn } } suspend fun createActivity(activityName: String): String? { val activityRequest = CreateActivityRequest { name = activityName } SfnClient { region = "us-east-1" }.use { sfnClient -> val response = sfnClient.createActivity(activityRequest) return response.activityArn } }
Python
SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

在命令提示符中运行交互式场景。

class StateMachineScenario: """Runs an interactive scenario that shows how to get started using Step Functions.""" def __init__(self, activity, state_machine, iam_client): """ :param activity: An object that wraps activity actions. :param state_machine: An object that wraps state machine actions. :param iam_client: A Boto3 AWS Identity and Access Management (IAM) client. """ self.activity = activity self.state_machine = state_machine self.iam_client = iam_client self.state_machine_role = None def prerequisites(self, state_machine_role_name): """ Finds or creates an IAM role that can be assumed by Step Functions. A role of this kind is required to create a state machine. The state machine used in this example does not call any additional services, so it needs no additional permissions. :param state_machine_role_name: The name of the role. :return: Data about the role. """ trust_policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": {"Service": "states.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } try: role = self.iam_client.get_role(RoleName=state_machine_role_name) print(f"Prerequisite IAM role {state_machine_role_name} already exists.") except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": role = None else: logger.error( "Couldn't get prerequisite IAM role %s. Here's why: %s: %s", state_machine_role_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise if role is None: try: role = self.iam_client.create_role( RoleName=state_machine_role_name, AssumeRolePolicyDocument=json.dumps(trust_policy), ) except ClientError as err: logger.error( "Couldn't create prerequisite IAM role %s. Here's why: %s: %s", state_machine_role_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise self.state_machine_role = role["Role"] def find_or_create_activity(self, activity_name): """ Finds or creates a Step Functions activity. :param activity_name: The name of the activity. :return: The Amazon Resource Name (ARN) of the activity. """ print("First, let's set up an activity and state machine.") activity_arn = self.activity.find(activity_name) if activity_arn is None: activity_arn = self.activity.create(activity_name) print( f"Activity {activity_name} created. Its Amazon Resource Name (ARN) is " f"{activity_arn}." ) else: print(f"Activity {activity_name} already exists.") return activity_arn def find_or_create_state_machine( self, state_machine_name, activity_arn, state_machine_file ): """ Finds or creates a Step Functions state machine. :param state_machine_name: The name of the state machine. :param activity_arn: The ARN of an activity that is used as a step in the state machine. This ARN is injected into the state machine definition that's used to create the state machine. :param state_machine_file: The path to a file containing the state machine definition. :return: The ARN of the state machine. """ state_machine_arn = self.state_machine.find(state_machine_name) if state_machine_arn is None: with open(state_machine_file) as state_machine_file: state_machine_def = state_machine_file.read().replace( "{{DOC_EXAMPLE_ACTIVITY_ARN}}", activity_arn ) state_machine_arn = self.state_machine.create( state_machine_name, state_machine_def, self.state_machine_role["Arn"], ) print(f"State machine {state_machine_name} created.") else: print(f"State machine {state_machine_name} already exists.") print("-" * 88) print(f"Here's some information about state machine {state_machine_name}:") state_machine_info = self.state_machine.describe(state_machine_arn) for field in ["name", "status", "stateMachineArn", "roleArn"]: print(f"\t{field}: {state_machine_info[field]}") return state_machine_arn def run_state_machine(self, state_machine_arn, activity_arn): """ Run the state machine. The state machine used in this example is a simple chat simulation. It contains an activity step in a loop that is used for user interaction. When the state machine gets to the activity step, it waits for an external application to get task data and submit a response. This function acts as the activity application by getting task input and responding with user input. :param state_machine_arn: The ARN of the state machine. :param activity_arn: The ARN of the activity used as a step in the state machine. :return: The ARN of the run. """ print( f"Let's run the state machine. It's a simplistic, non-AI chat simulator " f"we'll call ChatSFN." ) user_name = q.ask("What should ChatSFN call you? ", q.non_empty) run_input = {"name": user_name} print("Starting state machine...") run_arn = self.state_machine.start(state_machine_arn, json.dumps(run_input)) action = None while action != "done": activity_task = self.activity.get_task(activity_arn) task_input = json.loads(activity_task["input"]) print(f"ChatSFN: {task_input['message']}") action = task_input["actions"][ q.choose("What now? ", task_input["actions"]) ] task_response = {"action": action} self.activity.send_task_success( activity_task["taskToken"], json.dumps(task_response) ) return run_arn def finish_state_machine_run(self, run_arn): """ Wait for the state machine run to finish, then print final status and output. :param run_arn: The ARN of the run to retrieve. """ print(f"Let's get the final output from the state machine:") status = "RUNNING" while status == "RUNNING": run_output = self.state_machine.describe_run(run_arn) status = run_output["status"] if status == "RUNNING": print( "The state machine is still running, let's wait for it to finish." ) wait(1) elif status == "SUCCEEDED": print(f"ChatSFN: {json.loads(run_output['output'])['message']}") else: print(f"Run status: {status}.") def cleanup( self, state_machine_name, state_machine_arn, activity_name, activity_arn, state_machine_role_name, ): """ Clean up resources created by this example. :param state_machine_name: The name of the state machine. :param state_machine_arn: The ARN of the state machine. :param activity_name: The name of the activity. :param activity_arn: The ARN of the activity. :param state_machine_role_name: The name of the role used by the state machine. """ if q.ask( "Do you want to delete the state machine, activity, and role created for this " "example? (y/n) ", q.is_yesno, ): self.state_machine.delete(state_machine_arn) print(f"Deleted state machine {state_machine_name}.") self.activity.delete(activity_arn) print(f"Deleted activity {activity_name}.") self.iam_client.delete_role(RoleName=state_machine_role_name) print(f"Deleted role {state_machine_role_name}.") def run_scenario(self, activity_name, state_machine_name): print("-" * 88) print("Welcome to the AWS Step Functions state machines demo.") print("-" * 88) activity_arn = self.find_or_create_activity(activity_name) state_machine_arn = self.find_or_create_state_machine( state_machine_name, activity_arn, "../../../resources/sample_files/chat_sfn_state_machine.json", ) print("-" * 88) run_arn = self.run_state_machine(state_machine_arn, activity_arn) print("-" * 88) self.finish_state_machine_run(run_arn) print("-" * 88) self.cleanup( state_machine_name, state_machine_arn, activity_name, activity_arn, self.state_machine_role["RoleName"], ) print("-" * 88) print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") try: stepfunctions_client = boto3.client("stepfunctions") iam_client = boto3.client("iam") scenario = StateMachineScenario( Activity(stepfunctions_client), StateMachine(stepfunctions_client), iam_client, ) scenario.prerequisites("doc-example-state-machine-chat") scenario.run_scenario("doc-example-activity", "doc-example-state-machine") except Exception: logging.exception("Something went wrong with the demo.")

定义一个包装状态机和操作的类。

class StateMachine: """Encapsulates Step Functions state machine actions.""" def __init__(self, stepfunctions_client): """ :param stepfunctions_client: A Boto3 Step Functions client. """ self.stepfunctions_client = stepfunctions_client def create(self, name, definition, role_arn): """ Creates a state machine with the specific definition. The state machine assumes the provided role before it starts a run. :param name: The name to give the state machine. :param definition: The Amazon States Language definition of the steps in the the state machine. :param role_arn: The Amazon Resource Name (ARN) of the role that is assumed by Step Functions when the state machine is run. :return: The ARN of the newly created state machine. """ try: response = self.stepfunctions_client.create_state_machine( name=name, definition=definition, roleArn=role_arn ) except ClientError as err: logger.error( "Couldn't create state machine %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["stateMachineArn"] def find(self, name): """ Find a state machine by name. This requires listing the state machines until one is found with a matching name. :param name: The name of the state machine to search for. :return: The ARN of the state machine if found; otherwise, None. """ try: paginator = self.stepfunctions_client.get_paginator("list_state_machines") for page in paginator.paginate(): for state_machine in page.get("stateMachines", []): if state_machine["name"] == name: return state_machine["stateMachineArn"] except ClientError as err: logger.error( "Couldn't list state machines. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def describe(self, state_machine_arn): """ Get data about a state machine. :param state_machine_arn: The ARN of the state machine to look up. :return: The retrieved state machine data. """ try: response = self.stepfunctions_client.describe_state_machine( stateMachineArn=state_machine_arn ) except ClientError as err: logger.error( "Couldn't describe state machine %s. Here's why: %s: %s", state_machine_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def start(self, state_machine_arn, run_input): """ Start a run of a state machine with a specified input. A run is also known as an "execution" in Step Functions. :param state_machine_arn: The ARN of the state machine to run. :param run_input: The input to the state machine, in JSON format. :return: The ARN of the run. This can be used to get information about the run, including its current status and final output. """ try: response = self.stepfunctions_client.start_execution( stateMachineArn=state_machine_arn, input=run_input ) except ClientError as err: logger.error( "Couldn't start state machine %s. Here's why: %s: %s", state_machine_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["executionArn"] def describe_run(self, run_arn): """ Get data about a state machine run, such as its current status or final output. :param run_arn: The ARN of the run to look up. :return: The retrieved run data. """ try: response = self.stepfunctions_client.describe_execution( executionArn=run_arn ) except ClientError as err: logger.error( "Couldn't describe run %s. Here's why: %s: %s", run_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def delete(self, state_machine_arn): """ Delete a state machine and all of its run data. :param state_machine_arn: The ARN of the state machine to delete. """ try: response = self.stepfunctions_client.delete_state_machine( stateMachineArn=state_machine_arn ) except ClientError as err: logger.error( "Couldn't delete state machine %s. Here's why: %s: %s", state_machine_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response

定义一个包装活动操作的类。

class Activity: """Encapsulates Step Function activity actions.""" def __init__(self, stepfunctions_client): """ :param stepfunctions_client: A Boto3 Step Functions client. """ self.stepfunctions_client = stepfunctions_client def create(self, name): """ Create an activity. :param name: The name of the activity to create. :return: The Amazon Resource Name (ARN) of the newly created activity. """ try: response = self.stepfunctions_client.create_activity(name=name) except ClientError as err: logger.error( "Couldn't create activity %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["activityArn"] def find(self, name): """ Find an activity by name. This requires listing activities until one is found with a matching name. :param name: The name of the activity to search for. :return: If found, the ARN of the activity; otherwise, None. """ try: paginator = self.stepfunctions_client.get_paginator("list_activities") for page in paginator.paginate(): for activity in page.get("activities", []): if activity["name"] == name: return activity["activityArn"] except ClientError as err: logger.error( "Couldn't list activities. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_task(self, activity_arn): """ Gets task data for an activity. When a state machine is waiting for the specified activity, a response is returned with data from the state machine. When a state machine is not waiting, this call blocks for 60 seconds. :param activity_arn: The ARN of the activity to get task data for. :return: The task data for the activity. """ try: response = self.stepfunctions_client.get_activity_task( activityArn=activity_arn ) except ClientError as err: logger.error( "Couldn't get a task for activity %s. Here's why: %s: %s", activity_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def send_task_success(self, task_token, task_response): """ Sends a success response to a waiting activity step. A state machine with an activity step waits for the activity to get task data and then respond with either success or failure before it resumes processing. :param task_token: The token associated with the task. This is included in the response to the get_activity_task action and must be sent without modification. :param task_response: The response data from the activity. This data is received and processed by the state machine. """ try: self.stepfunctions_client.send_task_success( taskToken=task_token, output=task_response ) except ClientError as err: logger.error( "Couldn't send task success. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete(self, activity_arn): """ Delete an activity. :param activity_arn: The ARN of the activity to delete. """ try: response = self.stepfunctions_client.delete_activity( activityArn=activity_arn ) except ClientError as err: logger.error( "Couldn't delete activity %s. Here's why: %s: %s", activity_arn, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response