自2024年7月31日起, AWS SDK for Java 1.x已进入维护模式,并将于2025年12月31日end-of-support
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适当地关闭活动和工作流工作线程
构建简单 Amazon SWF 应用程序主题中介绍实施包括注册应用程序、活动、工作流工作线程以及工作流启动程序的简单工作流应用程序的整个过程。
工作线程类设计为持续运行,轮询 Amazon SWF 发送的任务,以便运行活动或返回决策。完成轮询请求后,Amazon SWF 会记录轮询器,并尝试为其分配任务。
如果工作流工作线程在长轮询过程中终止,Amazon SWF 可能仍然会尝试向终止的工作线程发送任务,导致该任务丢失 (直至该任务超时)。
解决上述情况的一个方法是等待所有长轮询请求返回,然后再终止工作线程。
在该主题中,我们会使用 Java 的关闭挂钩来重写 helloswf
中的活动工作线程,以尝试适当地关闭活动工作线程。
以下是完整的代码:
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); private static final CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }
在该版本中,原始版本的 main
功能中的轮询代码已被移至其自己的 pollAndExecute
方法中。
现在,main
功能使用 CountDownLatch