作为一个开发初始化线程池通常会使用 Executors 类,然后调用 newFixedThreadPool 或者其他方法来初始化一个线程池,方法如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors 中其实最终是初始了 ThreadPoolExecutor 类,上一篇Java 线程池前传已经讲了 ThreadPoolExecutor 线程池的一些关键属性。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 的构造方法中需要指定一些参数,并且这些参数会被线程池的一些属性所使用,这些我们会在后续的剖析线程池中都会提到。
任务调度是整个线程池的入口,当客户端提交了一个任务以后便进入整个阶段,整个任务的调度过程由 execute 方法完成,如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 1 & 2. addWorker 方法会检查线程池是否是 RUNNING 状态
if (addWorker(command, true))
return;
c = ctl.get();
}
// 1 & 3
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上述代码的执行过程大致如下:
其中 workerCountOf(recheck) == 0 这一步也很关键,这一步主要是为了确保线程池中至少有一个线程去执行任务。
在上述流程中我们提到了阻塞任务队列(用于任务缓冲)、addWorker 方法(任务申请)、以及 reject 方法(任务拒绝策略),下面我们再来分析一下这三个关键点。
线程池的本质是对线程和任务的管理,为了做到这一点必须要将线程和任务解耦,不再直接关联,通过缓冲队列恰好可以解决这一点。线程池中的缓冲队列类似于生产者消费者模式,客户端线程往缓冲队列里提交任务,线程池中的线程则从缓冲队列中获得任务去执行。
目前 Java 线程中的默认缓冲队列是阻塞队列模式,主要有以下几种,这些缓冲队列必须要实现 BlockingQueue 接口:
队列 | 描述 |
---|---|
ArrayBlockingQueue | 使用数组实现的有界阻塞队列,先进先出,支持公平锁和非公平锁 |
LinkedBlockingQueue | 使用链表实现的有界阻塞队列,先进先出,默认链表长度 Integer.MAX_VALUE |
PriorityBlockingQueue | 一个使用数组实现支持线程优先级排序的无界队列,默认自然排序,也可以自定义实现 Comparator 来进行排序 |
DelayQueue | 一个采用 PriorityBlockingQueue 实现的延迟队列(组合的方式),在创建该对象中,可以指定任务添加至队列后才能被获取 |
SynchronousQueue | 一个不存储元素的阻塞队列,每一个任务入队操作必须等待一个任务的出队,否则不能添加新的任务 |
LinkedTransferQueue | 一个使用链表实现的无解阻塞队列,该队列支持将任务立即传递给消费者 |
LinkedBlockingDeque | 一个由双向链表实现的有界阻塞队列,队头队尾都可以添加任务消费任务 |
当工作线程数( workerCount )大于等于最大线程数( maximumPoolSize ),并且阻塞任务队列已满,线程池会执行具体的 RejectedExecutionHandler 策略。目前 Java 默认的拒绝策略主要有以下几种:
策略 | 描述 |
---|---|
AbortPolicy | 丢弃任务并抛出 RejectedExecutionException 异常 |
DiscardPolicy | 直接丢弃任务 |
DiscardOldestPolicy | 丢弃阻塞队列队头的任务,并重新提交被拒绝的任务 |
CallerRunsPolicy | 直接由调用线程处理被拒绝的任务 |
在工作线程池数未达到最大线程数并且阻塞队列未满时,我们可以将任务提交至线程池(有可能是开启新的线程,也有可能是将任务提交至阻塞队列)等待执行。其中 addWorker 方法便是开启新的线程执行任务。下面我们来看一下 addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池如果不是运行状态,线程池根据以下条件来决定是否增加 Work 线程
// 1. 如果线程池不是 SHUTDOWN 状态,那么不允许在增加任何线程,返回 false
// 2. 如果线程池是 SHUTDOWN 状态(不允许接受新的任务),如果 firstTask 不为空表明是新的任务,不应该接受,所以返回 false
// 3. 如果线程池是 SHUTDOWN 状态,并且队列已空,此时也不需要增加线程所以返回 false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取工作线程的数量
int wc = workerCountOf(c);
// 工作线程数与线程池容量比较,超过不允许增加线程
// core 为 true,工作与核心线程数比较,超过不允许增加线程
// core 为 false,工作与最大线程数比较,超过不允许增加线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 增加工作线程数,增加成功跳出 retry 循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 增加工作线程数失败,则重新获取 ctl 的值
c = ctl.get(); // Re-read ctl
// 判断线程池状态是否改变,如果线程池状态已经改变,则重新执行 retry 循环,否则执行内部循环,尝试增加线程数
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据 firstTask 来创建 Worker 对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新获取线程池的状态,防止在 mainLock 的 lock 方法执行之前,线程池状态改变
int rs = runStateOf(ctl.get());
// 只有线程池是运行状态或者线程池是 shutdown 并且任务是来自阻塞队列中( firstTask==null )才可以向线程池中增加线程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 增加 worker,workers 是一个 HashSet
workers.add(w);
// 更新 largestPoolSize,largestPoolSize 代表了线程池中曾经出现过的最大线程数
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
// 如果 worker 创建或启动失败,修正线程池中的 worker 和 ctl 值
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上述代码的核心逻辑就是根据线程池当前状态来决定是否开启新的线程来执行任务,线程具体的实现方式是采用一个 Worker 类来进行封装。
Worker 实现了 Runnable 接口,并继承了 AbstractQueuedSynchronizer ( AQS )。
不熟悉 AQS 的读者可以戳这里
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker 中存储了真实的线程( Thread )、该线程需要执行的第一个任务( firstTask )以及线程执行的任务数( completedTasks )。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用 ThreadFactory 创建线程
this.thread = getThreadFactory().newThread(this);
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
Worker 线程采用 AQS 实现,使用 AQS 的独占锁功能,通过其 tryAcquire 方法可以看出 Worker 线程是不允许重入的,Worker 线程有以下特点:
构造方法中为什么要执行 setState(-1)方法 ?
setState 是 AQS 中的方法,默认值为 0,tryAcquire 方法是根据 state 是否是 0 来判断的,所以将 state 设置为-1 是为了禁止在执行任务前对线程进行中断,不明白的读者可以看一下 AQS 的 acquire(int arg)方法,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
构造方法中的 getThreadFactory().newThread(this)作用是什么?
ThreadFactory 是在我们构造 ThreadPoolExecutor 时传入的,通过 ThreadFactory 我们可以设置线程的分组、线程的名字、线程的优先级、以及线程是否是 daemon 线程等相关信息。
public void run() {
runWorker(this);
}
Worker 线程获取任务工作是通过调用 ThreadPoolExecutor 中的 runWorker 方法,该方法的参数是 Worker 本身,下面我们看一下 Worker 线程的具体工作原理。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 将 AQS 中的 state 修改为 0
w.unlock(); // allow interrupts
// 线程在执行任务中是否异常
boolean completedAbruptly = true;
try {
// 获取立即执行的任务,或者从阻塞队列中获取
while (task != null || (task = getTask()) != null) {
// 获取独占锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行后的操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 释放独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程工作的大致流程是:
这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。
completedAbruptly 变量来表示在执行任务过程中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。
此部分代码的流程图如下:
private Runnable getTask() {
// timeOut 变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态是非 RUNNING 状态,需要进行以下判断:
* 1. rs >= STOP,线程池是否正在 stop ;
* 2. 阻塞队列是否为空。
* 如果以上条件有一个满足,则将 workerCount 减 1 并返回 null 。
* 因为如果当前线程池状态的值是 SHUTDOWN 或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 重点
* wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;
* timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于 1,或者阻塞队列是空的,那么尝试将 workerCount 减 1 ;
* 如果减 1 失败,则返回重试。
* 如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据 timed 来判断,如果为 true,则通过阻塞队列的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null ;
* 否则通过 take 方法,如果这时队列为空,则 take 方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut 设置为 true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
timedOut = false;
}
}
}
这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。
什么时候会销毁? runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。
getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。
获取任务的流程图如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是 Worker 线程正常结束,工作数量-1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程池的任务完成数量增加该 Worker 线程的任务完成数量
completedTaskCount += w.completedTasks;
// 从线程池维护的线程中移除该线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 当线程池是 RUNNING 或 SHUTDOWN 状态时,如果 worker 是异常结束,那么会直接 addWorker ;
* 如果 allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个 worker ;
* 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize 。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操作,为什么要在执行任务的时候对每个工作线程都加锁(lock)呢?
由上可知,shutdown 方法与 getTask 方法(从队列中获取任务时)存在竞态条件;
所以 Worker 继承自 AQS,在工作线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务,如果 tryLock 返回 true,说明该工作线程当前未执行任务,这时才可以被中断。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.