关于 Java 线程池并发查询的问题

2021-04-16 17:01:02 +08:00
 zhangslob669

example

public void test() throws Exception {
        ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        List<Future> futureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future future = threadPoolExecutor.submit(() -> {
                System.out.println("do some work");
                int time = new Random().nextInt(2000);
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            futureList.add(future);
        }

        for (Future future : futureList) {
            future.get(1000, TimeUnit.MILLISECONDS);
        }
    }

我想在 1 秒内,批量查询,如果某次查询超时,就不要结果。最后获取所有成功的查询结果

现在的写法是有问题的,每次从 futureList 获取结果都是阻塞的,最终结果肯定是大于 1 秒的,有没有好办法或者轮子?

2579 次点击
所在节点    Java
15 条回复
Ariver
2021-04-16 17:04:43 +08:00
你需要 Reactor.
DanielGuo
2021-04-16 17:06:22 +08:00
public void test() throws Exception {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future future = threadPoolExecutor.submit(() -> {
System.out.println("do some work");
int time = new Random().nextInt(2000);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
futureList.add(future);
}
Thread.sleep(1000);
boolean allDone = futureList.stream().map(f -> f.isDone()).allMatch(result -> result == true);
if (allDone) {
for (Future future : futureList) {
future.get();
}
}

}
DanielGuo
2021-04-16 17:07:04 +08:00
等待一秒,判断是否全部完成,然后获取结果。。。
guxingke
2021-04-16 17:08:08 +08:00
CompletableFuture.allOf(f1,f2...fn).get(timeout)

> 也许可以,没验证
zhangslob669
2021-04-16 17:09:53 +08:00
@DanielGuo 这样就必须强制等待了,并不是一种优雅的做法;而且项目里不允许写 Thread.sleep(1000);等代码
dqzcwxb
2021-04-16 17:19:01 +08:00
@guxingke #4 Completablefuture 可行
securityCoding
2021-04-16 17:19:50 +08:00
Completablefuture 直接用这个
SlipStupig
2021-04-16 17:21:51 +08:00
结果用异步回调
xiaoxinshiwo
2021-04-16 17:33:37 +08:00
CountDownLatch 不香吗
blisteringsands
2021-04-16 19:42:10 +08:00
submit()之后取一下当前时间,续 1 秒算出 deadline
每次 future.get 之前重新取当前时间,和 deadline 减一下算出等待时间
zzl22100048
2021-04-16 19:56:01 +08:00
你这要求完美符合 completablefuture
zhady009
2021-04-16 19:59:52 +08:00
CompletableFuture 有个 completeOnTimeout 超时的时候可以设置默认值给个 null
最后过滤掉为 null 的
zhady009
2021-04-16 20:09:49 +08:00
```java

@Test
public void demo() {
QueryTask var0 = new QueryTask(900);
QueryTask var1 = new QueryTask(2100);
QueryTask var2 = new QueryTask(2000);
QueryTask var3 = new QueryTask(2000);

Demo<QueryTask, Integer> test = new Demo<>(1000, List.of(var0, var1, var2, var3));
long l = System.currentTimeMillis();
Collection<Integer> d = test.execute();
System.out.println(System.currentTimeMillis() - l);
assert d.size() > 0;
for (Integer integer : d) {
assert integer <= 1000;
}
}
static class Demo<T extends Supplier<E>, E> {
private static final ExecutorService ES = Executors.newFixedThreadPool(10);
private final int timeout;
private final Collection<T> tasks;
Demo(int timeout, Collection<T> tasks) {
this.timeout = timeout;
this.tasks = tasks;
}
public List<E> execute() {
List<CompletableFuture<E>> collect = tasks.stream().map(x -> CompletableFuture.supplyAsync(x, ES)
.completeOnTimeout(null, timeout, TimeUnit.MILLISECONDS))
.collect(Collectors.toUnmodifiableList());

CompletableFuture<List<E>> listCompletableFuture = CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()]))
.thenApply(v -> collect.stream().map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
return listCompletableFuture.join();
}
}
static class QueryTask implements Supplier<Integer> {
private final int time;
QueryTask(int time) {
this.time = time;
}
@Override
public Integer get() {
try {
//query
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return time;
}
}

```
zhgg0
2021-04-17 00:37:05 +08:00
用线程池的 invokeAll 方法。
或者 timeout 每次 get 前实时算。
yazinnnn
2021-04-17 18:28:55 +08:00
val list = (0..9).map {
async {
withTimeoutOrNull(1000) {
val long = Random.nextLong(2000)
delay(long)
it
}
}
}
println(list.map { it.await() })


[0, 1, 2, 3, null, 5, 6, null, null, 9]

kotlin 协程可以简单实现...

或者 jdk11 用 CompletableFuture

或者 jdk8 用一下 vertx 的 Promise api...


fun main() {
println("start ${Date()}")
foo()
println("end ${Date()}")
}

var threadPoolExecutor = ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, LinkedBlockingQueue())

fun foo() {
val future = (0..9).map { getSomething() }
println(future.map { it.result() })
Thread.sleep(1000)
println(future.map { it.result() })
}


fun getSomething(): Future<String> {
val promise = Promise.promise<String>()
threadPoolExecutor.execute {
Thread.sleep(Random.nextLong(1500))
val result = Random.nextLong(3000).toString()
promise.complete(result)
}
return promise.future()
}


start Sat Apr 17 17:56:12 CST 2021
[null, null, null, null, null, null, null, null, null, null]
[2255, null, 2370, 750, 1399, 2796, null, null, 39, null]
end Sat Apr 17 17:56:13 CST 2021

不过这样无法取消任务...

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/771133

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX