如下代码,本意是将 flux 流发送到子线程处理,再将处理结果汇聚到主线程。如何能够实现主线程处理报错时停止 Flux 的呢。
public static void main(String[] args) {
String[] data = {"2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15"};
Iterable<Integer> integers = Flux.fromArray(data)
.flatMapSequential(s -> Mono.fromSupplier(() -> Integer.parseInt(s)).subscribeOn(Schedulers.boundedElastic()), 3)
.doOnNext(s -> {
System.out.println(Thread.currentThread().getName() + "---------->" + s);
})
.toIterable();
for (Integer i : integers) {
//如何实现这里报错时,停止 Flux
System.out.println((10 / i) + "------>>>>" + Thread.currentThread().getName());
}
}
1
yazinnnn 2021-12-28 09:14:58 +08:00
fun main() {
val data = arrayOf("2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15") val integers = Flux.fromArray(data) .flatMapSequential({ s: String -> Mono.fromSupplier { s.toInt() }.subscribeOn(Schedulers.boundedElastic()) }, 3) val a = integers.doOnNext { s -> println(Thread.currentThread().name + "---------->" + s) }.subscribe() for (i in integers.toIterable()) { try { 10 / i } catch (t: Throwable) { a.dispose() } } CountDownLatch(1).await() } 不清楚你的具体需求,如果整个链路不使用 reactivestream 的话,似乎性能(吞吐)并没什么提高 toIterable()迭代时会阻塞当前线程,这样写跟直接用线程池处理比没啥优点 |
2
Macolor21 2021-12-28 09:22:53 +08:00
CompletableFuture 最后 join 回主线程?
|
3
git00ll OP @yazinnnn 假设 toInt 这个操作是比较耗时的,可以实现将 toInt 放置在多核上运行,最终结果再汇聚到主线程上。
因为主线程上开启了传统注解事务,需要在主线程上操作 Flux 的处理结果 |