๐ช 0. ์ด์ ๊ธ
Mono์ Flux์ ๋ํ ์ดํด๋ ์ด์ ๊ธ์์ ํ์ธํ์ค ์ ์์ต๋๋ค.
๐ 1. ์๋ก์ด ์ํ์ค๋ฅผ ์์ฑํ ๋ ์ฌ์ฉํ๋ Operator
์ํฉ์ ๋ฐ๋ผ ์๋ก์ด ์ํ์ค๋ฅผ ์์ฑํ ๋, ๋ค์ํ ์คํผ๋ ์ดํฐ๋ฅผ ๋ง๋ค ์ ์์ต๋๋ค.
1) T ํ์ ์ ๊ฐ์ฒด๋ฅผ ๋ฐฉ์ถํ๊ณ , ํด๋น ํ์ ์ ์์งํ๊ณ ์๋ ๊ฒฝ์ฐ.
public static <T> Flux<T> just(T data);
public static <T> Mono<T> just(T data);
์ ์ ํฉํ ๋ฆฌ ๋ฉ์๋๋ก, Flux์ Mono๋ just๋ฅผ ๊ฐ์ง๊ณ ์์ต๋๋ค. ํด๋น T ํ์ ์ ๊ฐ์ฒด๋ฅผ ์์งํ๊ณ ์๋ ๊ฒฝ์ฐ์ด๋ฏ๋ก, just๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ด๊ฐ ์ง์ ์ ์ผ๋ก ์ฌ๋ฌ๊ฐ์ T ํ์ ์ ๊ฐ์ฒด๋ฅผ ๋ช ์ํ ์ ์๋ค๋ฉด, ์ด๊ฑฐํ๋ ๋ฐฉ์์ผ๋ก๋ Flux๋ฅผ ๋ง๋ค ์ ์์ต๋๋ค.
public static <T> Flux<T> just(T... data)
๋ง์ฝ, null ํน์ Optional<T> ๊ฐ์ฒด๋ก๋ถํฐ Mono ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ ์ถ๋ค๋ฉด, justOrEmpty ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
public static <T> Mono<T> justOrEmpty(@Nullable Optional<? extends T> data);
public static <T> Mono<T> justOrEmpty(@Nullable T data);
Parameter๋ก, Optional ๊ฐ์ฒด๋ฅผ ๋ฐ์๋ค์ด๋ ๊ฒฝ์ฐ, Optional.isPresent๋ผ๋ฉด (๋น์ด์์ง ์๋ค๋ฉด) ํด๋น ์์ดํ ์ ๋ฐฉ์ถ ๊ฐ๋ฅํ Mono๋ฅผ ์์ฑํฉ๋๋ค. ๋ง์ฝ Optional.isPresent๊ฐ ์๋๋ผ๋ฉด, ์ค์ง onComplete()๋ฅผ ํธ์ถํ๋ Mono๋ฅผ ์์ฑํ๊ฒ ๋ฉ๋๋ค.
๋ค๋ฅธ Nullableํ ๊ฐ์ฒด์ ๊ฒฝ์ฐ, null์ด ์๋๋ผ๋ฉด ํด๋น ์์ดํ ์ ๋ฐฉ์ถ ๊ฐ๋ฅํ Mono๋ฅผ, null ์ด๋ฉด ์ค์ง onComplete()๋ฅผ ํธ์ถํ๋ Mono๋ฅผ ์์ฑํ๊ฒ ๋ฉ๋๋ค.
2) ๋ฉ์๋์ ์ํด ๋ฆฌํด ๋ฐ๋ T ํ์ ์ ๊ฐ์ฒด๋ฅผ ๋ฐฉ์ถํ๋ ๊ฒฝ์ฐ.
์ด ์ญ์ ๋น์ฐํ๋, ์ด์ ์ just ์ ์ ํฉํ ๋ฆฌ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
public static <T> Flux<T> just(T... data);
public static <T> Flux<T> just(T data);
public static <T> Mono<T> just(T data);
ํ์ง๋ง, lazily captured์ ๊ฒฝ์ฐ, ๋ ๊ฐ์ง์ ์ ํ์ด ๊ฐ๋ฅํฉ๋๋ค.
โป lazily captured : subscribeํ๋ ์์ ์ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๊ณ ์ถ์ ๊ฒฝ์ฐ๋ฅผ ์๋ฏธํฉ๋๋ค.
1) Mono.fromSupplier์ ์ฌ์ฉ
2) Flux.defer ํน์ Mono.defer์ ๋ด๋ถ๋ฅผ Flux.just ํน์ Mono.just๋ก ๊ฐ์ธ๊ธฐ.
- Mono.just vs Mono.fromSupplier์ ์ฐจ์ด
public class MonoFromSupplier {
public static void main(String[] args) {
Mono<String> just1 = Mono.just(getName());
Mono<String> just2 = Mono.fromSupplier(() -> getName());
}
private static String getName() {
System.out.println("Generating name...");
return Util.faker().name().fullName();
}
}
์์ ์ฝ๋์ ๊ฒฐ๊ณผ๋ก๋,
just1์์๋ getName()์ด ์คํ๋์ด "Generating name..."์ด ์คํ๋๊ณ , just2๋ ์๋ฌด๊ฒ๋ ์ถ๋ ฅ๋์ง ์์ต๋๋ค.
fromSupplier๋ lazily captured๋ก, subscribeํ๋ ์์ ์ ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๊ณ , just2๋ subscribe๋ ์ํ๊ฐ ์๋๊ธฐ ๋๋ฌธ์ ์๋ฌด๋ฐ ๋ฐ์์ด ์๋ ๊ฒ์ ๋๋ค.
public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier);
์ ๊ณต๋ Supplier๋ฅผ ์ฌ์ฉํด ๊ฐ์ ๋ง๋ค์ด๋ด๋ Mono ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด๋ ๋๋ค. ๋ง์ฝ Supplier๊ฐ Null์์ด ํ์ธ๋๋ฉด, ๊ฒฐ๊ณผ Mono๋ empty ์ํ๋ก ์๋ฃํฉ๋๋ค.
Mono์ fromSupplier๋ supplier๋ฅผ ์ธ์๋ก ๋ฐ๊ธฐ ๋๋ฌธ์, ์ด supplier๋ฅผ ๋๊ตฐ๊ฐ invoke ํด์ฃผ์ด์ผ ์คํ๋ฉ๋๋ค. ๋ฐ๋ผ์ lazily captured๊ฐ ๊ฐ๋ฅํฉ๋๋ค.
๋ ๋ค๋ฅธ ๋ฐฉ์์ผ๋ก๋, Flux/Mono.defer ๋ด๋ถ๋ฅผ Flux/Mono.just๋ก ๊ฐ์ธ๋ ์ ๋ต์ด ์์ต๋๋ค.
์ด ๋ถ๋ถ์ ํ๋จ์์ "defer" operator์์ ๋ช ์ํ๊ฒ ์ต๋๋ค.
3) ๋ฐ๋ณต๋๋ T ํ์ ์ ๊ฐ์ฒด๋ค์ด ์ฃผ์ด์ง ๊ฒฝ์ฐ
1) Array ์ฆ, ๋ฐฐ์ด์ด ์ฃผ์ด์ง ๊ฒฝ์ฐ
2) Collection ํน์ Iterableํ ๊ฐ์ฒด๊ฐ ์ฃผ์ด์ง ๊ฒฝ์ฐ
3) ์ ์์ ํน์ ๋ฒ์๋ฅผ ์ฌ์ฉํ๊ณ ์ถ์ ๊ฒฝ์ฐ
4) ๊ฐ subscripction์ ๋ํด ์ ๊ณต๋๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ๊ณ ์ถ์ ๊ฒฝ์ฐ
3-1. Array ์ฆ, ๋ฐฐ์ด์ด ์ฃผ์ด์ง ๊ฒฝ์ฐ
public static <T> Flux<T> fromArray(T[] array);
์ฃผ์ด์ง ๋ฐฐ์ด์ ํฌํจ๋ ์์ดํ ๋ค์ ๋ฐฉ์ถํ๋ Flux๋ฅผ ์์ฑํ๋ ๋ฉ์๋๋ fromArray์ ๋๋ค. ์ญ์ ์ ์ ํฉํ ๋ฆฌ ๋ฉ์๋๋ก Flux ๊ฐ์ฒด๋ฅผ ์ฝ๊ฒ ์์ฑํ ์ ์์ต๋๋ค.
3-2. Collection ํน์ Iterableํ ๊ฐ์ฒด๊ฐ ์ฃผ์ด์ง ๊ฒฝ์ฐ
public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
Iterableํ ๊ฐ์ฒด์ ๋ด๊ฒจ์๋ ์์ดํ ์ ๋ฐฉ์ถํ๋ Flux ๊ฐ์ฒด๋ฅผ ์์ฑํฉ๋๋ค. Iterable.iterator() ๋ฉ์๋๋ ์ ์ด๋ ํ ๋ฒ, ์ต๋ ๋ ๋ฒ ๊ฐ๊ฐ์ subscriber๋ฅผ ํธ์ถํ ๊ฒ์ ๋๋ค.
fromIterable๋ Iterable์ Spliterator๋ฅผ ๊ฒ์ฌํด์, ๋ฐ๋ณต์ด ์ ํํ๋ค๋ ๊ฒ์ ๋ณด์ฅํ๋์ง๋ฅผ ํ๋จํฉ๋๋ค. ๊ธฐ๋ณธ Spliterator๋ Iterable.iterator()์ ๋ ๋ฒ ํธ์ถํ ์ ์๋ ๋ฐ๋ณต์๋ฅผ ๊ฐ์ธ๊ณ ์์ผ๋, ์ด ๋ ๋ฒ์งธ ํธ์ถ์ ํญ์ ์ ํํ ๊ฒ์ผ๋ก ๊ฐ์ฃผ๋๋ ์ ํ์ธ Collection์์ ๊ฑด๋๋ฐ๊ฒ ๋ฉ๋๋ค.
3-3. ์ ์์ ํน์ ๋ฒ์๋ฅผ ์ฌ์ฉํ๊ณ ์ถ์ ๊ฒฝ์ฐ
public static Flux<Integer> range(int start, int count);
start ๋ถํฐ ์ฐจ๋ก๋ก ๋์ด๋๋ ์ ์ ์์ด์ ์์ดํ ์ ๋ฐฉ์ถํ๋ Flux๋ฅผ ๋ง๋ค์ด๋ ๋๋ค. [start, start+count)์ ๋ฒ์๋ฅผ ์์ฑํฉ๋๋ค.
3-4) ๊ฐ subscripction์ ๋ํด ์ ๊ณต๋๋ ์คํธ๋ฆผ์ ์ฌ์ฉํ๊ณ ์ถ์ ๊ฒฝ์ฐ
public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier);
๊ฐ๊ฐ์ Subsricption์ ์ํ Supplier๋ก๋ถํฐ ์์ฑ๋ ์คํธ๋ฆผ์ ์์ดํ ์ ๋ฐฉ์ถํ๋ Flux๋ก ๋ง๋ค์ด๋ ๋๋ค. Stream์ ์คํผ๋ ์ดํฐ๊ฐ ์ทจ์๋๊ฑฐ๋, ์๋ฌ์ํ์ด๊ฑฐ๋, ์๋ฃ์ํ์ผ ๋ ์๋์ผ๋ก ๋ซํ๋๋ค.
4) ๋ค์ํ ๋จ์ผ ๊ฐ ์์ค๋ก๋ถํฐ ๋ฐฉ์ถ๋๋ ๊ฒฝ์ฐ.
1) Supplier<T>
2) Task
3) CompletableFuture<T>
5) ๊ทธ๋ฅ ์๋ฃํ๋ ๊ฒฝ์ฐ.
public static <T> Flux<T> empty();
public static <T> Mono<T> empty();
Mono์ Flux ๋ชจ๋ empty()๋ผ๋ ์ ์ ์คํํฑ ๋ฉ์๋๋ฅผ ๊ฐ์ง๊ณ ์์ต๋๋ค. ์๋ฃํ๋ Mono/Flux ๊ฐ์ฒด๋ก ์ด๋ ํ ์์ดํ ๋ ๋ฐฉ์ถํ์ง ์์ต๋๋ค. T ํ์ ์ ๊ตฌ์ฒดํ๋ Subscriber ํ์ ์ ๋๋ค.
6) ์๋ฌ๋ฅผ ๋ฐ์์ํค๋ ๊ฒฝ์ฐ.
public static <T> Flux<T> error(Throwable error);
public static <T> Mono<T> error(Throwable error);
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier);
public static <T> Mono<T> error(Supplier<? extends Throwable> errorSupplier);
7) ์๋ฌด๊ฒ๋ ํ์ง ์๋ ๊ฒฝ์ฐ.
public static <T> Flux<T> never();
์ด๋ ํ ๋ฐ์ดํฐ, ํน์ ์๋ฌ / ์๋ฃ ์ ํธ๋ ๋ณด๋ด์ง ์๋ Flux๋ฅผ ์์ฑํฉ๋๋ค. Empty()๋ Complete ์ ํธ๋ฅผ ๋ณด๋ด๋ ๊ฒ์ด ์ฐจ์ด์ ๋๋ค.
Complete ์ ํธ๋ ๋ณด๋ด์ง ์๊ธฐ ๋๋ฌธ์ ์ข ๋ฃ๋์ง ์์ต๋๋ค.
๋๋์ฒด ์ด๊ฒ์ ์ด๋์ ์ฐ๋? ์ฐพ์๋ณธ ๊ฒฐ๊ณผ StackOverFlow์ ๋ต๋ณ ์ค ํ๋.
It is very often used in tests (typically to assert timeout behavior), but can also have production usage: some operators take a control Publisher as parameter in various situations where they need an asynchronous external signal to tell them to trigger some behavior. If in some cases you don't want said behavior to trigger, use never().
For instance, 'windowWhen' takes such parameter both for opening and closing windows (the later generated by a Function). Conditionally returning a Mono.never() you could have a window that never closes.
[์ถ์ฒ] https://stackoverflow.com/questions/48273301/when-to-use-mono-never
๊ฒฐ๋ก ์, ์๊ฐ์ด๊ณผ ํ๋์ ๋จ์ธํ๊ธฐ ์ํ ํ ์คํธ์์ ์ข ์ข ์ฌ์ฉ๋๋ฉฐ, ๋๋๋ก๋ ํ๋ก๋์ ์์๋ ์ฌ์ฉ๋๊ธฐ๋ ํ๋ค๊ณ ํฉ๋๋ค. ๋ช๋ช ์คํผ๋ ์ดํฐ๋, ์ผ๋ถ ๋์์ ํธ๋ฆฌ๊ฑฐ ํ๋๋ก ํ๋ ๋น๋๊ธฐ ์ธ๋ถ ์ ํธ๊ฐ ํ์ํ ๋ค์ํ ์ํฉ์์ Publisher๋ฅผ ํ๋ผ๋ฏธํฐ๋ก์จ ์ ์ดํฉ๋๋ค. ๋ง์ฝ ํด๋น ๋์์ด ํธ๋ฆฌ๊ฑฐ๋๋ ๊ฒ์ ์ํ์ง ์๋๋ค๋ฉด never๋ฅผ ์ฌ์ฉํฉ๋๋ค.
8) Subscribeํ ๋ ๊ฒฐ์ ๋๋ ๊ฒฝ์ฐ.
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
9) ์ผํ์ฉ ์์์ ์์กดํ๋ ๊ฒฝ์ฐ.
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
public static <T,D> Mono<T> using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Mono<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)
10) ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ์ด๋ฒคํธ๋ฅผ ์์ฑํ๋ ๊ฒฝ์ฐ. (์ํ๋ฅผ ์ฌ์ฉํ ์ ์์)
๐ 2. ์กด์ฌํ๋ ์ํ์ค๋ฅผ ๋ณํํ๊ธฐ
..๊ณ์ํด์ ์ถ๊ฐ ์์
๐คท๐ปโ๏ธ ์ถ์ฒ
[๋งํฌ] https://guten-tag.tistory.com/481 (just vs fromSupplier)
[๋งํฌ] https://projectreactor.io/docs/core/release/reference/index.html#which-operator (๋ฆฌ์กํฐ 3 ๋ ํผ๋ฐ์ค ๊ฐ์ด๋)
[๋งํฌ] https://reactivex.io/documentation/operators/empty-never-throw.html (RX ๋ ํผ๋ฐ์ค)