7장: 병렬 데이터 처리와 성능
스트림의 꽃, 병렬 처리 과정을 알아보자.
자바 7 이전에는 데이터를 서브 파트로 분할해 각각의 스레드로 할당하고 적절한 동기화 처리를 하고, 마지막으로 결과를 모두 합쳐야 하는 것이 개발자의 몫이었다.
자바 7은 포크/조인 프레임워크 기능을 통해 더 쉽게 병렬화를 수행하며 에러를 최소화할 수 있도록 한다.
병렬 스트림
각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.
순차 스트림을 병렬 스트림으로 변환
순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다. 즉, 리듀싱 연산을 여러 청크로 나누어 병렬 수행이 가능하다.
parallel() 메서드와 sequential() 메서드를 사용해 병렬로 실행할 연산과 순차로 실행할 연산을 제어할 수 있다.
두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
아래의 경우 parallel() 메서드가 마지막에 호출되었으므로 파이프라인은 전체적으로 병렬로 실행된다.
스레드 풀 설정
스트림의 parallel 메서드에서 병렬로 작업을 수행하는 스레드는 ForkJoinPool에 의해 관리된다.
보통 프로세서 수에 따라 스레드 개수를 자동으로 설정하기 때문에 특별한 이유가 없으면 기본 값 사용을 권장한다.
스트림 성능 측정
병렬화를 한다고 반드시 성능이 좋아지는 것이 아니므로 직접 측정해보아야 한다.
JMH를 사용해 작은 벤치마크를 구현할 수 있다.
JVM으로 실행되는 프로그램의 벤치마크 과정은 핫스팟이 바이트 코드 최적화를 위해 필요한 웜업 시간, GC로 인한 오버헤드 등으로 영향을 받기 때문에 어려운 편이다.
병렬 처리 시 조심해야 할 점
iterate()와 같이 순차적인 연산은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 청크로 분할할 수 없다.
따라서 병렬 처리 시에는 IntStream.rangeClosed() 와 같이 청크로 분할이 용이한 데이터 소스를 사용해야 한다.
병렬화 이용 시 스트림을 재귀적으로 분할, 각 서브스트림을 서로 다른 스레드에 할당, 결과를 하나의 값으로 합치는 오버헤드가 발생한다.
따라서 코어 간 데이터 전송 시간보다 훨씬 오래걸리는 작업만 병렬화하는것이 바람직하다.
공유된 상태(누적자)를 바꾸는 알고리즘에서 제대로 동기화를 해주지 않는다면 문제가 발생하게 된다.
병렬 스트림 효과적으로 사용하기
장비의 사양과 연산의 특성 등 상황에 따라 천차만별이기 때문에 어떤 상황에서 병렬 스트림을 사용할 지에 대한 정확한 가이드는 없다.
확신이 서지 않을때는 순차 스트림과 병렬 스트림 구현 시의 성능을 직접 측정한다.
오토박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소이므로 주의해야 하며, 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 사용하는 것이 좋다.
limit이나 findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 성능이 더 떨어진다. 스트림 요소의 순서가 상관없다면 unordered를 호출해서 비정렬된 스트림을 얻은 후 limit을 호출하는 것이 더 효율적이다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려해 처리해야 할 요소 수가 많고 한 요소 당 처리 비용이 높다면 병렬 스트림으로 성능을 개선할 여지가 있다.
소량의 데이터에서는 병렬화 과정의 부가 비용을 상쇄하지 못하므로 병렬스트림을 쓰지 말자.
스트림을 구성하는 자료구조가 적절한지 확인한다.
ArrayList는 요소를 탐색하지 않고도 분할할 수 있지만 LinkedList는 모든 요소를 탐색해야 분할할 수 있다.
range 팩토리 메서드로 만든 기본형 스트림이나 커스텀 Spliterator를 구현하면 쉽게 분해할 수 있다.
스트림의 특성과 파이프라인 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 병렬 처리가 가능하다.
필터 연산이 있으면 스트림의 길이를 예측할 수 없어 분할이 어려우므로 병렬 처리가 어려워진다.
최종 연산의 병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 이익이 상쇄될 수 있다.
포크/조인 프레임워크
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할하여 서브태스크로 각 스레드에 분산 할당하여 처리한 뒤, 각각의 결과를 합쳐서 전체 결과로 만든다.
RecursiveTask 활용
스레드풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어 추상메서드 compute()를 구현해야 한다.
compute()
태스크를 서브 태스크로 분할하는 로직과, 더이상 분할 불가일 때 서브 태스크의 결과를 생산할 알고리즘을 정의한다.
병렬로 합계 구하기
RecursiveTask를 구현하여 포크/조인 프레임워크를 통해 범위의 숫자를 더할 수 있다.
numbers라는 배열을 입력받아 해당 배열의 합계를 구해야 한다.
한 태스크의 요소가 10000개 이하가 될 때까지 태스크를 분할하고, 이후 각 태스크의 작업 결과를 합친다.
아래와 같이 ForkJoinTask를 ForkJoinPool의 invoke 메서드로 전달하면, compute 메서드를 실행하여 태스크 결과를 얻을 수 있다.
일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다.
즉, 소프트웨어의 필요한 곳에서 언제든 가져다 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.
ForkJoinPool의 인스턴스 생성 시 인자를 주지 않으면 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있다는 의미이다.
ForkJoinPool은 기본적으로 Runtime.availableProcessors의 반환값으로 풀에 사용할 스레드 수를 결정하는데, 이 반환값에는 하이퍼스레딩과 관련된 가상 프로세서도 개수에 포함이 된다.
포크/조인 프레임워크 제대로 사용하기
두 서브태스크가 모두 시작된 다음에 join을 호출할 것
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
한 서브태스크가 시작되지 않았다면, 각각의 서브태스크가 다른 태스크의 종료를 기다리는 일이 발생할 수 있다.
RecursiveTask 내에서는 compute나 fork 메서드를 사용하며, 순차코드에서 병렬 계산을 시작할때만 ForkJoinPool의 invoke 메서드를 사용해야 한다.
서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
한쪽 작업에만 fork를 호출하고 다른쪽에는 compute를 호출하면, 한 태스크에는 같은 스레드를 재사용할 수 있으므로 불필요한 오버헤드를 피할 수 있다.
포크/조인 프레임워크의 병렬 계산은 디버깅하기 어렵다. fork라 불리는 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 하며, 각 서브태스크의 실행 시간은 새로운 태스크를 포킹하는데에 드는 시간보다 길어야한다.
I/O 작업을 한 서브태스크에 할당하고, 다른 서브태스크에서는 계산을 실행할 수 있다.
작업 훔치기
이론적으로는 CPU의 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 코어에서 태스크를 실행할 것이고, 같은 시간에 종료될 것이라고 생각할 수 있다.
하지만 디스크 접근 속도 저하, 외부 서비스 호출 지연 등의 이유로 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
작업 훔치기
ForkJoinPool의 모든 스레드를 공정하게 분할하기 위해, 어떤 스레드가 자신에게 할당된 작업이 모두 끝났다면 다른 스레드 작업 큐의 tail에서 작업을 가져와 처리한다.
실제 코어 수 보다 더 잘게 나누면, 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.
결론은, 코어 개수와 관계없이 적절한 크기로 분활된 많은 태스크를 포킹하는 것이 바람직하다.
Spliterator 인터페이스
자동으로 스트림을 분할하는 기법
Iterator처럼 소스의 요소 탐색 기능을 제공하며, 병렬 작업에 특화되어 있다.
Spliterator 동작 방법을 잘 이해한다면 병렬 스트림 동작에 대한 이해를 높일 수 있을 것!
tryAdvance: Spliterator의 요소를 순차적으로 소비하면서 탐색해야할 요소가 있으면 참을 반환
trySplit: Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성
estimateSize: 탐색해야할 요소 수
characteristics: Spliterator의 특성을 정의
분할 과정
초기 Spliterator에 trySplit을 호출하여 하위 Spliterator를 생성한다. 이 과정을 아래 그림과 같이 반복하고, trySplit 메서드에서 null이 나오면 재귀 분할 과정이 종료된다.
Spliterator 특성
characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
특성 | 의미 |
---|---|
ORDERED | 리스트처럼 정해진 순서가 있으므로 요소를 탐색하고 분할할 때 순서에 유의해야 함 |
DISTINCT | x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환 |
SORTED | 탐색된 요소는 미리 정의된 정렬 순서를 따른다. |
SIZED | 크기가 알려진 소스로 생성했으므로 estimatedSize()는 정확한 값을 반환한다. |
NON-NULL | 탐색하는 모든 요소는 null이 아니다. |
IMMUTABLE | 이 Spliterator의 소스는 불변이므로 요소를 탐색하는 동안 추가/삭제/수정할 수 없다. |
CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | 이 Spliterator와 분할되어 생성되는 모든 spliterator는 SIZED 특성을 갖는다. |
커스텀 Spliterator 구현
문자열의 단어 수 계산
문자열에 해당하는 단어 개수를 반환하는 예제를 통해 커스텀 Spliterator의 구현을 알아보자.
커스텀 Spliterator의 필요성
병렬로 문자열의 단어 수를 계산하려 할 때 원본 문자열을 임의의 위치에서 둘로 나누면 하나의 단어를 둘로 계산하는 상황이 발생할 수 있다.
따라서 문자열을 임의의 위치에서 분할하지 않고 단어가 끝나는 위치에서만 분할하도록 커스텀 Spliterator 클래스를 만들어 trySplit() 메서드를 구현해주어야 한다.
먼저 문자열의 단어 수를 계산하기 위해 변수 상태를 캡슐화하는 WordCounter 클래스를 만들자.
스트림을 탐색하며 새로운 문자를 찾을때마다
accumulate
메서드를 호출한다.accumulate
메서드는 새로운 WordCounter 클래스를 어떤 상태로 생성할 것인지 정의한다.공백이 아닌 문자이고 직전 글자는 공백이었을 경우에만 counter(단어의 개수)를 추가한다.
combine
메서드는 서브스트림으로 처리한 결과들을 어떻게 합칠 지 정의한다.
병렬 스트림을 사용하기에 앞서 간단하게 리듀싱 연산을 통해 문자열의 단어 개수를 찾아보자.
이제 앞서 말했던 병렬 처리의 문제를 해결하기 위해 Spliterator 구현 클래스를 만들자.
tryAdvance
: 탐색 할 요소가 남아있다면 true 반환trySplit
: 요소를 분할해서 Spliterator 생성해 반환, 공백이 아닐 때에는 인덱스를 넘어가 해당 단어가 끝난 지점으로 이동한다.estimateSize
: 탐색해야 할 요소의 수를 반환characteristics
: Spliterator 객체에 포함된 모든 특성값의 합을 반환ORDERED : 문자열의 순서가 유의미함
SIZED : estimatedSize 메서드의 반환값이 정확함
NONNULL : 문자열에는 null이 존재하지 않음
IMMUTABLE : 문자열 자체가 불변 클래스이므로 파싱하며 속성이 추가되지 않음
드디어 Spliterator를 사용해 병렬 스트림을 생성한 뒤 단어 수를 계산할 수 있다.
Last updated