Operators
Sequence creation
    // Mono.just captures its value at declaration; Mono.defer re-evaluates it on every subscription.
    Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now()); // 12:00:00
    Mono<LocalDateTime> deferMono = Mono.defer(() -> Mono.just(LocalDateTime.now()));

    Thread.sleep(2000);
    justMono.subscribe(data -> log.info("# onNext just1: {}", data));   // 12:00:00
    deferMono.subscribe(data -> log.info("# onNext defer1: {}", data)); // 12:00:02

    Thread.sleep(2000);
    justMono.subscribe(data -> log.info("# onNext just2: {}", data));   // 12:00:00
    deferMono.subscribe(data -> log.info("# onNext defer2: {}", data)); // 12:00:04

    // Wrapping sayDefault() in Mono.defer keeps it from being called unless the upstream is empty.
    Mono
        .just("Hello")
        .delayElement(Duration.ofSeconds(3))
        .switchIfEmpty(Mono.defer(() -> sayDefault()))
        .subscribe(data -> log.info("# onNext: {}", data));

    // Flux.using: open a resource, emit from it, and close it when the sequence terminates.
    Flux
        .using(() -> Files.lines(path), Flux::fromStream, Stream::close)
        .subscribe(log::info);

    // Flux.generate: synchronous, state-based emission, one item per round.
    Flux
        .generate(() -> 0, (state, sink) -> {
            sink.next(state);
            if (state == 10)
                sink.complete();
            return ++state;
        })
        .subscribe(data -> log.info("# onNext: {}", data));

    // Flux.create, pull style: emit only when the Subscriber requests data.
    Flux.create((FluxSink<Integer> sink) -> {
        sink.onRequest(n -> {
            try {
                Thread.sleep(1000L);
                for (int i = 0; i < n; i++) {
                    if (COUNT >= 9) {
                        sink.complete();
                    } else {
                        COUNT++;
                        sink.next(DATA_SOURCE.get(COUNT));
                    }
                }
            } catch (InterruptedException e) {}
        });
        sink.onDispose(() -> log.info("# clean up"));
    }).subscribe(new BaseSubscriber<>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(2);
        }

        @Override
        protected void hookOnNext(Integer value) {
            SIZE++;
            log.info("# onNext: {}", value);
            if (SIZE == 2) {
                request(2);
                SIZE = 0;
            }
        }

        @Override
        protected void hookOnComplete() {
            log.info("# onComplete");
        }
    });

    // Flux.create, push style: a listener pushes data into the sink as it arrives.
    Flux.create((FluxSink<Integer> sink) ->
            priceEmitter.setListener(new CryptoCurrencyPriceListener() {
                @Override
                public void onPrice(List<Integer> priceList) {
                    priceList.forEach(sink::next);
                }

                @Override
                public void onComplete() {
                    sink.complete();
                }
            }))
        .publishOn(Schedulers.parallel())
        .subscribe(
            data -> log.info("# onNext: {}", data),
            error -> {},
            () -> log.info("# onComplete"));

    Thread.sleep(3000L);
    priceEmitter.flowInto(); // Start receiving price changes.
    Thread.sleep(2000L);
    priceEmitter.complete(); // Complete once prices are no longer needed.

    // Flux.create with an overflow strategy.
    // start and end must be fields (assumed static here), not locals, because the lambda mutates them.
    static int start = 1;
    static int end = 4;

    Flux.create((FluxSink<Integer> emitter) -> {
        emitter.onRequest(n -> {
            log.info("# requested: " + n);
            try {
                Thread.sleep(500L);
                for (int i = start; i <= end; i++) {
                    emitter.next(i);
                }
                start += 4;
                end += 4;
            } catch (InterruptedException e) {}
        });
        emitter.onDispose(() -> {
            log.info("# clean up");
        });
    }, FluxSink.OverflowStrategy.DROP) // DROP items when overflow occurs.
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.parallel(), 2)
        .subscribe(data -> log.info("# onNext: {}", data));
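The timestamps in the first snippet of this section (`just` always logs 12:00:00, while `defer` logs a later time on each subscription) come down to eager versus deferred evaluation. The following is a minimal plain-Java sketch of that idea, not Reactor code: the class name `EagerVsDeferred`, the method `demo()`, and the counter standing in for a clock are all hypothetical, chosen only so the result is deterministic.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsDeferred {

    // Returns the values seen by two "subscriptions" to an eager and a deferred source.
    static List<Integer> demo() {
        // Hypothetical clock: it advances by one tick every time it is read.
        AtomicInteger clock = new AtomicInteger();

        // Eager: the clock is read once, at declaration (analogous to Mono.just).
        int eager = clock.incrementAndGet();

        // Deferred: the clock is read on every get() (analogous to Mono.defer).
        Supplier<Integer> deferred = clock::incrementAndGet;

        int just1 = eager;
        int defer1 = deferred.get();
        int just2 = eager;
        int defer2 = deferred.get();
        return List.of(just1, defer1, just2, defer2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 1, 3]
    }
}
```

The eager value stays at 1 for both reads, while the deferred value moves on each time it is read. This is the same pattern as the constant 12:00:00 for `justMono` against the advancing times for `deferMono`.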
Sequence filtering
Sequence transformation
Inspecting a sequence's internal behavior
Error handling
Measuring sequence time
Splitting a Flux sequence
Multicasting a Flux to multiple Subscribers