创建 flux 代码如下,其中的 long consumer 可能会被下游多次调用。
Flux.create(new Consumer<FluxSink<Object>>() {
@Override
public void accept(FluxSink<Object> fluxSink) {
fluxSink.onRequest(new LongConsumer() {
@Override
public void accept(long value) {
log.info("我被多次调用了 request:" + value);
for (long i = 0; i < value; i++) {
fluxSink.next("request:" + i);
}
}
});
}
})
也就是说,我们不能决定下游调用的时机,调用的次数,调用的所在线程。这样就很容易产生 bug 。
FluxArray 解决此问题的办法是使用 Operators.addCap(REQUESTED, this, n) == 0
判断,
只有返回为 0 时,才进行处理,否则将请求的 n 叠加到 request 后就 return 。
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}
我们自己写 Flux.create() 时也可以借鉴 FluxArray 的处理办法,但是这样就变得麻烦了。 不知道有什么现有封装好的实现没有??
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.