백엔드 엔지니어 이재혁
[Java] Executor 프레임워크 기본 본문
`Thread` 객체를 직접 사용하는 것은 생성 비용 문제와 관리 문제로 인해 비효율적일 때가 많다.
1. 스레드 생성 비용 문제
`Thread` 객체는 다음과 같은 이유로 일반적인 Java 객체를 생성하는 비용보다 훨씬 많은 비용이 든다.
- 메모리 할당: 스레드는 자신만의 call stack을 가지고 있고, 스레드 생성시 call stack을 위한 메모리 공간을 확보해야 한다.
- 시스템 콜: 스레드를 생성하는 작업은 운영 체제의 커널 수준에서 작동해 "시스템 콜"을 통해 처리된다. CPU와 메모리 리소스를 소모한다.
- 운영체제 스케쥴러 설정: 새로운 스레드가 생성되면 CPU 스케쥴러가 스레드 실행 순서를 스케쥴링 알고리즘에 맞게 조정한다. 이런 과정에서 추가적인 오버헤드가 발생할 수 있다.
스레드 생성에 드는 비용으로 인해, 가벼운 여러 작업을 멀티스레드로 실행하는 것이 오히려 비효율적이게 될 수 있다.
2. 스레드 관리 문제
CPU와 메모리 자원 한계로 인해, 스레드는 무한으로 생성할 수 없다. 즉, 요청이 들어올 때마다 스레드를 생성하면 그러다가 서버의 한계를 넘어갈 수도 있다. 이런 문제를 해결하기 위해 시스템이 버틸 수 있는 최대 스레드 수까지만 스레드가 생성되도록 관리해야 한다.
3. Runnable 인터페이스의 불편함
반환값이 없다: 일반적인 메서드와 다르게 무언가 결과를 return할 방법이 없다. 별도의 메커니즘으로 값을 저장하고 수정하는 방식을 활용해야 한다. (멤버 변수로 지정하는 방식이라던가)
예외 처리: Runnable 인터페이스는 체크 예외를 던질 수 없다. 체크 예외 처리는 메서드 내부에서 끝내야 한다.
Thread Pool
1, 2번을 해결하기 위해 스레드 풀(Thread Pool)을 사용할 수 있다.
스레드 풀은 스레드들을 생성해놓고 스레드가 필요해지면 해당 스레드 풀에 이미 생성된 스레드를 가져다가 사용하도록 할 수 있다.
스레드 풀을 통해 시스템이 감당할 수 있을 정도의 스레드만 생성하고 그 스레드들만 사용하도록 할 수 있다.
Executor Framework
이 Thread Pool 개념을 제공하는 Executor 프레임워크를 사용하자.
Executor 프레임워크가 스레드 풀, 스레드관리, `Runnable`의 문제점은 물론이고, 생산자 소비자 문제까지 한 방에 해결해주는 자바 멀티스레드 최고의 도구이다.
실무에서 멀티스레드를 활용해야겠다고 하면, Thread를 직접 다루는 것보다 Executor 프레임워크를 써야겠다는 생각이 바로 들게 될 정도로 유용하다고 한다.
ExecutorService
`ExecutorService`의 구현체로는 보통 `ThreadPoolExecutor`를 사용한다. 5개의 매개변수 중 앞의 4개는 `Thread Pool`에 대한 설정이고, 마지막은 `Task`를 담을 `BlockingQueue`를 넣어주는 것이다.
ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
ExecutorService esFixed = Executors.newFixedThreadPool(1); // 고정 크기 ThreadPool은 이렇게 쉽게 만들 수도 있음
`es.execute(Runnable r)`로 Task를 실행하게 할 수 있다.
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
여기서 잠깐!
ExecutorService는 Thread를 미리 생성해두지 않는다! 작업이 들어오면 그 때 처음으로 Thread를 생성한다. 그 Thread를 계속 재사용하는 것이다.
그리고 최대 스레드 풀 갯수를 2로 설정해둔 상태에서 위와 같이 4개 작업이 들어오면, 스레드는 2개까지만 생성되고, 작업이 완료된 스레드가 나타날 때까지 작업을 대기시킨다.
Callable (Runnable 대체)
`Runnable`의 단점을 보완하기 위해 `Callable`이라는 인터페이스를 사용한다. `call()` 이라는 메서드를 구현해야 한다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
sleep(2000);
return new Random().nextInt(10);
}
}
`Runnable`과 달리, `Callable`은 값을 `return`할 수 있고, `Future`의 `get()`을 통해 받을 수 있다. main 메서드 코드를 보면 마치 싱글스레드로 작업하는 것 같이 보일 정도로 자연스럽게 메서드 호출 결과를 return 받는 방식으로 되어 있다.
엥? main 스레드가 작업을 던지고 알아서 작업이 끝날 때까지 기다리는건가? 작업이 안끝났는데 `future.get()`을 해도 잘 작동하는건 어떻게 그런걸까? ← 실제로 `Callable` 작업이 끝날 때까지 main 메서드의 log가 출력되지 않았다!
그렇다. 알아서 기다리는게 맞다. Future를 조금 더 깊이 보다보면 어떻게 그런 일이 가능한지 알 수 있다.
여기서 잠깐! `ExecutorService`는 `Runnable`도 받을 수 있다. return 값이 null인 `Callable`처럼 사용된다.
Future
`Future`는 전달한 작업의 미래 결과를 담고 있다. 실제로 사용되는 구현체는 `FutureTask`이다.
`java.util.concurrent.FutureTask@6193b845[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@4d405ef7]`
위와 같이 Future의 toString()을 호출해보면, task가 완료됐는지 여부를 가지고 있고, task도 참조하고 있다
(객체의 Task 필드: `private Callable<V> callable;`)
작업 종료 탐지
어떻게 Future에 들어있는 작업이 끝났을 때 딱 맞춰서 결과를 받아올 수 있을까?
`Future`의 `get()` 메서드를 호출하면 된다.
`FutureTask` 구현체의 `get()` 메서드를 확인해보면 작업이 완료되지 않았을 때는 아래의 `awaitDone()` 메서드를 호출한다.
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// ... 여기는 싹 다 생략하고 핵심은! LockSupport 유틸 메서드를 사용한다.
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
`LockSupport`의 `park` `partNanos` 유틸 메서드를 사용해 결과를 요청한 스레드가 다른 스레드의 작업을 기다리도록 만든다.
처음 Callable을 써봤을 때 의문이 들었던 점, 알아서 스레드가 끝날 때까지 대기하는 것은 이렇게 `Future`의 `get()`을 호출할 때, 완료될 때까지 호출한 스레드를 대기 상태로 만들어주는 블로킹 메서드로 구현되어 있기 때문이었다.
작업이 완료되면, 작업을 하던 스레드가, '결과를 요청한 뒤 대기하던 스레드'를 깨워 RUNNABLE 상태로 바꿔준다.
작업이 이미 완료되었다면? 대기없이 바로 결과를 반환한다.
논블로킹과 블로킹의 적절한 사용
논블로킹: Callable을 ExecutorService에 넘겨주고, 그 결과를 담는 Future 객체는 논블로킹 방식으로 받아온다.
(논블로킹이기 때문에 여러 작업 요청을 동시에 할 수도 있다.)
블로킹: Future의 get() 메서드를 호출할 때가 되서야 main 스레드를 블로킹 한다.
* 주의
멀티스레딩을 할 때, 블로킹과 논블로킹을 잘 구분해서 써야 한다. 아래와 같이 논블로킹 메서드 뒤에 블로킹 메서드를 붙여버리면 통으로 블로킹 메서드처럼 작동하게 되기 때문에 멀티스레드가 아니라 싱글스레드처럼 작동하게 된다 😓
Integer result1 = es.submit(new MyCallable()).get(); // 작업 완료할 때까지 대기
Integer result2 = es.submit(new MyCallable()).get(); // 위 작업이 완료되어야 이 작업이 시작됨
대부분이 논블로킹이기 때문에 블로킹에 주의하면 될 것 같다.
작업 취소
`boolean cancel(boolean mayInterruptIfRunning)`: 아직 완료되지 않은 작업을 취소
반환값: 취소 성공 여부 (이미 작업이 완료됐거나 작업 취소를 못하는 상황이면 `false`)
실행 대기 중: 작업 대기 큐에서 제거. 작업 실행 안함.
실행 중일 때: 만약 `mayInterruptIfRunning`을 `false`로 실행한다면, 실행 중인 작업은 그대로 쭉 하게 둔다.
`true`면 이미 작업 중인 작업도 인터럽트 시켜버린다.
그럼, 실행 중인 작업에 `cancel(false)` 를 실행하면 아무런 일이 일어나지 않는 것일까?
아니다. 한가지 큰 차이점이 있는데, `future.get()` 으로 작업의 return 값을 받아오고자 시도하면 `CancellationException`이 터진다. 이것을 두 가지로 나눠서 생각해볼 수 있다.
- 취소된 작업의 결과를 받아오려고 시도하면 `CancellationException` 예외가 터진다.
- 작업은 계속 실행 중이어도, 위 예외를 즉시 터뜨리기 때문에 `get()` 메서드는 논블로킹처럼 작동한다.
`isCancelled()` 메서드와 ` isDone()` 메서드도 있는데 그냥 이름대로 받아들이면 된다.
Future의 예외 전달
Runnable은 예외를 던질 수 없었지만, Future는 예외를 전달할 수 있도록 만들었다.
아래 `call()`과 같이 작업이 Exception을 던지는 상황을 확인해보자.
@Override
public Integer call() throws Exception {
throw new IllegalStateException("ex!");
}
Future가 던지는 예외는 `ExecutionException`이지만, 그 Exception 안에 그 이유, 즉 원본 Exception을 담아서 터뜨리게 된다.
아래는 `ExecutionException`의 `printStackTrace()` 결과
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: ex!
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
... 생략
Caused by: java.lang.IllegalStateException: ex!
at thread.executor.future.FutureExceptionMain$ExCallable.call(FutureExceptionMain.java:35)
... 생략
Caused by를 확인하면 그 예외의 원인을 하나씩 확인할 수 있다.
그래, 던지는건 알겠는데 그래서 어떻게 잡아?
try {
Integer result = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) { // Exception은 Exception 안에 그 이유를 담을 수 있다.
Throwable cause = e.getCause(); // 그래서 원본 예외을 꺼내서 확인할 수 있다.
if (cause instanceof IllegalStateException)
System.out.println("IllegalStateException 터짐");
else
System.out.println("무언가 Exception이 터짐");
}
먼저 `ExecutionException`을 잡고, 그 안의 예외를 `getCause()`로 가져온다. 그 예외가 어떤 클래스인지 체크한다.
InvokeAll / InvokeAny
간단하게 Node.js의 `Promise.all()`과 `Promise.any()`와 같은 기능이라고 생각하면 된다.
아래와 같이 실행하며, `ExecutorService`의 메서드이다.
List<Future<Integer>> futures = es.invokeAll(tasks);
InvokeAll
`<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException`
`Callable<T>`을 `Collection`으로 모아서 `invokeAll()` 메서드에 넣어서 실행 → 모든 작업이 끝날 때까지 대기 및 모든 결과를 `List<Future<T>>` 형태로 반환
`<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException`
매개변수에 시간을 추가할 수 있는데,
시간 안에 완료된 것들: 일반적인 Future처럼 `get()` 가능.
시간 안에 완료하지 못한 것들: 진행 중이었던 작업에 인터럽트 발생. `get()`하면 `CancellationException` 터짐.
InvokeAny
`<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException`
가장 먼저 실행 완료된 작업의 결과를 리턴한다. (Future를 리턴하지 않음)
나머지 작업들은 전부 인터럽트
InvokeAny도 시간 매개변수를 추가할 수 있는데, 시간 안에 완료하지 못하면 전부 인터럽트.
InvokeAll과 InvokeAny의 단점
지금 생각드는 단점은, 기존에 사용했던 future와 다르게 작업 실행과 동시에 요청 스레드가 블로킹된다는 점이 마음에 걸린다. InvokeAll과 InvokeAny도 작업을 실행하도록 지시하는 것(논블로킹)과 그 결과를 받아오는 것(블로킹)을 분리할 수 있지 않았을까?
'Java' 카테고리의 다른 글
| [Java] Executor 프레임워크 심화 (0) | 2025.06.30 |
|---|---|
| [JUnit] Controller 단위 테스트 작성하기, 그리고 회고 (1) | 2025.06.29 |
| [Java] 컬렉션 프레임워크와 동시성 (0) | 2025.06.13 |
| [JAVA] CAS 락 구현 (0) | 2025.06.10 |
| [Java] CAS 연산 활용 (1) | 2025.06.09 |