생산자 소비자 문제2
Lock Condition - 예제4
생산자가 생산자를 깨우고, 소비자가 소비자를 깨우는 비효율을 어떻게 해결할 수 있을까?
해결방안
핵심은 생산자 스레드는 데이터를 생성하고, 대기중인 소비자 스레드에게 알려주어야 한다. 반대로 소비자 스레드는 데이터를 소비하고, 대기중인 생산자 스레드에게 알려주면 된다.
결국 생산자 스레드가 대기하는 대기 집합과, 소비자 스레드가 대기하는 대기 집합을 둘로 나누면 된다.
이러한 기능을 앞서 학습한
Lock
,ReentrantLock
을 사용하며 된다.
아래 V4 예제는 V3 예제와 동일하게 동작한다. (락의 구현체만 바꾼 것이다)
자바는 1.0 부터 존재한
synchronized
,BLOCKED
상태를 통한 임계영역 관리의 단점을 해결하기 위해서 자바 1.5 부터Lock
인터페이스와ReentrantLock
구현체를 제공한다.
condition
Condition condition = lock.newCondition();
Condition
은ReentrantLock
을 사용하는 스레드가 대기하는 스레드 대기 공간이다.lock.newCondition()
메서드를 호출하면 스레드 대기 공간이 만들어진다.참고로
Object.wait()
에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본적으로 가지고 있다.반면에,
Lock(ReentrantLock)
을 사용하는 경우 이렇게 스레드 대기 공간을 직접 만들어서 사용해야 한다.
condition.await()
Object.wait()
와 유사한 기능이다. 지정한condition
에 현재 스레드를 대기 (WAITING
) 상태로 보관한다.이때
ReentrantLock
에서 획득한 락을 반납하고 대기 상태로condition
에 보관한다.
condition.signal()
Object.notify()
와 유사한 기능이다. 지정한condition
에서 대기중인 스레드를 하나 깨운다. 깨어난 스레드는condition
에서 빠져나온다.
생산자 소비자 대기 공간 분리 - 예제5 코드
Condition 분리
consumerCond
: 생산자를 위한 스레드 대기 공간producerCond
: 소비자를 위한 스레드 대기 공간
put(data) - 생산자 스레드가 호출
큐에 가득 찬 경우
producerCond.await()
를 호출해서 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관한다.
데이터를 저장한 경우
생산자가 데이터를 생산하면 큐에 데이터가 추가된다. 따라서 소비자를 깨우는 것이 좋다.
consumerCond.signal()
을 호출해서 소비자 전용 스레드 대기 공간에 신호를 보낸다. 이렇게 하면 대기중인 소비자 스레드 하나를 깨워 데이터를 소비할 수 있다.
take() - 소비자 스레드가 호출
큐가 빈 경우
consumerCond.await()
를 호출해서 소비자 스레드를 소비자 전용 스레드가 대기 공간에 보관한다.
데이터를 소비한 경우
소비자가 데이터를 소비한 경우 큐에 여유 공간이 생긴다. 따라서 생산자를 깨우는 것이 좋다.
producerCond.signal()
를 호출해서 생산자 전용 스레드 대기 공간에 신호를 보낸다. 이렇게 하면 대기 중인 생산자 스레드가 하나 깨어나서 데이터를 추가할 수 있다.
핵심은 생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다는 것이다.
생산자 소비자 대기 공간 분리 - 예제5 분석
생산자 실행
소비자 실행
Object.notify() vs Condition.signal()
Obejct.notify()
대기 중인 스레드 중 임의의 하나를 선택해서 깨운다.
스레드가 깨어나는 순서는 정리되어 있지 않으며, JVM 구현에 따라 다르다. 보통은 먼저 들어온 스레드가 먼저 수행되지만, 구현에 따라 다를 수 있다.
synchronized
블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 한다.
Condition.signal()
대기 중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨운다.
이 부분은 자바 버전과 구현에 따라 달라질 수 있지만, 보통
Condition
의 구현은Queue
구조를 사용하기 때문에, FIFO 순서로 깨운다.ReentrantLock
을 가지고 있는 스레드가 호출해야 한다.
스레드 대기
다음 내용으로 진행하기 전에 synchronized
, ReentrantLock
의 대기 상태에 대해서 정리해보자.
먼저 synchronized
대기 상태에 대해서 정리해보자.
잘 생각해보면 synchronized
의 대기 상태는 2가지로 분리되어 있다.
synchronized 대기 상태
대기1 : 락 획득 대기
BLOCKED
상태로 락 획득 대기synchronized
를 시작할 때 락이 없으면 대기다른 스레드가
synchronized
를 빠져나갈 때 대기가 풀리며 락 획득 시도
대기2 : wait()
대기
WAITING
상태로 대기wait()
를 호출 했을 때 스레드 대기 집합에서 대기다른 스레드가
notify()
를 호출 했을 때 빠져나감
예제를 통해 대기 상태를 생각해보자.
c2
,c3
는 락 획득을 시도하지만, 모니터 락이 없기 때문에 락을 대기하면BLOCKED
상태가 된다.c1
은 나중에 락을 반납할 것이다. 그러면c2
,c3
중에 하나가 락을 획득해야 한다.
그런데 잘 생각해보면 락을 기다리는 c2
, c3
도 어디선가 관리 되어야 한다.
그래야만 락이 반환되었을 때 JVM이 c2
, c3
중에 하나를 선택해서 락을 제공할 수 있다.
예를 들어서
List
,Set
,Queue
같은 자료구조에 관리가 되어야 한다.
그림에서는 c2
, c3
가 단순히 BLOCKED
상태로 변경만 되었다. 그래서 관리되는 것처럼 보이지는 않는다.
사실은, BLOCKED
상태의 스레드도 자바 내부에서 따로 관리된다.
락 대기 집합 (소비자 스레드 동작)
아래 그림은 이전 그림과 같은 상태를 조금 더 자세하게 그린 그림이다.
그림을 보면 락 대기 집합이라는 곳이 있다. 이곳은 락을 기다리는 BLOCKED
상태의 스레드들을 관리한다.
락 대기 집합은 자바 내부에 구현되어 있기 때문에, 모니터 락과 같이 개발자가 확인하기는 어렵다..
여기서는
BLOCKED
상태의 스레드c2
,c3
가 관리된다.
언젠가 c1
이 락을 반납하면 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득한다.
아래 그림에서는 c1
이 락을 반납하고 스레드 대기 집합에 들어가면 락 대기 집합에 있는 c2
스레드가 락을 획득하고
동작하기를 반복한다. (c2
가 먼저 락을 획득한다는 보장은 없다.. 그저 보기 편하려고 가정하는 것이다)
락 대기 집합 (생산자 스레드 동작)
p1
이 락을 획득하고 데이터를 저장한 다음 스레드 대기 집합에 이 사실을 알린다.스레드 대기 집합의
c1
이 스레드 대기 집합을 빠져나가고,c1
이 락을 획득하려 하지만 락이 없다.. 따라서 락 대기 집합에서 관리된다.락을 얻어서 락 대기 집합까지 빠져나가야 임계 영역을 수행할 수 있다. (임계 영역에 2개의 스레드가 동작하게 되면 문제가 발생한다)
이후
p1
이 락을 반납하고,c1
은 락을 획득하여 임계 영역을 수행한다.
개념상 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소이다.
2차 대기소에 있는 스레드는 1차 대기소까지 빠져 나와야 임계 영역에서 로직을 수행할 수 있다.
정리
자바의 모든 객체 인스턴스는 멀티스레드와 임계 영역을 다루기 위해서 내부에 3가지 기본 요소를 가진다.
모니터 락
락 대기 집합(모니터 락 대기 집합)
스레드 대기 집합
여기서 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소라 생각하면 된다. 2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있다.
이 3가지 요소는 서로 맞물려 돌아간다.
synchronized
를 사용한 임계 영역에 들어가려면 모니터 락이 필요하다.모니터 락이 없으면 락 대기 집합에 들어가서
BLOCKED
상태로 락을 기다린다.모니터 락을 반납하면 락 대기 집합에 있는 스레드 중 하나가 락을 획득하고
BLOCKED -> RUNNABLE
상태가 된다.wait()
를 호출해서 스레드 대기 집합에 들어가기 위해서는 모니터 락이 필요하다.스레드 대기 집합에 들어가면 모니터 락을 반납한다.
스레드가
notify()
를 호출하면 스레드 대기 집합에 있는 스레드 중 하나가 스레드 대기 집합을 빠져나온다. 그리고 모니터 락 획득을 시도한다.모니터 락을 획득하면 임계 영역을 수행한다.
모니터 락을 획득하지 못하면 락 대기 집합에 들어가서
BLOCKED
상태로 락을 기다린다.
synchronized vs ReentrantLock 대기
synchronized
와 마찬가지로Lock(ReentrantLock)
도 2가지 단계의 대기 상태가 존재한다. 둘다 같은 개념을 구현한 것이기 때문에 비슷하다. 먼저synchronized
대기를 정리해보자.
synchronized 대기
대기1 : 모니터 락 획득 대기
자바 객체 내부의 락 대기 집합(모니터 락 대기 집합) 에서 관리
BLOCKED
상태로 락 획득 대기synchronized
를 시작할 때 락이 없으면 대기다른 스레드가
synchronized
를 빠져나갈 때 락 획득 시도, 락을 획득하면 락 대기 집합을 빠져나감
대기2 :
wait()
대기wait()
를 호출했을 때 자바 객체 내부의 스레드 대기 집합에서 대기WAITING
상태로 대기다른 스레드가
notify()
를 호출 했을 때 스레드 대기 집합을 빠져나감
Lock(ReentrantLock) 대기
대기1 :
ReentrantLock
락 획득 대기ReentrantLock
의 대기 큐에서 관리WAITING
상태로 락 획득 대기lock.lock()
을 호출 했을 때 락이 없으면 대기다른 스레드가
lock.unlock()
을 호출 했을 때, 대기가 풀리며 락 획득 시도, 락을 획득하면 대기 큐를 빠져나감
대기2 :
await()
대기condition.await()
를 호출 했을 때,condition
객체의 스레드 대기 공간에서 관리WAITING
상태로 대기다른 스레드가
condition.signal()
을 호출했을 때condition
객체의 스레드 대기 공간에서 빠져나감
2단계 대기소
참고로 깨어난 스레드가 바로 실행되는 것이 아니다. synchronized
와 마찬가지로 ReentrantLock
도
대기소가 2단계로 되어 있다.
2단계 대기소인 condition
객체의 스레드 대기 공간을 빠져나온다고 바로 실행되는 것이 아니다.
임계 영역 안에서는 항상 락이 있는 하나의 스레드만 실행될 수 있다. 여기서는 ReentrantLock
의 락을 획득해야 RUNNABLE
상태가 되면서 그 다음 코드를 실행할 수 있다. 락을 획득하지 못하면 WAITING
상태로 락을 획득할 때까지 ReentrantLock
의 대기 큐에서 대기한다.
BlockingQueue
BoundedQueueV5
는 생산자 소비자 문제, 또는 한정된 버퍼라고 알려진 문제, 또는 한정된 버퍼라고 알려진 문제를 매우 효율적으로 해결할 수 있는 자료구조이다.
이 자료구조는 단순한 큐의 기능을 넘어서 스레드를 효과적으로 제어하는 기능도 포함한다.
BoundedQueueV5
를 스레드 관점에서 보면 큐가 특정 조건이 만족될 때까지 스레드의 작업을 차단(Blocking
) 한다.
데이터 추가 차단 : 큐가 가득 차면 데이터 추가 작업(
put()
) 을 시도하는 스레드는 공간이 생길 때까지 차단된다.데이터 획득 차단 : 큐가 비어 있으면 획득 작업(
take()
) 을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.
그래서 스레드 관점에서 이 큐에 이름을 지어보면 BlockingQueue
라는 이름이 적절하다.
자바는 생산자 소비자 문제 또는 한정된 버퍼라고 불리는 문제를 해결하기 위해서
java.util.concurrent.BlockingQueue
라는 인터페이스와 구현체를 제공한다.
BlockingQueue - 예제6
자바는 생산자 소비자 문제를 해결하기 위해서 java.util.concurrent.BlockingQueue
라는 특별한 멀티스레드 구조를 제공한다. 이것은 이름 그대로 스레드를 차단(Blocking
) 할 수 있는 큐다.
데이터 추가 차단 : 큐가 가득 차면 데이터 추가 작업(
put()
) 을 시도하는 스레드는 공간이 생길 때까지 차단된다.데이터 획득 차단 : 큐가 비어 있으면 획득 작업(
take()
) 을 시도하는 스레드는 큐에 데이터가 들어올 떄까지 차단된다.
java.util.concurrent.BlockingQueue
BlockingQueue
는 인터페이스이고, 다음과 같은 다양한 기능을 제공한다.
데이터 추가 메서드 :
add()
,offer()
,put()
,offer(타임아웃)
데이터 획득 메서드 :
take()
,poll(타임아웃)
,remove(...)
BlockingQueue
인터페이스의 대표적인 구현체
ArrayBlockingQueue
: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.LinkedBlockingQueue
: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 또는 무한하게 사용할 수도 있다.
BlockingQueue
를 사용하는BoundedQueueV6_1
를 만들고, 구현체를 바꾸어 실행해보자.
BoundedQueueV5
와 동일하게 동작하는 것을 확인할 수 있다.
ArrayBlockingQueue.put()
주요 코드만 가지고 왔다.
앞서 우리가 구현한 BoundedQueueV5
와 비슷하게 구현되어 있다. ArrayBlockingQueue
는 내부에서 ReentrantLock
을 사용한다. 그리고 생산자 전용 대기실과 소비자 전용 대기실이 있다.
만약 버퍼가 가득 차면 생산자 스레드는 생산자 전용 대기실에서 대기(await()
) 한다. 생산자 스레드가 생산을 완료하면 소비자 전용 대기실에 signal()
신호를 전달한다.
우리가 구현한 기능과 차이가 있다면 인터럽트가 걸릴 수 있도록 lock.lock()
대신 lock.lockInterruptibly()
을 사용한 점과, 내부 자료 구조의 차이 정도이다. (lock.lock()
은 인터럽트를 무시한다)
BlockingQueue - 기능 설명
실무에서 멀티스레드를 사용할때는 응답성이 중요하다.
예를 들어, 대기 상태가 있어도 고객이 중지 요청을 하거나, 또는 너무 오래 대기한 경우 포기하고 빠져나갈 수 있는 방법이 필요하다.
생산자가 무언가 데이터를 생산하는데, 버퍼가 빠지지 않아서 너무 오래 대기해야 한다면, 무한정 기다리는 것 보다는 작업을 포기하고, 고객분께는 "죄송합니다. 현재 시스템에 문제가 있습니다." 라고 하는 것이 더 나은 성택일 것이다.
큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.
예외를 던진다. 예외를 받아서 처리한다.
대기하지 않는다.
false
를 반환한다.대기한다.
특정 시간 만큼만 대기한다.
이런 문제를 해결하기 위해서 BlockingQueue
는 각 상황에 맞게 다양한 메서드를 제공한다.
BlockingQueue 의 다양한 기능 (공식 문서)
Throws Exception - 대기시 예외
add(e)
: 지정된 요소를 큐에 추가하며, 큐가 가득 차면IllegalStateException
예외를 던진다.remove()
: 큐에 머리 요소를 반환하지만, 큐가 비어 있으면NoSuchElementException
예외를 던진다.element()
: 큐에 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면NoSuchElementException
예외를 던진다.
Special Value - 대기시 즉시 반환
offer(e)
: 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면false
를 반환한다.poll()
: 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면null
을 반환한다.peek()
: 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다.
Blocks - 대기
put(e)
: 지정된 요소를 큐에 추가할 떄까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기한다.take()
: 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기한다.
Times Out - 시간 대기
offer(e, time, unit)
: 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가, 시간이 초과되면false
를 반환한다.poll(time, unit)
: 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다리다가 시간이 초과되면null
을 반환한다.
참고로 BlockingQueue 의 모든 대기, 시간 대기 메서드는 인터럽트를 제공한다.
BlockingQueue - 기능 확인
BlockingQueue - 즉시 반환
offer(data)
는 성공하면,true
를 반환하고, 버퍼가 가득 차면 즉시false
를 반환한다.poll()
버퍼에 데이터가 없으면 즉시null
을 반환한다.
BlockingQueue - 시간 대기
offer(data, 시간)
는 성공하면true
를 반환하고, 버퍼가 가득 차서 스레드가 대기해야 하는 상황이면, 지 정한 시간까지 대기한다. 대기 시간을 지나면false
를 반환한다.여기서는 확인을 목적으로 1 나노초(
NANOSECONDS
)로 설정했다.
poll(시간)
버퍼에 데이터가 없어서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면null
을 반환한다.여기서는 2초(
SECONDS
)로 설정했다.
BlockingQueue - 예외
add(data)
는 성공이면true
를 반환하고, 버퍼가 가득 차면 즉시 예외가 발생한다.java.lang.IllegalStateException : Queue full
remove()
는 버퍼에 데이터가 없으면, 즉시 예외가 발생한다.java.util.NoSuchElementException
Last updated