reactor.io 使用 Flux.create 创建 Flux 时,需要注意 consumer 会被多次调用,解决起来挺麻烦的

2021-08-02 11:44:54 +08:00
 git00ll

创建 flux 代码如下,其中的 long consumer 可能会被下游多次调用。

        Flux.create(new Consumer<FluxSink<Object>>() {
            @Override
            public void accept(FluxSink<Object> fluxSink) {
                fluxSink.onRequest(new LongConsumer() {
                    @Override
                    public void accept(long value) {
                        log.info("我被多次调用了 request:" + value);
                        for (long i = 0; i < value; i++) {
                            fluxSink.next("request:" + i);
                        }
                    }
                });
            }
        })

也就是说,我们不能决定下游调用的时机,调用的次数,调用的所在线程。这样就很容易产生 bug 。


FluxArray 解决此问题的办法是使用 Operators.addCap(REQUESTED, this, n) == 0判断, 只有返回为 0 时,才进行处理,否则将请求的 n 叠加到 request 后就 return 。

		public void request(long n) {
			if (Operators.validate(n)) {
				if (Operators.addCap(REQUESTED, this, n) == 0) {
					if (n == Long.MAX_VALUE) {
						fastPath();
					}
					else {
						slowPath(n);
					}
				}
			}
		}

我们自己写 Flux.create() 时也可以借鉴 FluxArray 的处理办法,但是这样就变得麻烦了。 不知道有什么现有封装好的实现没有??

1261 次点击
所在节点    Java
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/793120

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX