활동 및 워크플로 작업자 정상 종료 - AWS SDK for Java 1.x

AWS SDK for Java 1.x는 2024년 7월 31일부터 유지 관리 모드로 전환되었으며 2025년 12월 end-of-support31일에 출시될 예정입니다. 새 기능, 가용성 개선 사항 및 AWS SDK for Java 2.x보안 업데이트를 계속 받으려면 로 마이그레이션하는 것이 좋습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

활동 및 워크플로 작업자 정상 종료

간단한 Amazon SWF 애플리케이션 구축 주제에서는 등록 애플리케이션, 활동과 워크플로 작업자, 워크플로 시작자로 이루어진 단순 워크플로 애플리케이션의 전체 구현을 제공했습니다.

작업자 클래스는 지속적으로 실행하며 Amazon SWF에서 전송된 작업을 폴링하여 활동을 실행하거나 결정을 반환하도록 설계되었습니다. 폴링 요청이 생성되면 Amazon SWF는 폴러를 기록하고 이 폴러에 작업을 할당하려고 시도합니다.

워크플로 작업자가 긴 폴링 기간 동안 종료되는 경우 Amazon SWF가 여전히 작업을 종료된 작업자로 전송하려고 시도하여 (작업 제한 시간이 경과될 때까지) 작업 손실이 발생할 수도 있습니다.

이 상황을 처리하는 한 가지 방법은 작업자가 종료되기 전에 모든 긴 폴링 요청이 반환될 때까지 기다리는 것입니다.

이 주제에서는 Java의 종료 후크를 사용하여 활동 작업자의 정상 종료를 시도하도록 helloswf의 활동 작업자를 다시 작성하겠습니다.

전체 코드는 다음과 같습니다.

import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); private static final CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }

이 버전에서는 원래 버전의 main 함수에 있었던 폴링 코드가 고유 메서드인 pollAndExecute로 이동되었습니다.

이제 main 함수는 종료 후크와 함께 CountDownLatch를 사용하여 스레드를 종료하기 전에 스레드가 종료 요청 후 최대 60초 동안 대기하도록 합니다.