그래서 쓰레드를 미리 만들어두고 재사용하기 위한 쓰레드 풀(Thread Pool) 이 등장하게 되었는데,
Executor 인터페이스는 쓰레드 풀의 구현을 위한 인터페이스이다.
이러한 Executor 인터페이스를 간단히 정리하면 다음과 같다.
등록된 작업(Runnable) 을 실행하기 위한 인터페이스
작업 등록과 작업 실행 중에서 작업 실행만을 책임짐
쓰레드는 크게 작업의 등록과 실행으로 나누어진다. 그 중에서도 Executor 인터페이스는 인터페이스 분리 원칙(Interface Segregation Principle) 에 맞게 등록된 작업을 실행하는 책임만 갖는다.
-> 그래서 전달 받은 작업(Runnable) 을 실행하는 메서드만 가지고 있다.
public interface Executor {
void execute(Runnable command);
}
Executor 인터페이스는 개발자들이 해당 작업의 실행과 쓰레드의 사용 및 스케줄링 등으로부터 벗어날 수 있도록 도와준다.
단순히 전달 받은 Runnable 작업을 사용하는 코드를 Executor 로 구현하면 다음과 같다.
public class RunExecutor implements Executor {
@Override
public void execute(final Runnable command) {
command.run();
}
}
@Test
void executorRun() {
final Runnable runnable = ()
-> System.out.println("Thread : " + Thread.cuncurrentThread.getName());
Executor executor = new RunExecutor();
executor.execute(runnable);
}
하지만 위와 같은 코드는 단순히 객체의 메서드를 호출하는 것이므로, 새로운 쓰레드가 아닌 메인 쓰레드에서 실행된다.
만약 위의 코드를 새로운 쓰레드에서 실행시키려면 Executor 의 execute 메서드를 다음과 같이 수정하면 된다.
public class StartExecutor implements Executor {
@Override
public void execute(final Runnable command) {
new Thread(command).start();
}
}
@Test
void executorRun() {
final Runnable runnable = ()
-> System.out.println("Thread : " + Thread.cuncurrentThread.getName());
Executor executor = new StartExecutor();
executor.execute(runnable);
}
2. ExecutorService
ExecutorService 는 작업(Runnable, Callable) 등록을 위한 인터페이스이다.
ExecutorService 는 Executor 를 상속 받아서 작업 등록(관리) 뿐 아니라 실행을 위한 책임도 갖는다.
그래서 쓰레드 풀은 기본적으로 ExecutorService 인터페이스를 구현한다.
대표적으로 ThreadPoolExecutor 가 ExecutorService 의 구현체인데,
ThreadPoolExecutor 내부에 있는 블로킹 큐에 작업들을 등록해둔다.
위와 같이 크기가 2인 쓰레드 풀이 있다고 가정했을 때, 각각의 쓰레드는 작업을 할당 받아 처리한다.
만약 사용 가능한 쓰레드가 없다면 작업은 블로킹 큐에 대기한다. 그러다 쓰레드가 작업이 끝나면 다음 작업을 할당받게 되는 것이다.
이러한 ExecutorService 가 제공하는 퍼블릭 메서드들은 다음과 같이 분류 가능하다.
각각 newFixedThreadPool, newScheduledThreadPool 에 1개의 쓰레드만을 생성하도록 한 것이다.
5. Future
Future 는 비동기 작업의 결과를 나타내는 데 사용된다.
비동기적으로 실행되는 작업의 상태나 결과를 나중에 확인할 수 있도록 하는 인터페이스이다.
주로 ExecutorService 에서 비동기 작업을 제출할 때 반환된다. Future 를 사용하면 작업이 완료될 때까지 기다리거나, 작업의 취소, 상태 확인 등을 할 수 있다.
제공 메서드
T get()
작업이 완료될 때까지 기다렸다가 결과를 반환한다.
작업이 완료되면 그 결과를 반환하고, 작업이 완료되지 않았다면 해당 쓰레드가 작업을 완료할 때까지 기다린다.
T get(long timeout, TimeUnit unit)
지정한 시간(timeout) 동안 작업이 완료되기를 기다린다.
지정된 시간이 초과되면 TimoutException 을 던지고, 그렇지 않으면 작업 결과를 반환한다.
boolean cancel(boolean mayInterruptIfRunning)
mayInterruptIfRunning 이 true 이면 작업이 실행 중일 때도 인터럽트로 작업을 중단할 수 있고,
false 이면 작업이 시작되지 않았거나, 대기 중일 때만 취소된다.
boolean isCancelled()
작업이 취소되었는지 여부를 반환한다.
boolean isDone()
작업이 완료되었는지 여부를 반환한다.
ExecutorService executorService = Executors.newFixedThreadPool(10);
// Future 인스턴스 생성
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
// Future 결과가 준비되었는지 확인하고, 실행이 완료되면 데이터를 가져올 수 있다.
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 또한, 주어진 작업에 대해서 타임아웃을 지정할 수 있다.
// 작업이 지정한 시간보다 오래 걸리면 TimeoutException 이 발생한다.
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
6. CountDownLatch (단일 이벤트 대기용으로 사용)
CountDownLatch 는 특정 개수의 작업이 완료될 때까지 하나 이상의 쓰레드가 기다리도록 하는 동기화 도구이다.
재사용이 불가하다.
이 클래스는 멀티스테드 환경에서 작업을 조율하는데 매우 유용하다.
제공 메서드
await()
이 메서드를 호출한 쓰레드는 CountDownLatch 의 Count 가 0이 될 때까지 대기한다.
Count가 0이 되면 대기중인 모든 쓰레드는 실행하게 된다.
countDown()
이 메서드를 호출할 때마가 Count가 1씩 감소한다.
Count 가 0이 되면 대기중인 모든 쓰레드는 실행하게 된다.
getCount()
현재 Count 값을 반환하는 메서드이다.
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Thread worker1 = new Thread(() -> {
try {
System.out.println("Worker 1 is working...");
Thread.sleep(1000);
System.out.println("Worker 1 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
Thread worker2 = new Thread(() -> {
try {
System.out.println("Worker 2 is working...");
Thread.sleep(2000);
System.out.println("Worker 2 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
Thread worker3 = new Thread(() -> {
try {
System.out.println("Worker 3 is working...");
Thread.sleep(3000);
System.out.println("Worker 3 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
worker1.start();
worker2.start();
worker3.start();
// Count 가 3인 latch 에서 작업이 마무리 된 후 latch 를 실행한다.
latch.await();
System.out.println("All workers finished, main thread proceeding");
}
}
7. CyclicBarrier (쓰레드 간 협력이 필요한 경우 사용)
여러 쓰레드가 특정 지점(장벽) 에 도착할 때까지 대기하며, 도착한 후 동작을 실행
해당 객체를 여러번 사용이 가능하다.
제공 메서드
await()
모든 스레드가 도착할 때까지 대기합니다.
reset()
장벽을 초기 상태로 재설정합니다.
getNumberWaiting()
현재 대기 중인 스레드 수를 반환합니다.
isBroken()
장벽이 깨졌는지 여부를 확인합니다.
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int numberOfThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
System.out.println("All threads reached the barrier, let's proceed.");
});
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " is doing work");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " reached the barrier");
barrier.await(); // 모든 스레드가 barrier에 도착할 때까지 대기
System.out.println(Thread.currentThread().getName() + " is proceeding");
} catch (Exception e) {
e.printStackTrace();
}
};
// 세 개의 스레드가 협력하여 동시에 실행
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
}
}
8. Semaphore (자원에 대한 접근을 제어 시 사용)
자원에 대한 접근을 제어하는 도구로, 제한된 수의 쓰레드만이 자원에 접근할 수 있도록 허용
permit 이라는 개념을 통해서 자원의 접근을 제어
제공 메서드
acquire()
자원을 요청하고, 자원이 허용되면 작업을 진행한다. 자원이 없으면 대기 ..
release()
작업이 끝난 후 자원을 반환하여 다른 쓰레드가 자원에 접근할 수 있도록 한다.
availablePermits()
자원에 접근 가능한 쓰레드 수를 반환한다.
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 동시에 2개의 스레드만 자원에 접근 가능
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " is waiting for a permit");
semaphore.acquire(); // 자원 접근 요청
System.out.println(Thread.currentThread().getName() + " acquired a permit");
Thread.sleep(2000); // 자원 사용
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " released a permit");
semaphore.release(); // 자원 반환
}
};
// 4개의 스레드가 있으나, 동시에 2개만 자원에 접근 가능
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
}
}
9. ThreadFactory
필요에 따라서 새로운 쓰레드를 만드는 쓰레드 풀의 역할을 한다.
ThreadFactory 에서 newThread 메서드를 사용하게 되면 런타임에 동적으로 새로운 쓰레드를 생성할 수 있다는 장점이 있다.
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
// ...
BaeldungThreadFactory factory = new BaeldungThreadFactory(
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}