适当地关闭活动和工作流工作线程 - AWS SDK for Java 1.x

我们宣布了即将推出 end-of-support 的 AWS SDK for Java (v1)。建议您迁移到 AWS SDK for Java v2。有关日期、其他详细信息以及如何迁移的信息,请参阅链接的公告。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适当地关闭活动和工作流工作线程

构建简单 Amazon SWF 应用程序主题中介绍实施包括注册应用程序、活动、工作流工作线程以及工作流启动程序的简单工作流应用程序的整个过程。

工作线程类设计为持续运行,轮询 Amazon SWF 发送的任务,以便运行活动或返回决策。完成轮询请求后,Amazon SWF 会记录轮询器,并尝试为其分配任务。

如果工作流工作线程在长轮询过程中终止,Amazon SWF 可能仍然会尝试向终止的工作线程发送任务,导致该任务丢失 (直至该任务超时)。

解决上述情况的一个方法是等待所有长轮询请求返回,然后再终止工作线程。

在该主题中,我们会使用 Java 的关闭挂钩来重写 helloswf 中的活动工作线程,以尝试适当地关闭活动工作线程。

以下是完整的代码:

import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); private static final CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }

在该版本中,原始版本的 main 功能中的轮询代码已被移至其自己的 pollAndExecute 方法中。

现在,main 功能使用 CountDownLatch 以及关闭挂钩,实现了在收到终止线程的请求后,让线程等待最多 60 秒才允许线程关闭。