public static void test() {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(supplyAsyncResult);
return supplyAsyncResult;
}).thenApplyAsync(r -> { //添加后续任务
String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";
System.out.println(thenApplyResult);
return thenApplyResult;
});
try {
System.out.println(completableFuture.get() + " finish!");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
打印:
ForkJoinPool.commonPool-worker-9 Hello world!
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply!
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! finish!
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null; //只是为了防止内存泄漏,方便 GC
if (d.result == null) {
try {
d.completeValue(f.get()); //执行 task
} catch (Throwable ex) { //执行 task 期间抛出了异常
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false
return false;
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回
}
return false;
}
我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)
),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?
还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)
时,还是有可能新起一个线程的吗?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.