@
Vedar @
1194129822 @
lancelee01 @
micean 感谢各位的热情解答,我很受启发。再结合朋友给的例子,我仔细读了下源码,已经大致能复盘这个错误了。
**inner 线程池 reject 的原因:**
1. 主要原因:队列太小,这里给的是 1,实际每个 outer 线程要产生 3 个任务
2. 次要原因:outter 线程里面使用 countdownlatch 确实不能起到很好的限流作用,
**次要原因分析:**
如 runWorker()源码所示,run 执行完毕并不能代表线程任务执行完毕。这意味着 outter 线程与 inner 线程的空闲线程数可能不是 1:3 的关系。但这里可以通过让 outter 线程 sleep 等待 inner 先执行完成,规避这个因素的影响。规避后,问题还是会存在,说明不是主要原因。
**主要原因分析:**
先来看个案例
```
static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
public MyLinkedBlockingQueue(int capacity) {
super(capacity);
}
@
Override public boolean offer(E o) {
System.out.println("任务加入,当前队列数:" + this.size());
return super.offer(o);
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new MyLinkedBlockingQueue<>(1);
// 3 个线程的线程池
ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue);
// 先将线程池拉满
for (int i = 0; i < 3; i++) {
final int finalI = i;
taskPoolExecutor.execute(() -> {
logger.info("{}", finalI);
});
}
// 等待全部任务执行完
Thread.sleep(1000);
// 再次执行任务,发现每一个任务都触发加入队列操作。
for (int i = 10; i < 12; i++) {
final int finalI = i;
// 多线程更容易触发 reject
// new Thread(()-> taskPoolExecutor.execute(() ->
logger.info("{}", finalI))).start();
taskPoolExecutor.execute(() ->
logger.info("{}", finalI));
}
}
```
执行结果:
> 23:12:39.988 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$main$0:34 - 2
23:12:39.988 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$main$0:34 - 1
23:12:39.988 [pool-1-thread-1] INFO c.r.s.Demo8.lambda$main$0:34 - 0
任务加入,当前队列数:0
23:12:40.997 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$null$1:46 - 10
任务加入,当前队列数:0
23:12:41.000 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$null$1:46 - 11
跑完这个案例我感觉我根本不懂线程池,我翻了下源码:
```
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池满了后,直接不创建核心线程了
// 这里 isRunning 看的我懵逼,明明任务都执行完了,为啥还是 isRunning,先接受,后面再研究 [1]
// 然后就触发入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
```
我以为的线程池是:只要有空闲线程,任务是直接丢给线程去执行的。
**实际情况是:当核心线程数满,不管已有线程是否空闲,任务是先丢到队列,然后空闲线程从队列里面自取。**
案例中,我给的队列大小是 1,当队列满的时候,会扩容线程池到最大线程池大小到 12,此时如果队列是满的(不管线程是否空闲),继续添加就会 reject 。案例中每组有三个任务,只要线程从队列 take 任务不及时,队列很容易满,从而触发 reject 。
**验证:**
1. countDownLatch.await(); 后面加上 sleep,让 outter 线程等 inner 线程结束,排除最开始说的第二个因素的影响。
2. 将队列改成 3,适当调整线程执行时间(也可以不调),reject 很少触发或不触发。
3. 将队列改成 9,没有触发 reject
**总结:**
1. 这个任务表面是多线程嵌套调用,内外线程调度不确定性导致的线程池问题,其实本质是对线程池理解不对导致线程池滥用的问题。
2. 任务是添加到队列,空闲线程调用 take()获取,而不是有空闲线程就直接丢到空闲线程(实际任务也难以主动去找空闲线程,还容易造成等待,让线程自取则是生产消费的模式。)
3. isRunning(c) 这个方法以及相关机制,还要再研究一下。
再次感谢各位,如有不对的地方,还请指出。。