5장: 구성 단위
동기화된 컬렉션 클래스
모든 메서드를 클래스 내부에 캡슐화하여 내부의 값을 한 번에 한 스레드만 사용할 수 있도록 제어한다.
여러 연산을 단일 연산처럼 사용하기
여러 개의 연산을 묶어 단일 연산처럼 사용해야 하는 경우, 자체적인 스레드 안전성 범위 내에서 처리 불가능하기 때문에 문제가 발생할 수 있다.
이를 해결하려면 클라이언트 측 락을 사용하여 사용자가 정의하려는 메서드를 동기화시켜주어야 한다. 보통 동기화된 컬렉션 클래스는 객체 자체를 락으로 사용(synchronized 키워드)해 내부 메서드들을 동기화하므로 이 때에도 객체 자체를 락으로 사용해도 된다.
컬렉션 순회
컬렉션 순회 시 Iterator를 사용하는 것이 일반적이지만 다른 스레드가 동시에 컬렉션 내부를 변경하는 경우를 지원하지 않는다. 따라서 반복문 수행 도중 변경이 감지되면 ConcurrentModificationException이 발생한다.
예외를 방지하려면 반복문 전체를 락으로 동기화시키는 방법밖에 쓸 수 없다. 하지만 이 경우 반복 작업이 오래걸리면 대기 기간이 길어질 수 있고, 내부적으로 다른 락을 동시에 획득해 사용해야 하는 경우 데드락에 빠질 수도 있다.
동기화된 컬렉션을 사용하면 이러한 문제가 발생할 가능성이 사라진다.
병렬 컬렉션
동기화된 컬렉션 클래스는 컬렉션의 내부 변수에 접근하는 통로를 하나로 만드는 것이므로 여러 스레드가 동시에 사용하려할 때 성능이 떨어진다.
병렬 컬렉션은 여러 스레드에서 동시에 사용하더라도 성능에 문제 없게 설계되었다.
ConcurrentHashMap
Lock Striping이라는 세밀한 동기화 방법을 사용해 읽기 연산은 얼마든지 동시에 처리 가능하고, 읽기/쓰기 연산을 동시에 처리할 수 있고, 쓰기 연산은 제한된 개수만큼 동시에 처리할 수 있다.이 클래스가 만든 Iterator는 ConcurrentModificationException을 발생시키지 않는다. 대신 변경이 발생하더라도 Iterator 생성 시점의 데이터를 기반으로 계속해서 순회하게 된다.
size, isEmpty 메서드의 경우 값을 반환하는 시점에 변경이 있을 수 있으므로, 결과가 항상 정확하지는 않다.
맵 객체를 독점해서 사용해야 하는 경우가 있다면 사용할 수 없다.
CopyOnWriteArrayList
변경할 때 마다 복사본을 생성하여 스레드 안전성을 확보한다.
컬렉션이 매우 크다면 복사할 때 성능이 저하되고 메모리를 많이 쓸 수 있다.
따라서 변경보다는 반복문으로 읽는 작업이 많을 때 유용하다.
이벤트 리스너를 등록하고 이벤트가 발생하면 리스너들을 호출하는 경우에 리스너를 담아두는 자료구조로 활용하면 좋다. 리스너 목록을 순회하는 행위는 자주 발생하지만 리스너 추가/제거는 자주 발생하지 않기 때문이다.
블로킹 큐와 Producer-Consumer
블로킹 큐는 put/take 메서드를 가지며, 큐가 가득 차 있다면 put 메서드에서 큐에 삽입 가능할 때 까지 대기하고 큐가 비어있는 상태라면 take 메서드는 큐에서 데이터를 조회할 수 있을 때 까지 대기한다.
LinkedBlockingQueue, ArrayBlockingQueue는 LinkedList, ArrayList에 대응되는 블로킹 큐이다.
PriorityBlockingQueue는 우선 순위를 기준으로 동작하는 큐이다.
SynchronousQueue는 큐에 데이터를 저장하지 않고 바로 컨슈머에 데이터를 넘겨준다. 즉, put/take 메서드는 다른 스레드에 의해 take/put 메서드가 호출될 때 까지 대기한다. 따라서 데이터를 처리할 수 있는 컨슈머가 대기하고 있는 경우에 사용하면 좋다.
직렬 스레드 한정
객체의 소유권을 프로듀서에서 컨슈머로 옮길 때 직렬 스레드 한정 기법을 사용한다.
객체는 특정 스레드 하나만이 소유권을 가질 수 있으며 안전한 방법으로 공개할 경우 객체의 소유권을 이전할 수 있다.
원래 소유권을 갖고 있던 스레드는 소유권이 이전되면 해당 객체에 대한 상태를 알 수 없게 된다.
소유권을 이전받는 스레드는 하나여야 하는데, 블로킹 큐를 사용하면 이를 항상 지키게 된다.
work stealing 패턴
여러 컨슈머들이 각자의 덱에 쌓인 작업을 처리할 때 자신의 덱에 있는 작업을 모두 처리했다면 다른 컨슈머의 덱에 접근하여 맨 뒤에 추가된 작업을 가로채서 처리하는 것을 의미한다.
GC 중 힙을 마킹하는 작업을 비롯한 그래프 탐색 알고리즘 구현 시에 적용하면 멀티 스레드를 통해 쉽게 병렬화할 수 있다.
컨슈머가 프로듀서의 역할도 갖고 있는 경우, 작업을 수행하다가 새로운 작업들을 추가해야 할 수도 있다. 이 때에도 쉬는 스레드가 없도록 적절히 분배되어야 한다.
블로킹 메서드 / 인터럽터블 메서드
스레드가 블록되면 동작이 멈춰지고 BLOCKED, WAITING, TIMED_WAITING 상태 중 하나가 된다.
스레드가 블로킹되어 있다면 외부에서 작업 재개 신호를 받아야 다시 RUNNABLE 상태로 돌아간다.
메서드를 실행중인 스레드에게 외부에서 인터럽트를 걸면, 대기 작업으로부터 벗어나게 된다.
인터럽트를 원활하게 처리하도록 메서드를 구현하여 실행 시간이 너무 길어지는 것을 방지할 수 있다.
InterruptedException이 발생할 수 있는 메서드는 블로킹 메서드이다.
일반적으로 인터럽트를 처리하는 방법은 다음과 같다.
InterruptedException을 그대로 넘겨 책임을 떠넘긴다.
InterruptedException을 catch한 후 현재 스레드의 interrupt 메서드를 호출해 인터럽트 상태를 설정하고, 메서드 호출부에서 인터럽트 상황이 발생했음을 알린다. InterruptedException은 넘기지 않는다.
인터럽트를 무시하면 안된다.
동기화 클래스
동기화 클래스는 클래스에 접근하려는 스레드를 제어하기 위한 상태 정보를 가지며, 상태를 변경하기 위한 메서드를 제공하고, 특정 상태에 진입할 때 까지 효과적으로 기다리는 메서드를 제공한다.
Latch
한 번 terminal 상태(최종 상태)에 도달하면 모든 스레드가 통과할 수 있도록 하는 클래스이다.
특정 자원이 확보되기 전까지 대기해야 하는 경우, 현재 작업을 수행하기 전에 먼저 수행되어야 하는 작업이 끝나야 하는 경우, 특정 작업에 필요한 모든 자원이 준비되어야 하는 경우 유용하다.
자바에서는 CountDownLatch 클래스를 통해 대기하는 동안 발생해야 하는 이벤트의 개수인 카운터를 지정하고, 카운터 값이 0이 될 때 까지 대기하는 메서드를 제공한다. 대기 중 인터럽트가 발생하거나 타임아웃이 걸릴 때 까지 대기한다.
FutureTask
Callable 인터페이스 구현체를 입력받아 스레드가 연산을 수행하도록 한다.
시작 전 대기, 시작됨, 종료됨 상태를 가질 수 있으며, 종료되었을 때 정상 종료, 취소, 예외 상황이 될 수 있다.
Executor 프레임워크에서 비동기 작업을 실행하고자 할 때 사용한다.
실행 시간이 오래걸리는 작업이 있을 때 실제 결과가 필요한 시점보다 미리 작업을 실행시키는 용도로도 사용한다.
스레드는 생성자나 스태틱 초기화 영역에서 시작하는 것보다는 일반 메서드에서 실행하는 것이 좋다.
Semaphore
이진 세마포어는 non-reentrant lock 역할을 하는 뮤텍스로 활용할 수 있다.
자바의 Semaphore 클래스는 카운팅 세마포어 기능을 제공한다.
특정 자원이나 연산을 동시에 사용/호출 가능한 스레드 수를 제어할 때 사용하며, 자원 풀 또는 컬렉션 크기에 제한을 둘 때 유용하다.
permit이라는 논리적인 개념을 통해 내부 상태를 관리하며, 생성 메서드에 최초로 생성할 permit의 수를 넘긴다. permit의 수만큼만 동시에 리소스에 접근 가능하다.
이미 permit 수 만큼 스레드가 접근해 있는 상태라면 세마포어 확보를 위한 대기 상태에 들어가며, 인터럽트가 걸리거나 대기 중 타임아웃이 발생할 경우 대기를 멈춘다.
배리어
특정 이벤트가 발생할 때 까지 여러 스레드를 대기 상태로 둔다.
모든 스레드가 배리어 위치에 도달해야 다음 작업을 수행할 수 있도록 한다.
CyclicBarrier
반복적으로 배리어 포인트에 도달하는 기능을 제공하며, 한 문제를 작은 문제들로 나누어 반복적으로 병렬 처리할 때 유용하게 사용할 수 있다.
await 메서드를 통해 모든 스레드가 동일한 상태에 도달하는 것을 대기하는데, 이 때 인터럽트가 걸리거나 대기 중 타임아웃이 발생하면 await 하던 모든 스레드에게
BrokenBarrierException이 발생한다.모든 스레드가 배리어 포인트에 도달하면 각 스레드마다 배리어 포인트에 도착한 순서를 알려준다. 이를 통해 반복 작업을 담당할 리더를 선출할 수 있다.
배리어가 통과된 후 대기하던 스레드들을 풀어주기 직전 수행할 Runnable 작업을 할당할 수도 있다.
Exchanger
두 스레드가 배리어 포인트에 도달하면 서로 갖고 있던 값을 교환하는 클래스이다.
예를 들어 한 스레드는 버퍼에 값을 채워넣고 한 스레드는 버퍼의 값을 빼내어 사용하는 경우, 두 스레드가 모두 배리어 포인트에 도달할 때 마다 버퍼를 교환하도록 할 수 있다.
스레드 간 값을 넘길 때 안전한 공개 방법으로 넘기므로 동기화 문제가 발생하지 않는다.
효율적이고 확장성 있는 결과 캐싱
다음은 멀티 스레드 환경에서 스레드 안전하고 효율적이고 확장성 있는 캐싱을 구현한 예시이다.
ConcurrentHashMap을 통해 여러 스레드가 결과를 담는 Future를 추가/접근/삭제해도 안전하도록 한다.
ConcurrentHashMap에 V 자체를 저장하는 대신 Future를 저장하여 값 획득 중인지 아닌지 여부를 미리 알 수 있도록 한다. 이를 통해 동일한 값에 대한 연산을 여러 스레드에서 동시에 수행되는 것을 일부 막는다.
putIfAbsent를 통해 동시에 동일한 값에 대한 Computable을 여러 스레드에서 수행하지 않도록 한다.
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = Computable::compute;
FutureTask<V> ft = new FutureTask<>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
}Last updated