5장: 구성 단위

동기화된 컬렉션 클래스

  • 모든 메서드를 클래스 내부에 캡슐화하여 내부의 값을 한 번에 한 스레드만 사용할 수 있도록 제어한다.

여러 연산을 단일 연산처럼 사용하기

  • 여러 개의 연산을 묶어 단일 연산처럼 사용해야 하는 경우, 자체적인 스레드 안전성 범위 내에서 처리 불가능하기 때문에 문제가 발생할 수 있다.

  • 이를 해결하려면 클라이언트 측 락을 사용하여 사용자가 정의하려는 메서드를 동기화시켜주어야 한다. 보통 동기화된 컬렉션 클래스는 객체 자체를 락으로 사용(synchronized 키워드)해 내부 메서드들을 동기화하므로 이 때에도 객체 자체를 락으로 사용해도 된다.

컬렉션 순회

  • 컬렉션 순회 시 Iterator를 사용하는 것이 일반적이지만 다른 스레드가 동시에 컬렉션 내부를 변경하는 경우를 지원하지 않는다. 따라서 반복문 수행 도중 변경이 감지되면 ConcurrentModificationException이 발생한다.

  • 예외를 방지하려면 반복문 전체를 락으로 동기화시키는 방법밖에 쓸 수 없다. 하지만 이 경우 반복 작업이 오래걸리면 대기 기간이 길어질 수 있고, 내부적으로 다른 락을 동시에 획득해 사용해야 하는 경우 데드락에 빠질 수도 있다.

  • 동기화된 컬렉션을 사용하면 이러한 문제가 발생할 가능성이 사라진다.

병렬 컬렉션

  • 동기화된 컬렉션 클래스는 컬렉션의 내부 변수에 접근하는 통로를 하나로 만드는 것이므로 여러 스레드가 동시에 사용하려할 때 성능이 떨어진다.

  • 병렬 컬렉션은 여러 스레드에서 동시에 사용하더라도 성능에 문제 없게 설계되었다.

ConcurrentHashMap

  • Lock Striping이라는 세밀한 동기화 방법을 사용해 읽기 연산은 얼마든지 동시에 처리 가능하고, 읽기/쓰기 연산을 동시에 처리할 수 있고, 쓰기 연산은 제한된 개수만큼 동시에 처리할 수 있다.

  • 이 클래스가 만든 Iterator는 ConcurrentModificationException을 발생시키지 않는다. 대신 변경이 발생하더라도 Iterator 생성 시점의 데이터를 기반으로 계속해서 순회하게 된다.

  • size, isEmpty 메서드의 경우 값을 반환하는 시점에 변경이 있을 수 있으므로, 결과가 항상 정확하지는 않다.

  • 맵 객체를 독점해서 사용해야 하는 경우가 있다면 사용할 수 없다.

CopyOnWriteArrayList

  • 변경할 때 마다 복사본을 생성하여 스레드 안전성을 확보한다.

  • 컬렉션이 매우 크다면 복사할 때 성능이 저하되고 메모리를 많이 쓸 수 있다.

  • 따라서 변경보다는 반복문으로 읽는 작업이 많을 때 유용하다.

  • 이벤트 리스너를 등록하고 이벤트가 발생하면 리스너들을 호출하는 경우에 리스너를 담아두는 자료구조로 활용하면 좋다. 리스너 목록을 순회하는 행위는 자주 발생하지만 리스너 추가/제거는 자주 발생하지 않기 때문이다.

블로킹 큐와 Producer-Consumer

  • 블로킹 큐는 put/take 메서드를 가지며, 큐가 가득 차 있다면 put 메서드에서 큐에 삽입 가능할 때 까지 대기하고 큐가 비어있는 상태라면 take 메서드는 큐에서 데이터를 조회할 수 있을 때 까지 대기한다.

  • LinkedBlockingQueue, ArrayBlockingQueue는 LinkedList, ArrayList에 대응되는 블로킹 큐이다.

  • PriorityBlockingQueue는 우선 순위를 기준으로 동작하는 큐이다.

  • SynchronousQueue는 큐에 데이터를 저장하지 않고 바로 컨슈머에 데이터를 넘겨준다. 즉, put/take 메서드는 다른 스레드에 의해 take/put 메서드가 호출될 때 까지 대기한다. 따라서 데이터를 처리할 수 있는 컨슈머가 대기하고 있는 경우에 사용하면 좋다.

직렬 스레드 한정

  • 객체의 소유권을 프로듀서에서 컨슈머로 옮길 때 직렬 스레드 한정 기법을 사용한다.

  • 객체는 특정 스레드 하나만이 소유권을 가질 수 있으며 안전한 방법으로 공개할 경우 객체의 소유권을 이전할 수 있다.

  • 원래 소유권을 갖고 있던 스레드는 소유권이 이전되면 해당 객체에 대한 상태를 알 수 없게 된다.

  • 소유권을 이전받는 스레드는 하나여야 하는데, 블로킹 큐를 사용하면 이를 항상 지키게 된다.

work stealing 패턴

  • 여러 컨슈머들이 각자의 덱에 쌓인 작업을 처리할 때 자신의 덱에 있는 작업을 모두 처리했다면 다른 컨슈머의 덱에 접근하여 맨 뒤에 추가된 작업을 가로채서 처리하는 것을 의미한다.

  • GC 중 힙을 마킹하는 작업을 비롯한 그래프 탐색 알고리즘 구현 시에 적용하면 멀티 스레드를 통해 쉽게 병렬화할 수 있다.

  • 컨슈머가 프로듀서의 역할도 갖고 있는 경우, 작업을 수행하다가 새로운 작업들을 추가해야 할 수도 있다. 이 때에도 쉬는 스레드가 없도록 적절히 분배되어야 한다.

블로킹 메서드 / 인터럽터블 메서드

  • 스레드가 블록되면 동작이 멈춰지고 BLOCKED, WAITING, TIMED_WAITING 상태 중 하나가 된다.

  • 스레드가 블로킹되어 있다면 외부에서 작업 재개 신호를 받아야 다시 RUNNABLE 상태로 돌아간다.

  • 메서드를 실행중인 스레드에게 외부에서 인터럽트를 걸면, 대기 작업으로부터 벗어나게 된다.

  • 인터럽트를 원활하게 처리하도록 메서드를 구현하여 실행 시간이 너무 길어지는 것을 방지할 수 있다.

  • InterruptedException이 발생할 수 있는 메서드는 블로킹 메서드이다.

  • 일반적으로 인터럽트를 처리하는 방법은 다음과 같다.

    • InterruptedException을 그대로 넘겨 책임을 떠넘긴다.

    • InterruptedException을 catch한 후 현재 스레드의 interrupt 메서드를 호출해 인터럽트 상태를 설정하고, 메서드 호출부에서 인터럽트 상황이 발생했음을 알린다. InterruptedException은 넘기지 않는다.

  • 인터럽트를 무시하면 안된다.

동기화 클래스

  • 동기화 클래스는 클래스에 접근하려는 스레드를 제어하기 위한 상태 정보를 가지며, 상태를 변경하기 위한 메서드를 제공하고, 특정 상태에 진입할 때 까지 효과적으로 기다리는 메서드를 제공한다.

  • Latch

    • 한 번 terminal 상태(최종 상태)에 도달하면 모든 스레드가 통과할 수 있도록 하는 클래스이다.

    • 특정 자원이 확보되기 전까지 대기해야 하는 경우, 현재 작업을 수행하기 전에 먼저 수행되어야 하는 작업이 끝나야 하는 경우, 특정 작업에 필요한 모든 자원이 준비되어야 하는 경우 유용하다.

    • 자바에서는 CountDownLatch 클래스를 통해 대기하는 동안 발생해야 하는 이벤트의 개수인 카운터를 지정하고, 카운터 값이 0이 될 때 까지 대기하는 메서드를 제공한다. 대기 중 인터럽트가 발생하거나 타임아웃이 걸릴 때 까지 대기한다.

  • FutureTask

    • Callable 인터페이스 구현체를 입력받아 스레드가 연산을 수행하도록 한다.

    • 시작 전 대기, 시작됨, 종료됨 상태를 가질 수 있으며, 종료되었을 때 정상 종료, 취소, 예외 상황이 될 수 있다.

    • Executor 프레임워크에서 비동기 작업을 실행하고자 할 때 사용한다.

    • 실행 시간이 오래걸리는 작업이 있을 때 실제 결과가 필요한 시점보다 미리 작업을 실행시키는 용도로도 사용한다.

    • 스레드는 생성자나 스태틱 초기화 영역에서 시작하는 것보다는 일반 메서드에서 실행하는 것이 좋다.

  • Semaphore

    • 이진 세마포어는 non-reentrant lock 역할을 하는 뮤텍스로 활용할 수 있다.

    • 자바의 Semaphore 클래스는 카운팅 세마포어 기능을 제공한다.

    • 특정 자원이나 연산을 동시에 사용/호출 가능한 스레드 수를 제어할 때 사용하며, 자원 풀 또는 컬렉션 크기에 제한을 둘 때 유용하다.

    • permit이라는 논리적인 개념을 통해 내부 상태를 관리하며, 생성 메서드에 최초로 생성할 permit의 수를 넘긴다. permit의 수만큼만 동시에 리소스에 접근 가능하다.

    • 이미 permit 수 만큼 스레드가 접근해 있는 상태라면 세마포어 확보를 위한 대기 상태에 들어가며, 인터럽트가 걸리거나 대기 중 타임아웃이 발생할 경우 대기를 멈춘다.

  • 배리어

    • 특정 이벤트가 발생할 때 까지 여러 스레드를 대기 상태로 둔다.

    • 모든 스레드가 배리어 위치에 도달해야 다음 작업을 수행할 수 있도록 한다.

    • CyclicBarrier

      • 반복적으로 배리어 포인트에 도달하는 기능을 제공하며, 한 문제를 작은 문제들로 나누어 반복적으로 병렬 처리할 때 유용하게 사용할 수 있다.

      • await 메서드를 통해 모든 스레드가 동일한 상태에 도달하는 것을 대기하는데, 이 때 인터럽트가 걸리거나 대기 중 타임아웃이 발생하면 await 하던 모든 스레드에게 BrokenBarrierException이 발생한다.

      • 모든 스레드가 배리어 포인트에 도달하면 각 스레드마다 배리어 포인트에 도착한 순서를 알려준다. 이를 통해 반복 작업을 담당할 리더를 선출할 수 있다.

      • 배리어가 통과된 후 대기하던 스레드들을 풀어주기 직전 수행할 Runnable 작업을 할당할 수도 있다.

    • Exchanger

      • 두 스레드가 배리어 포인트에 도달하면 서로 갖고 있던 값을 교환하는 클래스이다.

      • 예를 들어 한 스레드는 버퍼에 값을 채워넣고 한 스레드는 버퍼의 값을 빼내어 사용하는 경우, 두 스레드가 모두 배리어 포인트에 도달할 때 마다 버퍼를 교환하도록 할 수 있다.

      • 스레드 간 값을 넘길 때 안전한 공개 방법으로 넘기므로 동기화 문제가 발생하지 않는다.

효율적이고 확장성 있는 결과 캐싱

  • 다음은 멀티 스레드 환경에서 스레드 안전하고 효율적이고 확장성 있는 캐싱을 구현한 예시이다.

    • ConcurrentHashMap을 통해 여러 스레드가 결과를 담는 Future를 추가/접근/삭제해도 안전하도록 한다.

    • ConcurrentHashMap에 V 자체를 저장하는 대신 Future를 저장하여 값 획득 중인지 아닌지 여부를 미리 알 수 있도록 한다. 이를 통해 동일한 값에 대한 연산을 여러 스레드에서 동시에 수행되는 것을 일부 막는다.

    • putIfAbsent를 통해 동시에 동일한 값에 대한 Computable을 여러 스레드에서 수행하지 않도록 한다.

public class Memoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;
    
    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = Computable::compute;
                FutureTask<V> ft = new FutureTask<>(eval);
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
}

Last updated