关于微服务并行调用优化的想法

2018-10-22 16:12:23 +08:00
 DeadLion

假设一个场景:对外提供一个 http 接口 A,A 接口内部实际上又是调用 B C D 三个微服务接口,B C D 之间没有依赖关系,可以同时执行,最后将结果组合起来返回。

我们正常一般都是顺序同步执行,假设每个服务用时 1 秒,那么至少需要 3 秒才能返回结果,假如 BCD 并行执行的话,理论上只要 1 秒就能返回结果了。

在 Java 里只想到了用多线程来做这样的优化,可以用 callable 这种可以获取异步返回结果的类。

在公司内网上看到有人说可以用 RxJava 来做这样的优化,但是花了些时间发现好像还不如上面的方法,也可能是自己对 RxJava 不熟悉,如果有熟悉 RxJava 的不吝赐教。还是用这里的例子来说明,在 RxJava 里 B C D 服务需要各封装成一个 observable,然后和 observer 用异步线程模式来关联。那么 BCD 会同时开始执行,但是我发现没有已经封装好的工具来阻塞主线程等大家一起执行完返回结果,callable 好歹还有个 get ()方法,不过也不好用就是了。最好还是借助 CountDownLatch 类的工具来协助。 不知道 RxJava 在服务端应用不多的原因是不是因为这个,实在是没有发现什么特别好的地方,唯一我觉得好用的可能就是在一个 observerable 里 onnext 链式调用,对于有层层依赖的情况可能比较好用,还有一些操作符之类的。

另外在知乎上也看到一篇同样主题的文章

希望大家来点建议。

4623 次点击
所在节点    Java
19 条回复
ysweics
2018-10-22 16:24:56 +08:00
同有这种疑惑,坐等大神
DeadLion
2018-10-22 16:38:21 +08:00
贴一个 Callable demo 出来

```
@RequestMapping("/testWhitCallable")
public String testWhitCallable() throws ExecutionException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);

ExecutorService es = Executors.newCachedThreadPool();

Future fb = es.submit(() -> {
Object result = b.methodB();
countDownLatch.countDown();
return result;
});

Future fa = es.submit(() -> {
Object result = a.methodA();
countDownLatch.countDown();
return result;
});
countDownLatch.await();
return fa.get().toString() + fb.get().toString();
}
```
sagaxu
2018-10-22 16:38:55 +08:00
可以用 future 来做,每个 http 请求一个 future,再聚合这 3 个 future 得到一个用来等待的 future。http 请求改成异步的,不要让它阻塞你的线程。
lhx2008
2018-10-22 16:39:18 +08:00
有一个 zip 方法,我用的是 reactor3 的,把各个请求任务发到不同线程上面,然后 zip 一起,再 onNext,reactor3 有一个 block 方法转回同步。rxjava 不太清楚。

或者 vert.x 的 future 也有一个类似的 ,叫 CompositeFuture,底层可以看到是 Countdown latch,所以其实 jdk 自带就是 countdownlatch
lhx2008
2018-10-22 16:46:20 +08:00
reacto3 demo,手机打的,意思一下

Mono.zip(Mono.fromCallable(), Mono.fromCallable(),Mono.fromCallable())
.doOnNext(xxx)
.doOnError(xxx)
.block
lhx2008
2018-10-22 16:48:05 +08:00
忘了发线程了

Mono.zip(Mono.fromCallable().subscribeOn(线程池), Mono.fromCallable().subsribeOn(),Mono.fromCallable().subscribeOn)
.doOnNext(xxx)
.doOnError(xxx)
.block
xcstream
2018-10-22 16:48:36 +08:00
java 有协程么
没有的话本质上就是多线程了
ffeii
2018-10-22 16:49:21 +08:00
jinhan13789991
2018-10-22 16:50:10 +08:00
android 开发一枚,我们一般用 combineLatest 来做一些表单验证操作。没有涉及到太多多线程。
你可以看一下 ~
https://blog.csdn.net/johnny901114/article/details/61191723
lhx2008
2018-10-22 16:53:59 +08:00
@ffeii 是 Mono.zip 吧,参考我上面的,Flux 会对流里面每个都合并。
hpeng
2018-10-22 16:57:42 +08:00
zhangwugui
2018-10-22 17:08:01 +08:00
对 RxJava 同样不太熟悉,楼上倒是也说了这个 CompletableFuture,这个是 JDK8 基于 Future 异步处理的增强,这个倒是可以用在这里。
DeadLion
2018-10-22 17:09:08 +08:00
搜到一篇 https://medium.com/@nithinmallya4/processing-streaming-data-with-spring-webflux-ed0fc68a14de

里面提到了很多方案,不过好像大家都说的差不多了

1.Java 5 提供的 Futures
2.Java 8 提供的 CompletableFutures
3.第三方库 RxJava
4.Spring 5 提供的 Reactive Streams,实际就是上面提到的 Reactor 实现的。
5.Spring 5 中的 Webflux,不过我看上面说只能基于 http 或者 websockets 请求
,不过我们说的不仅限于 http 形式的微服务,还有 rpc 也算。

@ysweics @lhx2008 @ffeii
janus77
2018-10-22 17:10:20 +08:00
zip 操作符即可
lhx2008
2018-10-22 17:14:02 +08:00
@DeadLion webflux 就是 reactor,只是 webflux 提供了一个配套的异步网络 API,如果是 RPC 的,Reactor 支持接入别的异步或者 fromCallable
DeadLion
2018-10-22 17:16:06 +08:00
@lhx2008 明白了
wengang285
2018-10-22 17:37:49 +08:00
promise/future
yaoliyc
2018-10-22 19:05:07 +08:00
jdk7 提供了 fork-join
v3exhost
2018-10-24 11:27:30 +08:00
```java
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
```

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/499896

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX