🐾
개발자국
  • 🐶ABOUT
  • 🚲프로그래밍
    • 객체 지향 프로그래밍
    • 오브젝트
      • 1장: 객체, 설계
      • 2장: 객체지향 프로그래밍
      • 3장: 역할, 책임, 협력
      • 4장: 설계 품질과 트레이드오프
      • 5장: 책임 할당하기
      • 6장: 메시지와 인터페이스
      • 7장: 객체 분해
      • 8장: 의존성 관리하기
      • 9장: 유연한 설계
      • 10장: 상속과 코드 재사용
      • 11장: 합성과 유연한 설계
      • 12장: 다형성
      • 13장: 서브클래싱과 서브타이핑
      • 14장: 일관성 있는 협력
      • 15장: 디자인 패턴과 프레임워크
    • 도메인 주도 개발 시작하기
      • 1장: 도메인 모델 시작하기
      • 2장: 아키텍처 개요
      • 3장: 애그리거트
      • 4장: 리포지토리와 모델 구현
      • 5장: 스프링 데이터 JPA를 이용한 조회 기능
      • 6장: 응용 서비스와 표현 영역
      • 7장: 도메인 서비스
      • 8장: 애그리거트 트랜잭션 관리
      • 9장: 도메인 모델과 바운디드 컨텍스트
      • 10장: 이벤트
      • 11장: CQRS
    • 클린 아키텍처
      • 만들면서 배우는 클린 아키텍처
        • 계층형 아키텍처의 문제와 의존성 역전
        • 유스케이스
        • 웹 어댑터
        • 영속성 어댑터
        • 아키텍처 요소 테스트
        • 경계 간 매핑 전략
        • 애플리케이션 조립
        • 아키텍처 경계 강제하기
        • 지름길 사용하기
        • 아키텍처 스타일 결정하기
    • 디자인 패턴
      • 생성(Creational) 패턴
        • 팩토리 패턴
        • 싱글톤 패턴
        • 빌더 패턴
        • 프로토타입 패턴
      • 행동(Behavioral) 패턴
        • 전략 패턴
        • 옵저버 패턴
        • 커맨드 패턴
        • 템플릿 메서드 패턴
        • 반복자 패턴
        • 상태 패턴
        • 책임 연쇄 패턴
        • 인터프리터 패턴
        • 중재자 패턴
        • 메멘토 패턴
        • 비지터 패턴
      • 구조(Structural) 패턴
        • 데코레이터 패턴
        • 어댑터 패턴
        • 퍼사드 패턴
        • 컴포지트 패턴
        • 프록시 패턴
        • 브리지 패턴
        • 플라이웨이트 패턴
      • 복합 패턴
  • 시스템 설계
    • 1. 사용자 수에 따른 규모 확장성
    • 2. 개략적 규모 추정
    • 3. 시스템 설계 접근법
    • 4. 처리율 제한 장치
    • 5. 안정 해시
    • 6. 키-값 저장소
    • 7. 유일한 ID 생성기
    • 8. URL 단축기
    • 9. 웹 크롤러
    • 10. 알림 시스템
    • 11. 뉴스 피드
    • 12. 채팅 시스템
    • 13. 검색어 자동완성
    • 14. 유튜브 스트리밍
    • 15. 구글 드라이브
    • ⭐️. 캐싱 전략
    • ⭐️. 재고 시스템으로 알아보는 동시성이슈 해결방법
    • ⭐️. 실습으로 배우는 선착순 이벤트 시스템
  • 🏝️자바
    • 자바의 내부 속으로
      • Java 언어의 특징
      • JDK
      • JVM
        • 메모리 관리
        • Garbage Collector
          • 기본 동작
          • Heap 영역을 제외한 GC 처리 영역
          • (WIP) GC 알고리즘
        • 클래스 로더
      • 자바 실행 방식
      • 메모리 모델과 관리
      • 바이트 코드 조작
      • 리플렉션
      • 다이나믹 프록시
      • 어노테이션 프로세서
    • 자바의 기본
      • 데이터 타입, 변수, 배열
    • 이펙티브 자바
      • 2장: 객체의 생성과 파괴
        • item 1) 생성자 대신 정적 팩토리 메서드를 고려하라
        • item2) 생성자에 매개변수가 많다면 빌더를 고려하라
        • item3) private 생성자나 열거 타입으로 싱글톤임을 보증하라
        • item4) 인스턴스화를 막으려면 private 생성자를 사용
        • item5) 자원을 직접 명시하는 대신 의존 객체 주입 사용
        • item6) 불필요한 객체 생성 지양
        • item7) 다 쓴 객체는 참조 해제하라
        • item8) finalizer와 cleaner 사용 자제
        • item9) try-with-resources를 사용하자
      • 3장: 모든 객체의 공통 메서드
        • item 10) equals는 일반 규약을 지켜 재정의 하자
        • item 11) equals 재정의 시 hashCode도 재정의하라
        • item 12) 항상 toString을 재정의할 것
        • item 13) clone 재정의는 주의해서 진행하라
        • item 14) Comparable 구현을 고려하라
      • 4장: 클래스와 인터페이스
        • item 15) 클래스와 멤버의 접근 권한을 최소화하라
        • item 16) public 클래스에서는 public 필드가 아닌 접근자 메서드를 사용하라
        • item 17) 변경 가능성을 최소화하라
        • item 18) 상속보다는 컴포지션을 사용하라
        • item 19) 상속을 고려해 설계하고 문서화하고, 그러지 않았다면 상속을 금지하라
        • item 20) 추상 클래스보다는 인터페이스를 우선하라
        • item 21) 인터페이스는 구현하는 쪽을 생각해 설계하라
        • item 22) 인터페이스는 타입을 정의하는 용도로만 사용하라
        • item 23) 태그 달린 클래스보다는 클래스 계층구조를 활용하라
        • item 24) 멤버 클래스는 되도록 static으로 만들라
        • item 25) 톱레벨 클래스는 한 파일에 하나만 담으라
      • 5장: 제네릭
        • item 26) 로 타입은 사용하지 말 것
        • item 27) unchecked 경고를 제거하라
        • item 28) 배열보다 리스트를 사용하라
        • item 29) 이왕이면 제네릭 타입으로 만들라
        • item 30) 이왕이면 제네릭 메서드로 만들라
        • item 31) 한정적 와일드카드를 사용해 API 유연성을 높이라
        • item 32) 제네릭과 가변 인수를 함께 사용
        • item 33) 타입 안전 이종 컨테이너를 고려하라
      • 6장: 열거 타입과 어노테이션
        • item 34) int 상수 대신 열거 타입을 사용하라
        • item 35) ordinal 메서드 대신 인스턴스 필드를 사용하라
        • item 36) 비트 필드 대신 EnumSet을 사용하라
        • item 37) ordinal 인덱싱 대신 EnumMap을 사용하라
        • item 38) 확장할 수 있는 열거 타입이 필요하면 인터페이스를 사용하라
        • item 39) 명명 패턴보다 어노테이션을 사용하라
        • item 40) @Override 어노테이션을 일관되게 사용하라
        • item 41) 정의하려는 것이 타입이라면 마커 인터페이스를 사용하라
      • 7장: 람다와 스트림
        • item 42) 익명 클래스보다는 람다를 사용하라
        • item 43) 람다보다는 메서드 참조를 사용하라
        • item 44) 표준 함수형 인터페이스를 사용하라
        • item 45) 스트림은 주의해서 사용하라
        • item 46) 스트림에서는 부작용 없는 함수를 사용하라
        • item 47) 반환 타입으로는 스트림보다 컬렉션이 낫다
        • item 48) 스트림 병렬화는 주의해서 적용하라
      • 8장: 메서드
        • item 49) 매개변수가 유효한지 검사하라
        • item 50) 적시에 방어적 복사본을 만들라
        • item 51) 메서드 시그니처를 신중히 설계하라
        • item 52) 다중정의는 신중히 사용하라
        • item 53) 가변인수는 신중히 사용하라
        • item 54) null이 아닌, 빈 컬렉션이나 배열을 반환하라
        • item 55) 옵셔널 반환은 신중히 하라
        • item 56) 공개된 API 요소에는 항상 문서화 주석을 작성하라
      • 9장: 일반적인 프로그래밍 원칙
        • item 57) 지역 변수의 범위를 최소화하라
        • item 58) 전통적인 for문보다 for-each문을 사용하기
        • item 59) 라이브러리를 익히고 사용하라
        • item 60) 정확한 답이 필요하다면 float, double은 피하라
        • item 61) 박싱된 기본타입보단 기본 타입을 사용하라
        • item 62) 다른 타입이 적절하다면 문자열 사용을 피하라
        • item 63) 문자열 연결은 느리니 주의하라
        • item 64) 객체는 인터페이스를 사용해 참조하라
        • item 65) 리플렉션보단 인터페이스를 사용
        • item 66) 네이티브 메서드는 신중히 사용하라
        • item 67) 최적화는 신중히 하라
        • item 68) 일반적으로 통용되는 명명 규칙을 따르라
      • 10장: 예외
        • item 69) 예외는 진짜 예외 상황에만 사용하라
        • item 70) 복구할 수 있는 상황에서는 검사 예외를, 프로그래밍 오류에는 런타임 예외를 사용하라
        • item 71) 필요 없는 검사 예외 사용은 피하라
        • item 72) 표준 예외를 사용하라
        • item 73) 추상화 수준에 맞는 예외를 던지라
        • item 74) 메서드가 던지는 모든 예외를 문서화하라
        • item 75) 예외의 상세 메시지에 실패 관련 정보를 담으라
        • item 76) 가능한 한 실패 원자적으로 만들라
        • item 77) 예외를 무시하지 말라
      • 11장: 동시성
        • item 78) 공유 중인 가변 데이터는 동기화해 사용하라
        • item 79) 과도한 동기화는 피하라
        • item 80) 스레드보다는 실행자, 태스크, 스트림을 애용하라
        • item 81) wait와 notify보다는 동시성 유틸리티를 애용하라
        • item 82) 스레드 안전성 수준을 문서화하라
        • item 83) 지연 초기화는 신중히 사용하라
        • item 84) 프로그램의 동작을 스레드 스케줄러에 기대지 말라
      • 12장: 직렬화
        • item 85) 자바 직렬화의 대안을 찾으라
        • item 86) Serializable을 구현할지는 신중히 결정하라
        • item 87) 커스텀 직렬화 형태를 고려해보라
        • item 88) readObject 메서드는 방어적으로 작성하라
        • item 89) 인스턴스 수를 통제해야 한다면 readResolve보다는 열거 타입을 사용하라
        • item 90) 직렬화된 인스턴스 대신 직렬화 프록시 사용을 검토하라
    • 모던 자바 인 액션
      • 1장: 자바의 역사
      • 2장: 동작 파라미터화
      • 3장: 람다
      • 4장: 스트림
      • 5장: 스트림 활용
      • 6장: 스트림으로 데이터 수집
      • 7장: 병렬 데이터 처리와 성능
      • 8장: 컬렉션 API 개선
      • 9장: 람다를 이용한 리팩토링, 테스팅, 디버깅
      • 10장: 람다를 이용한 DSL
      • 11장: null 대신 Optional
      • 12장: 날짜와 시간 API
      • 13장: 디폴트 메서드
      • 14장: 자바 모듈 시스템
      • 15장: CompletableFuture와 Reactive 개요
      • 16장: CompletableFuture
      • 17장: 리액티브 프로그래밍
      • 18장: 함수형 프로그래밍
      • 19장: 함수형 프로그래밍 기법
      • 20장: 스칼라 언어 살펴보기
    • 자바의 이모저모
      • Javax
      • Objects
      • NIO
      • Thread
      • Concurrent
        • Atomic
        • Executor, ExecutorService
        • Interrupt
      • Assertions
    • Netty
      • 네티 맛보기
      • 네티의 주요 특징
      • 채널 파이프라인
      • 이벤트 루프
      • 바이트 버퍼
      • 부트스트랩
      • 네티 테스트
      • 코덱
      • 다양한 ChannelHandler와 코덱
      • 웹소켓
      • UDP 브로드캐스팅
    • 자바 병렬 프로그래밍
      • 2장: 스레드 안전성
      • 15장: 단일 연산 변수와 논블로킹 동기화
  • 🏖️코틀린
    • 코틀린 인 액션
      • 코틀린 언어의 특징
      • 코틀린 기초
      • 함수 정의와 호출
      • 클래스, 객체, 인터페이스
      • 람다
      • 타입 시스템
      • 연산자 오버로딩과 기타 관례
      • 고차 함수
      • 제네릭스
      • 어노테이션과 리플렉션
      • DSL 만들기
  • 🌸스프링
    • Spring Core
      • Cron Expression
      • Bean
        • Lifecycle
        • Aware
    • Spring MVC
    • Spring Security
      • 로그인 처리
      • 로그아웃 처리
      • JWT 인증 방식
      • 메소드별 인가 처리
    • Spring Data
      • Pageable
      • Spring Data Couchbase
      • Spring Data Redis
        • Serializer
    • Spring REST Docs
    • Spring Annotations
    • Spring Cloud
      • Service Discovery
      • API Gateway
      • Spring Cloud Config
      • MicroService Communication
      • Data Synchronization
    • Test
      • 테스트 용어 정리
      • JUnit
      • Spring Boot Test
      • Mockito
    • QueryDSL
      • 프로젝트 환경설정
      • 기본 문법
      • 중급 문법
      • 순수 JPA와 QueryDSL
      • 스프링 데이터 JPA와 QueryDSL
    • Lombok
      • @Data
      • @Builder
      • Log Annotations
  • 🕋DB
    • MySQL
      • CentOS7에서 MySQL 8 버전 설치하기
    • MongoDB
      • 
    • Redis
      • Sentinel
      • Cluster
      • Transaction
      • 자료구조
        • String
        • List
        • Set
        • Hash
        • Bitmaps
        • SortedSet
      • Lettuce 단일 서버, 클러스터 서버, 풀링 사용 방법
  • 📽️인프라
    • 리눅스
      • 주요 명령어 모음
    • Docker
      • Docker
      • Docker Compose
      • Docker Swarm
      • Docker Network
      • Linux에서 root 아닌 유저로 docker 실행하기
    • Kubernetes
      • 기초 개념
      • Pod
      • Configuration
      • ReplicationSet
      • Network
      • ConfigMap & Secret
      • Volume, Mount, Claim
      • Controller
      • Multi Container Pod
      • StatefulSet & Job
      • Rollout & Rollback
      • Helm
      • 개발 워크플로우와 CI/CD
      • Container Probes
      • Resource Limit
      • Logging & Monitoring
      • Ingress
      • Security
      • Multi Node/Architecture Cluster
      • Workload & Pod management
      • CRD & Operator
      • Serverless Function
      • K8S Cheat Sheet
    • Kafka
      • 카프카 개요
      • 카프카 설치 및 실습
      • Kafka Broker
      • Topic, Partition, Record
      • Producer
      • Consumer
      • Kafka Streams
      • Kafka Connect
      • MirrorMaker
  • AWS
    • AWS Console / CLI / SDK
    • IAM
    • EC2
      • EC2 Advanced
    • ELB / ASG
    • RDS / Aurora / ElastiCache
    • DynamoDB
    • DocumentDB / Neptune / Keyspaces / QLDB / Timestream
    • Route 53
    • Beanstalk
    • Solution Architect
    • S3
      • 보안
    • CloudFront
    • Global Accelerator
    • AWS Storage
    • Messaging
    • Container
    • Serverless
    • Data Analysis
    • Machine Learning
    • Monitoring
    • Security
    • VPC
    • Data Migration
    • 기타 서비스
  • 🏔️CS
    • 운영 체제
      • Introduction
      • System Structures
      • Process
      • Synchronization
      • Muitithreaded Programming
      • Process Scheduling
      • Memory Management
      • Virtual Memory
    • 네트워크
      • 네트워크 기초
      • 네트워크 통신 방식
      • OSI 7계층
        • 1계층: 물리계층
        • 2계층: 데이터 링크 계층
        • 3계층: 네트워크 계층
        • 4계층: 전송 계층
        • 5계층: 세션 계층
        • 6계층: 표현 계층
        • 7계층: 응용 계층
      • TCP/IP 스택
      • ARP
      • 데이터 크기 조절
      • WDM
      • NAT
      • DNS
      • DHCP
      • VPN
      • 네이글 알고리즘
      • 서버 네트워크
      • 네트워크 보안
        • 보안의 기본
        • 보안 장비
      • 이중화
    • 데이터베이스
      • 트랜잭션
    • 컴퓨터 구조
      • 개요
      • Instruction Set Architecture
      • Procedure Call & Return
      • Linking
      • Pipeline
      • Memory Hierarchy
      • Virtual Memory
      • Interrupt / Exception, IO
    • 자료 구조
      • Array
      • List
      • Map
      • Set
      • Queue
      • PriorityQueue
      • Stack
    • 웹 기술
      • HTTP
        • 쿠키와 세션
  • 🪂Big Data
    • Apache Hadoop
  • 🕹️ETC
    • Git
      • 내부 구조
      • 내가 자주 사용하는 명령어 모음
      • Commit Convention
    • 이력서 작성하기
    • Embedded
      • 라즈베리파이에서 네오픽셀 적용기
    • 기술블로그 모음집
Powered by GitBook
On this page
  • 🏝️ Future의 단순 활용
  • 🏝️ 비동기 API 구현
  • 동기 메서드를 비동기 메서드로 변환
  • 에러 처리 방법
  • supplyAsync
  • 🏝️ 비블록 코드
  • 병렬 스트림
  • CompletableFuture
  • 커스텀 Executor
  • 🏝️ 비동기 작업 파이프라인
  • 할인 서비스 적용
  • 동기 작업과 비동기 작업 조합
  • 독립적인 CompletableFuture들 합치기
  • Future와 CompletableFuture 비교
  • 타임아웃 처리
  • 🏝️ CompletableFuture 종료 대응 방법
  1. 자바
  2. 모던 자바 인 액션

16장: CompletableFuture

🏝️ Future의 단순 활용

  • Java 5 부터 미래의 어느 시점에 결과를 얻는 모델에 활용할수 있도록 Future 인터페이스를 제공하고 있다.

  • 시간이 걸리는 작업들을 Future 내부에 두면, 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업들을 할 수 있다.

  • 아래 코드는 Future로 다른 스레드에 오래걸리는 작업을 할당하고, 호출자 스레드에서는 future.get() 하기 전 로직을 수행하여 비동기적으로 동작한다.

ExecutorService executor = Executors.newCachedThreadPool();

Future<Double> future = executor.submit(new Callable<Double>() {
  public Double call() {
    return doSomeLongComputation();
  }
})

doSomeThingElse();

try {
  future.get(1, TimeUnit.SECONDS); // 비동기 작업의 결과가 준비되지 않았다면 1초까지만 기다려본다.
} catch (ExecutionException ee) {
  // 계산 중 예외 발생
} catch (InterruptedException ie) {
  // 현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te) {
  // Future가 완료되기 전 타임아웃 발생
}
  • 여러 Future의 결과가 있을 때 의존성을 표현하기 어렵다. A의 계산이 끝나면 B에게 그 결과를 전달해 계산하고, B의 계산까지 끝나면 다른 질의의 결과와 조합하라는 요구사항이 있는 경우 Future로는 구현이 어렵다.

  • CompletableFuture 클래스는 아래와 같은 기능을 제공한다.

    • 두 비동기 계산 결과를 하나로 합칠 수 있다. 서로 독립적일수도, 의존적일수도 있다.

    • Future 집합이 실행하는 모든 태스크의 완료를 기다린다.

    • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.

    • 프로그램적으로 Future를 완료시킨다.(비동기 동작에 수동으로 결과 제공)

    • Future 완료 동작에 반응한다.(결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 후 Future의 결과에 원하는 추가 동작을 수행할 수 있다)

🏝️ 비동기 API 구현

동기 메서드를 비동기 메서드로 변환

  • 상품의 가격 정보를 반환하는 메서드를 비동기 API로 구현한다.

  • DB 접근, 할인 서비스 호출 등의 작업을 통해 상품의 가격을 얻는 것 처럼 calculatePrice 메서드를 작성했다.

public double getPrice(String product) {
  return calculatePrice(product);
}

private double calculatePrice(String product) {
  delay(); //1초간 블록
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
  • 위 메서드를 블로킹되지 않도록 하기 위해 Future를 반환한다.

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread(() -> {
    double price = calculatePrice(product);
    futurePrice.complete(price);
  }).start();
  return futurePrice;
}
  • 아래와 같이 제품의 가격 정보를 요청보내고 future 결과가 돌아오기 전까지 다른 작업을 처리할 수 있다.

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //제품 가격 요청
long invocationTime = ((System.nanoTime() - start) / 1_000_000);

//제품의 가격을 계산하는 동안 다른 상점 검색 등 작업 수행
doSomethingElse();

try {
  double price = future.get(); //가격정보를 받을때까지 블록
} catch (Exception e) {
  throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);

에러 처리 방법

  • 예외 발생 시 해당 스레드에만 영향을 미친다.

  • 따라서 에러가 발생해도 클라이언트는 get 메서드가 반환될 때를 기다리며 영원히 블로킹된다.

  • 이를 해결하기 위해 get 메서드에 타임아웃값을 넣어주고, CompletableFuture 내부에서 발생된 에러 정보를 포함시키는 completeExceptionally 메서드를 사용해 외부로 예외를 전달한다.

  • 전달된 예외는 ExecutionException에 감싸서 던져진다.

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread(() -> {
    try {
      double price = calculatePrice(product);
      futurePrice.complete(price);
    } catch (Exception e) {
      futurePrice.completeExceptionally(ex); //에러를 포함시켜 Future를 종료
    }
  }).start();
  return futurePrice;
}

supplyAsync

  • CompletableFuture를 직접 생성하는 대신 supplyAsync() 메서드를 이용해 생성되도록 할 수 있다.

  • Supplier를 인수로 받아 CompletableFuture를 반환하는 메서드

  • ForkJoinPool의 Executor 중 하나가 Supplier를 실행하며, ForkJoinPool의 Executor 대신 다른 Executor를 지정하려면 두 번째 인자로 넣어주면 된다.

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

🏝️ 비블록 코드

  • 블록 메서드를 사용할 수밖에 없는 상황에서 비동기적으로 여러 API를 호출하여 프로그램 성능을 높일 수 있도록 하자.

  • 아래와 같이 각각의 shop에 존재하는 product의 가격을 가져오도록 하는 메서드가 있다고 한다. 이 때 하나의 가게에서 상품 가격을 가져오는 작업이 약 1초가 걸린다면 <shop의 개수>초 동안 스레드가 블로킹된다.

  • 이 코드를 점차 비블록 코드로 개선해보자.

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

병렬 스트림

  • 순차 계산을 병렬로 처리해 성능을 개선할 수 있다.

  • shop에서 product 가격을 가져오는 작업들이 병렬로 처리되므로 1초 남짓의 시간에 모든 작업이 완료될 것이다.

public List<String> findPrices(String product) {
  return shops.parallelStream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

CompletableFuture

  • CompletableFuture로 가게의 상품 가격을 찾는 로직을 비동기적으로 수행한다.

  • 스트림 연산은 lazy하기 때문에 하나의 파이프라인(map)으로 연산을 처리하면 모든 가격 정보 요청 동작을 동기적으로 수행하게 된다.

  • 즉, 가게의 상품 가격을 하나 찾으면 join을 호출하고, 또 다른 가게의 상품 가격을 찾으면 join을 호출하는 형태이다.

  • 스트림을 두 개로 나누어 연산할 경우 첫번째 스트림으로 CompletableFuture로 요청을 모두 보내놓고, 두번째 스트림은 이 Future의 결과를 하나씩 가져와 조합한다.

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
      .map(shop -> CompletableFuture.supplyAsync(
        () -> shop.getName() + "price is " + shop.getPrice(product)))
      .collect(toList());
      
  return priceFutures.stream()
    .map(CompletableFuture::join) //모든 비동기 동작이 끝나기를 기다린다.
    .collect(toList());
}
  • CompletableFuture를 사용하더라도 병렬 스트림보다 성능이 느릴 수 있다.

커스텀 Executor

  • 비동기 동작을 많이 사용하는 상황에서 유용하다.

  • CompletableFuture는 병렬 작업에 이용할 수 있는 다양한 Executor를 직접 커스텀할 수 있어 스레드 풀의 크기를 조정하는 등 애플리케이션을 최적화할 수 있다.

  • 스레드 풀 크기 조정

    • 스레드 풀의 크기가 너무 크면 context switching과 race condition으로 인해 CPU와 메모리 자원을 낭비할 수 있다.

    • 스레드 풀의 크기가 너무 작으면 CPU의 일부 코어가 활용되지 않을 수 있어 효율적이지 않다.

    • 자바 병렬 프로그래밍 책에 따르면 스레드 풀의 크기는 다음 공식으로 대략적인 CPU 활용 비율을 계산 할 수 있다.

      (스레드 개수) = (코어 수) * (CPU 활용 비율) * (1 + (대기시간)/(계산시간))
      - 코어 수는 Runtime.getRuntime().availableProcessors()를 통해 구할 수 있다.
      - CPU 활용 비율은 0과 1 사이의 값을 갖는다.
  • 상점의 상품 가격 구하는 예제에서는 요청의 응답을 오래 기다리는 경우이므로 (대기시간)/(계산시간)이 100이라고 볼 수 있다.

  • 스레드의 개수를 상점 수만큼만 두도록 한다. 스레드 수가 너무 많으면 서버에 충돌날 수 있기 때문에 하나의 Executor에서 사용할 스레드의 최대 개수는 100이하로 설정하도록 한다.

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
    new ThreadFactory() {
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r);
    t.setDeamon(true);
    return t;
  }
});
  • 여기서 만든 스레드 풀에는 자바 프로그램이 종료될 때 강제로 종료되는 데몬 스레드를 포함한다.

  • Executor를 정의하였으므로 CompletableFuture의 supplyAsync메서드에 이를 넣어주면, 4개 뿐만 아니라 400개의 상점을 탐색하는 경우에도 성능을 유지할 수 있다.

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
      .map(shop -> CompletableFuture.supplyAsync(
        () -> shop.getName() + "price is " + shop.getPrice(product), executor))
      .collect(toList());
      
  return priceFutures.stream()
    .map(CompletableFuture::join) //모든 비동기 동작이 끝나기를 기다린다.
    .collect(toList());
}
  • 스트림 병렬화 vs CompletableFuture 병렬화

    • I/O가 포함되지 않은 계산 중심의 로직이면 스트림 인터페이스가 구현하기 간단하고 효율적

    • I/O를 기다리는 작업을 병렬로 수행할 때에는 CompletableFuture가 더 많은 유연성을 제공하며 대기시간/계산시간 비율에 적합한 스레드 수를 설정할 수 있다. 또한 스트림은 lazy 특성으로 인해 I/O를 실제 언제 처리할 지 예측하기 어렵다는 문제도 있다.

🏝️ 비동기 작업 파이프라인

할인 서비스 적용

  • 앞서 다룬 예제의 상점들이 멤버 등급에 따라 서로 다른 할인율을 반환하는 할인 서비스를 사용하게 되었다.

  • 아래는 할인 서비스를 사용하는 간단한 findPrices 메서드이다.

    • Quote라는 클래스는 어떤 가게의 어떤 가격에 어떤 할인율을 적용할지 담고 있다.

    • 이 정보들을 Discount 클래스로 넘기면 실제 할인이 적용된 가격을 조회할 수 있다. 이 과정에서 5초를 지연시킨다.

    • 이렇게 여러 가게로부터 가격을 가져와 할인을 적용한 값들을 리스트 형태로 반환한다.

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> shop.getPrice(product)) // 각 상점에서 상품 가격 조회
    .map(Quote::parse) // 상점과 상품 가격, 할인규칙을 담은 문자열을 Quote 객체로 변환
    .map(Discount::applyDiscount)  // 상점 이름과 할인 적용된 상품 가격을 합친 문자열 반환
    .collect(toList());
}
@Getter
@RequiredArgsConstructor
public class Quote {
  private final String shopName;
  private final double price;
  private final Discount.code discountCode;

  public static Quote parse(String s) {
    String[] split = s.split(":");
    String shopName = split[0];
    double price = Doule.parseDouble(split[1]);
    Discount.Code discountCode = Discount.Code.valueOf(split[2]);
    return new Quote(shopName, price, discountCode);
  }
}

public class Discount {
  public enum Code {
    NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
  }
  // ...
  
  public static String applyDiscount(Quote quote) {
    return quote.getShopName() + " price is " + Discount.apply(
      quote.getPrice(), quote.getDiscountCode());
  }
  
  private static double apply(double price, Code code) {
    delay(); // 1초 블록
    return format(price * (100 - code.percentage) / 100);
  }
}
  • 이 방식은 순차적으로 한 상점에 가격 정보를 요청하고 할인 서비스를 적용하므로 가격 정보 요청과 할인 서비스에 각각 5초씩 걸린다면, 상점이 많아질수록 응답 시간이 기하급수적으로 늘어난다.

  • 병렬 스트림을 사용하더라도 스레드 풀의 크기가 고정되어 있어 상점 수가 늘어났을 때 유연하게 대응할 수 없다.

동기 작업과 비동기 작업 조합

  • 첫번째 map에서는 비동기적으로 가게에서 가격 정보를 조회한다. 이 때 CompletableFuture에는 예전에 정의했던 executor를 설정하여 가게 수만큼 스레드풀을 만들도록 한다.

  • 두번째 map에서는 첫번째 map의 결과인 CompletableFuture<String>에 thenApply() 메서드를 사용해 반환 값을 Completable<Quote>로 변환한다. 이 때 thenApply() 메서드는 블록되지 않는다.

  • 세번째 map에서는 원격 할인 서비스를 통해 할인된 가격을 얻어와야 하므로 동기적으로 작업을 수행해야 한다. thenCompose 메서드를 통해 quote 객체를 할인 서비스에 보내 할인된 가격을 담는 CompletableFuture를 반환한다.

  • 이렇게 얻은 CompletableFuture가 완료되기를 기다렸다가 join으로 값을 추출하면 최종 값을 얻을 수 있다.

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
      .map(shop -> CompletableFuture.supplyAsync(
        () -> shop.getPrice(product), executor))
      .map(future -> future.thenApply(Quote::parse))
      .map(future -> future.thenCompose(quote ->
        CompletableFuture.supplyAsync(
          () -> Discount.applyDiscount(quote), executor)))
      .collect(toList());
      
  return priceFutures.stream()
    .map(CompletableFuture::join)
    .collect(toList());
}
  • thenCompose 메서드도 Async 형태로 반환할 수 있지만 위 예제의 thenCompose의 CompletableFuture는 이전 CompletableFuture에 의존하기 때문에 Async버전으로 수행해도 실행 시간에는 영향이 없다. 따라서 스레드 전환 오버헤드가 적고 효율이 좋은 thenCompose를 사용했다.

독립적인 CompletableFuture들 합치기

  • thenCombine

    • 두 개의 CompletableFuture 결과를 어떻게 합칠지에 대한 BiFunction을 두번째 인수로 받는 메서드

    • Async 버전도 별도로 존재하여, BiFunction이 정의하는 조합 동작이 스레드 풀으로 제출되면 별도 태스크에서 비동기적으로 수행된다.

  • 아래 예시는 유로로 된 상품 정보를 얻어와 달러로 변환해야 하는 요구사항을 구현한 코드이다.

  • 상품의 가격을 얻는 것을 하나의 비동기 작업으로 요청하고, thenCombine 메서드에 환율 정보를 가져오는 비동기 요청과 함께 어떻게 결과를 합칠지에 대한 함수를 입력받는다.

  • 이로써 두 CompletableFuture의 결과가 생성되면 합칠 수 있는 코드가 되었다.

Funtion<Double> futurePriceInUSD = CompletableFuture
  .supplyAsync(() -> shop.getPrice(product)) // 가격정보 요청
  .thenCombine(CompletableFuture.supplyAsync(
      () -> exchangeService.getRate(Money.EUR, Money.USD)), // 환율정보 요청
    (price, rate) -> price * rate)); //두 결과 합침

Future와 CompletableFuture 비교

  • CompletableFuture을 사용하면 람다 표현식을 사용해 동기/비동기 태스크를 활용한 복잡한 연산 수행 방법을 효과적으로 정의할 수 있다.

  • 바로 위에서 다룬 유로로 된 상품 정보를 얻어와 달러로 변환하는 요구사항을 CompletableFuture와 람다 없이 구현하려면 아래와 같이 작성해야 하므로 코드의 복잡성이 높아진다.

ExecutorService executor = Executors.newCachedThreadPool();
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
  public Double call() {
    return exchangeService.getRate(Money.EUR, Money.USD);
  }
});

Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
  public Double call() {
    double priceInEUR = shop.getPrice(product);
    return priceInEUR * futureRate.get();
  }
});

타임아웃 처리

orTimeout() 메서드

  • Future가 특정 시간 안에 작업을 끝내지 못할 경우 TimeoutException이 발생하도록 할 수 있다.

  • 이를 통해 무한 블로킹되지 않도록 방지할 수 있다.

  • CompletableFuture에서 제공하는 orTimeout 메서드는 지정된 시간 후 CompletableFuture를 TimeoutException으로 완료하고 또다른 CompletableFuture를 반환할 수 있도록 SchedluedThreadExecutor를 사용한다.

Funtion<Double> futurePriceInUSD = CompletableFuture
  .supplyAsync(() -> shop.getPrice(product))
  .thenCombine(CompletableFuture.supplyAsync(
      () -> exchangeService.getRate(Money.EUR, Money.USD)),
    (price, rate) -> price * rate))
  .orTimeout(3, TimeUnit.SECONDS);

completeOntimeout() 메서드

  • completeOntimeout 메서드를 사용해 환율 서비스가 1초안에 응답하지 않을 경우 기본 환율을 사용하도록 할 수 있다.

  • 이를 통해 환율 서비스가 느릴 때마다 예외를 발생시키지 않고 정상적인 결과를 반환해줄 수 있다.

Funtion<Double> futurePriceInUSD = CompletableFuture
  .supplyAsync(() -> shop.getPrice(product))
  .thenCombine(CompletableFuture
      .supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD))
      .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
    (price, rate) -> price * rate))
  .orTimeout(3, TimeUnit.SECONDS);

🏝️ CompletableFuture 종료 대응 방법

  • 현실적으로 상품 가격 조회에 걸리는 시간은 각 가게마다 다를 수 있다.

  • 모든 가게에서 가격 정보를 제공할 때까지 기다리지 말고 상점에서 가격을 제공할 때마다 즉시 보여주도록 구현한다.

  • 기존에 구현했던 경우와 다르게 CompletableFuture을 join 처리 하지 않고 스트림을 그대로 반환한다.

public Stream<CompletableFuture<String>> findPriceStream(String product) {
  return shop.stream()
    .map(shop -> CompletableFuture.supplyAsync(
      () -> shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote ->
      CompletableFuture.supplyAsync(
        () -> Discount.applyDiscount(quote), executor)));
}
  • thenAccept 메서드는 CompletableFuture의 계산이 끝나면 값을 소비하도록 하는데, 연산 결과를 소비하는 Consumer를 인수를 받는데, 여기서는 값을 출력하도록 하였다.

  • thenAccept 메서드의 반환 타입은 CompletableFuture<Void>이다.

findPriceStream("product1").map(f -> f.thenAccept(System.out::println));
  • 상품 가격 조회가 가장 빨랐던 가게부터 가장 오래 걸렸던 가게의 상품 할인 가격을 모두 출력하려면 아래와 같이 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 된다.

  • 이를 통해 모든 상점의 결과가 반환되거나, 타임아웃으로 예외가 발생된 사실을 알릴 수 있다.

CompletableFuture[] futures = findPriceStream("product1")
  .map(f -> f.thenAccept(System.out::println))
  .toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
  • 반대로 여러 CompletableFuture 중 하나만 작업이 끝나면 되는 상황이면 anyOf 메서드가 반환하는 CompletableFuture에 join을 호출하여 값을 가져올 수 있다.

Previous15장: CompletableFuture와 Reactive 개요Next17장: 리액티브 프로그래밍

Last updated 2 months ago

🏝️