Project Reactor,如何实现主线程消费报错时停止 Flux 流

2021-12-27 18:03:07 +08:00
 git00ll

如下代码,本意是将 flux 流发送到子线程处理,再将处理结果汇聚到主线程。如何能够实现主线程处理报错时停止 Flux 的呢。

    public static void main(String[] args) {
        String[] data = {"2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15"};

        Iterable<Integer> integers = Flux.fromArray(data)
                .flatMapSequential(s -> Mono.fromSupplier(() -> Integer.parseInt(s)).subscribeOn(Schedulers.boundedElastic()), 3)
                .doOnNext(s -> {
                    System.out.println(Thread.currentThread().getName() + "---------->" + s);
                })
                .toIterable();


        for (Integer i : integers) {
            //如何实现这里报错时,停止 Flux
            System.out.println((10 / i) + "------>>>>" + Thread.currentThread().getName());
        }
    }

1893 次点击
所在节点    Java
5 条回复
yazinnnn
2021-12-28 09:14:58 +08:00
fun main() {
val data = arrayOf("2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15")

val integers = Flux.fromArray(data)
.flatMapSequential({ s: String ->
Mono.fromSupplier { s.toInt() }.subscribeOn(Schedulers.boundedElastic())
}, 3)
val a = integers.doOnNext { s ->
println(Thread.currentThread().name + "---------->" + s)
}.subscribe()

for (i in integers.toIterable()) {
try {
10 / i
} catch (t: Throwable) {
a.dispose()
}
}

CountDownLatch(1).await()
}


不清楚你的具体需求,如果整个链路不使用 reactivestream 的话,似乎性能(吞吐)并没什么提高
toIterable()迭代时会阻塞当前线程,这样写跟直接用线程池处理比没啥优点
Macolor21
2021-12-28 09:22:53 +08:00
CompletableFuture 最后 join 回主线程?
git00ll
2021-12-28 10:39:40 +08:00
@yazinnnn 假设 toInt 这个操作是比较耗时的,可以实现将 toInt 放置在多核上运行,最终结果再汇聚到主线程上。
因为主线程上开启了传统注解事务,需要在主线程上操作 Flux 的处理结果
yazinnnn
2021-12-28 11:08:07 +08:00
@git00ll
那直接使用 stream 的并行 api 或者 2 楼的 CompletableFuture 是否更适合你的场景?
git00ll
2021-12-28 12:50:30 +08:00
@yazinnnn
业务场景里有一些限制,
1. 需要保持输入和输出的顺序一致,
2. 流中的数据从文件中读取的,数据量非常大,无法全部加载到内存。只能边读取边处理。
3.处理过程中其中一条处理错误时,算失败,中断流不再继续。

一方面 stream 的并行流没有拉模式,无法精准控制载入内存的数据行数。
且并行 stream 提供的 api 太少,相比于 reactor 提供的控制选项不足

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/824749

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX