ForkJoinPool

개념

  • ExecutorService의 구현체로, 워커 스레드를 관리하고 스레드 풀 상태나 성능 정보를 제공하기도 한다.

Work-Stealing Algorithm

  • 스레드의 워크로드를 균형있게 유지하기 위해 work-stealing 알고리즘을 사용한다.

  • ForkJoinPool은 각 워커 스레드마다 가지는 WorkQueue 객체의 배열(WorkQueue[])을 ForkJoinPool에 저장해둔다.

  • WorkQueue[]의 홀수 인덱스에는 ForkJoinWorkerThread가 사용하는 스레드 별 큐가 저장되고, 짝수 인덱스는 외부 스레드가 제출한 작업을 저장하는 공유 큐로 사용된다. 즉, 외부 스레드와 내부 스레드의 작업 큐를 구분한다.

  • 만약 deque가 비어있다면, 다른 바쁜 스레드의 deque의 가장 뒤에 있는(tail) 태스크를 가져오거나 global entry queue에서 큰 단위의 태스크를 가져온다.

commonPool

  • 모든 ForkJoinTask를 위한 기본 스레드 풀로, static 블록을 통해 자바 애플리케이션 구동 시 자동으로 생성된다.

  • 기본적으로 Runtime.getRuntime().availableProcessors() - 1 개수까지 스레드를 생성할 수 있다. 최대 스레드 수는 직접 지정 가능하며, 0 초과 32767 이하여야 한다.

    • 프로세서 수보다 1 적게 설정하는 이유는 시스템의 다른 자원이 사용할 몫을 남겨두기 위함이라고 한다.

  • 작업 큐에 작업이 존재하고 작업을 실행할 스레드가 부족할 때, parallelism 값 이하로 스레드가 생성된다.

    • 코드

      final void signalWork(WorkQueue[] ws, WorkQueue q) {
      		long c; int sp, i; WorkQueue v; Thread p;
      		while ((c = ctl) < 0L) {                       // too few active
      		    if ((sp = (int)c) == 0) {                  // no idle workers
      		        if ((c & ADD_WORKER) != 0L)            // too few workers
      		            tryAddWorker(c);
      		        break;
      		    }
      		    ...
          }
      }
      private void tryAddWorker(long c) {
          boolean add = false;
          do {
              long nc = ((AC_MASK & (c + AC_UNIT)) |
                         (TC_MASK & (c + TC_UNIT)));
              if (ctl == c) {
                  int rs, stop;                 // check if terminating
                  if ((stop = (rs = lockRunState()) & STOP) == 0)
                      add = U.compareAndSwapLong(this, CTL, c, nc);
                  unlockRunState(rs, rs & ~RSLOCK);
                  if (stop != 0)
                      break;
                  if (add) {
                      createWorker();
                      break;
                  }
              }
          } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
        }
  • ForkJoinWorkerThread 클래스를 통해 스레드가 생성된다.

  • 계속해서 다른 스레드의 CPU 바운드 작업을 훔쳐 실행시키면서 idle 스레드가 없도록 사용하는 것이 목적이다.

  • 스레드는 작업이 완료될 때 까지 공용 풀에 반환되지 않는다. 따라서 I/O 바운드 작업 시에는 절대 사용하지 말아야 한다.

구성 요소

  • Task

    • 하나의 작업을 Task 단위로 구분한다.

  • Methods

    • execute(ForkJoinTask<?>) 메서드를 통해 스레드 풀에 Task를 등록할 수 있다.

    • invoke(ForkJoinTask<T>) 메서드를 통해 스레드 풀에 Task를 등록하고 결과가 나올때까지 기다린 후 반환한다.

Task

ForkJoinTask

  • ForkJoinPool에서 수행되는 가장 기본적인 Task 타입이다.

태스크 시작

  • fork()

    • ForkJoinWorkerThread의 작업 큐 또는 ForkJoinPool의 외부 큐에 Task를 등록할 수 있다.

    • 현재 스레드에서 다른 스레드로 작업을 비동기로 분할하여 보낼 수 있다.

  • invoke()

    • 현재 스레드에서 바로 작업을 수행하여 결과가 나올 때 까지 기다린 후 결과를 반환한다.

  • 태스크를 시작시키고 결과를 얻는 예시는 아래와 같다.

결과 조회

  • join()

    • 메서드 호출 시 fork한 작업의 완료를 기다린다.

    • 이 때 join()을 호출한 스레드가 대기 상태에 들어가면, ForkJoinPool의 다른 스레드가 유휴 상태에 빠져있지 않도록 Work Stealing Algorithm을 활성화한다.

    • 모든 서브태스크 작업이 완료되면 결과를 합병해 최종 결과를 반환한다.

  • get(), get(long, TimeUnit) 메서드로 결과를 조회할 수 있다.

결과 주입

  • complete(V) / completeExceptionally(Throwable) 메서드로 Task를 완료시킬 수 있다.

  • 추상 클래스이므로 ForkJoinPool에 태스크를 맡기려면, 직접 구현체를 만들어 입력해야 한다. 다음은 구현해야 하는 추상 메서드의 종류이다.

    • getRawResult

      • join 메서드에서 반환되는 결과를 반환한다. 작업이 완료되지 않았다면 null을 반환한다.

    • setRawResult

      • 외부에서 호출하는 용도가 아니다. 입력받은 인자를 result로 설정한다.

    • exec

      • 외부에서 호출하는 용도가 아니다. Task의 기본 동작을 수행하도록 한다. 정상적으로 완료되었다면 true를 반환하고, 완료가 필요하지 않거나 완료되지 않았다면 false를 반환한다.

  • 아래는 CompletableFuture에서 사용되는 Completion이라는 ForkJoinTask이다. CompletableFuture에서는 commonPool의 execute 메서드를 통해 태스크를 실행시킨다.

RecursiveTask

  • compute 추상 메서드를 구현해야 하는 추상 클래스이다.

  • compute 메서드에서는 태스크를 서브 태스크로 분할하는 로직과, 더이상 분할 불가일 때 서브 태스크의 결과를 생산할 알고리즘을 정의한다.

ManagedBlocker

  • ForkJoinPool은 기본적으로 논블로킹 작업을 처리하도록 설계되어 있다.

  • 만약 블로킹 작업을 처리해야 한다면, ForkJoinPool의 managedBlock 메서드의 인자로 입력하여 블로킹될 수 있는 태스크를 전달한다.

  • ForkJoinPool의 스레드를 사용하게 되면 블로킹되기 때문에 병렬성을 충분히 보장하지 못하게 된다. 따라서 내부적으로 논블로킹 작업을 처리하는 스레드 개수가 parallelism 이하가 되지 않도록 해야 한다.

  • 이를 위해 기존 스레드가 대기중이라면 이를 활성화하여 블로킹 작업을 할당하고, 모두 사용중이라면 새로운 스레드를 생성해 블로킹 작업을 할당한다.

  • 이 때 스레드 개수는 최대 32767까지만 생성 가능하다.

  • CompletableFuture와의 관계

    • CompletableFuture의 get(), join() 메서드 호출 시 이 ManagedBlocker를 호출하여 블로킹 작업을 위임한다.

    • ForkJoinPool에 속한 스레드가 호출하지 않았다면, 기본적으로 해당 스레드가 대기하게 된다.

    • 만약 get(), join() 메서드가 ForkJoinPool에 속한 스레드에서 호출되었다면 병렬성을 위해 앞서 소개한 방식대로 새로운 스레드를 만들거나 유휴 상태의 스레드를 통해 대기하게 된다.

  • 본 인터페이스에 속한 메서드의 의미와 역할은 다음과 같다.

    • isReleasable()

      • 블로킹이 더 이상 필요 없다면 true를 반환한다.

      • 블로킹 조건이 충족되었는지 확인하는 용도로 사용한다.

    • block()

      • 현재 스레드를 블로킹하고 작업을 수행한다.

      • 예를 들어 락이나 특정 조건, 외부 응답 등을 기다리는 블로킹 작업을 구현한다.

병렬 스트림과 통합

  • 병렬 스트림을 사용하는 경우 ForkJoinPool의 commonPool을 사용하게 된다.

  • 스트림을 재귀적으로 분할하고, 각 서브스트림을 서로 다른 스레드에 할당하고, 결과를 하나의 값으로 합치는 오버헤드가 발생하기 때문에, 코어 간 데이터 전송 시간보다 훨씬 오래걸리는 작업만 병렬화하는것이 바람직하다.

  • 소스 데이터가 크지 않거나, 소스 데이터의 자료구조를 분할하는 데에 오버헤드가 큰 경우 사용하면 안된다.

    • 예를 들어 LinkedList를 소스로 사용하는 경우 모든 요소를 탐색해야 분할이 가능하므로 적절하지 않다.

    • IntStream#iterate 과 같이 정확히 정해진 범위가 없는 경우 사용하면 안된다.

병렬 스트림 동작 방식

  • 스트림을 분리할 때에는 Spliterator를 사용한다. 앞서 살펴본 RecursiveTask가 하나의 Task를 여러 Task로 분리했던 방식과 유사하게 하나의 Spliterator를 여러 Spliterator로 분리한다.

  • 이렇게 분리된 각 태스크는 ForkJoinPool에 의해 병렬적으로 작업을 수행하게 된다.

  • 아래와 같이 단순한 병렬 스트림을 사용 시,Streams$RangeIntSpliterator에 의해 적절한 단위로 쪼개어 작업이 수행된다.

출처

Last updated