Menu
AWS SDK for Java
Developer Guide

Shutting Down Activity and Workflow Workers Gracefully

The Building a Simple Amazon SWF Application topic provided a complete implementation of a simple workflow application consisting of a registration application, an activity and workflow worker, and a workflow starter.

Worker classes are designed to run continuously, polling for tasks sent by Amazon SWF in order to run activities or return decisions. Once a poll request is made, Amazon SWF records the poller and will attempt to assign a task to it.

If the workflow worker is terminated during a long poll, Amazon SWF may still try to send a task to the terminated worker, resulting in a lost task (until the task times out).

One way to handle this situation is to wait for all long poll requests to return before the worker terminates.

In this topic, we'll rewrite the activity worker from helloswf, using Java's shutdown hooks to attempt a graceful shutdown of the activity worker.

Here is the complete code:

Copy
package aws.example.helloswf; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient(); private static CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }

In this version, the polling code that was in the main function in the original version has been moved into its own method, pollAndExecute.

The main function now uses a CountDownLatch in conjunction with a shutdown hook to cause the thread to wait for up to 60 seconds after its termination is requested before letting the thread shut down.