ThreadPoolExecutor线程池源码分析

线程虽有不少了解,但是线程池却一直是处于一个模糊的概念,只知道大概,而并不知道具体的实现方式,本文打算从源码的角度分析JDK1.8中线程池实现的方式,我觉得看再多的文章阐述,都不如自己动手操作一遍,或者是翻开源码看看,这样底层数据结构是实现方法都能一目了然了。
ThreadPoolExecutor是一个灵活的,稳定的线程池,允许各种定制,本文主要基于ThreadPoolExecutor的源码进行分析。

ThreadPoolExecutor使用案例

在认识ThreadPoolExecutor之前我尝试写了一个简单的使用案例,就是构造一个线程池,然后循环丢给它一些任务,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
//1.构造定制的线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 8, 600, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
//2.构造Runnable任务
Runnable runnable=new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
}
};
//3.将任务提交到线程池
while (true) {
threadPool.submit(runnable);
Thread.sleep(200);
}
}

基本上是按照下列两行进行循环,打印出来的结果如下:

1
2
13:pool-1-thread-1
14:pool-1-thread-2

ThreadPoolExecutor继承关系

下图描述的是线程池API的一部分。广义上的完整线程池可能还包括Thread/Runnable、Timer/TimerTask等部分。

其中红线框住的部分使我们主要关心的部分,Executor唯一定义了一个execute的接口,至于Executors还实现了若干线程池,如下图所示:

ThreadPoolExecutor构造方法分析

ThreadPoolExecutor重载了若干个构造方法,从以上案例触发则是通过下面的代码流程构造出线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
//Executors中帮忙实现了默认的defaultThreadFactory()
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//所有的构造方法最终调用到了下面的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize表示线程池的基本大小,maximumPoolSize表示线程池的最大大小,workQueue为还没来得及执行的任务队列,这个主要是由阻塞队列BlockingQueue来维持这样的一个队列,线程每次执行完一个任务会到队列里面取出一个任务,keepAliveTime就是线程如果是一直没事干则可以存活的时间,超过这个时间就会被回收。这些变量定义为ThreadPoolExecutor的属性变量,如下所示:

1
2
3
4
5
6
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;//阻塞队列
private volatile long keepAliveTime;
private volatile ThreadFactory threadFactory;//构造线程的工厂
private volatile RejectedExecutionHandler handler;

不清楚为什么用阻塞队列的可以,看这篇文章,里面介绍了阻塞队列和非阻塞队列的关系。

线程池的五种状态

下面流程中会经常判断线程池的状态,所以此处先声明一下。

  • RUNNING在ThreadPoolExecutor被实例化的时候就是这个状态。
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成。
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程。
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法。
  • TERMINATED terminated()执行完毕,就会到达这个状态。
    代码常量如下:
    1
    2
    3
    4
    5
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor提交任务

构造Runnable执行任务这个应该不必多说,用过线程的应该明白。我们进入submit这个环节。会调用ThreadPoolExecutor的父类抽象类AbstractExecutorService的submit方法,代码如下:

1
2
3
4
5
6
7
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
//execute是关键的一步,这个execute就是顶级接口Executor中定义的唯一一个方法,而ThreadPoolExecutor实现了这个接口也就必须实现了这个方法。
execute(ftask);
return ftask;
}

execute方法

上一步的submit方法最终是调用线程池的execute方法,以下是方法具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*进行下面三步:
1. 如果少于corePoolSize线程正在运行,请尝试使用给定Runnable对象启动一个新线程。调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应该添加线程时添加了线程。
2. 如果一个任务可以成功排队,那么我们还需要仔细检查一下线程池状态和活跃线程数量,如果线程池停止,则回滚刚才入队操作,或者当前线程池没有线程时,重新添加一个新线程。
3. 如果我们无法排队任务,那么我们需要尝试添加一个新线程。 如果失败,我们知道线程池被shutdown或已经饱和,所以拒绝任务。
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 不成功则再次获取线程池控制状态
c = ctl.get();
}
// 线程池处于RUNNING状态,将命令(用户自定义的Runnable对象)添加进workQueue队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态
int recheck = ctl.get();
// 线程池不处于RUNNING状态,将命令从workQueue队列中移除
if (! isRunning(recheck) && remove(command))
// 拒绝执行命令
reject(command);
else if (workerCountOf(recheck) == 0)
//如果worker数量为0,则添加worker
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

总的来说,execute方法主要完成了两件事,一件就是将新任务入队,另外一件就是根据线程池的状态随时添加新线程。

addWorker方法

废话不多说直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//这个HashSet就是存放所有线程的一个数据结构,赞!
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果队列为空,不创建新线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
...
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建了一个worker线程,如果不清楚worker可以先跳过下方,阅读下一节。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池运行状态
int rs = runStateOf(ctl.get());
//如果线程池正在运行
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//这里出现了workers这个变量,这里是一个非常重要的数据结构,它大概可以解释一直悬停在我们脑海中的【线程池】这样的一个概念。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;/
workerAdded = true;
}
} finally {
mainLock.unlock();//解锁
}
//解锁完之后,再启动线程运行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果最终启动失败,则要移除线程池中worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;//返回成功或者失败
}
//移除失败线程
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

除去上述方法的细节,addWorker方法主要做的事情就是创建一个worker线程,放入HashSet中,然后启动线程。到这里其实,提交任务到任务在线程池执行的一个大概流程基本清晰,然而当线程池里的线程主动去阻塞队列取任务的流程我们还尚不得知。

Worker类

Worker在JDK1.8中是ThreadPoolExecutor的内部类,名如其类,它就是一个worker,一个工人,专门负责执行任务的工人。
类的主要结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//这里有个工厂类后面会再详述。
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
...
}

这里的Worker不再仅仅是实现Runnable的run方法的简单类,也继承了AQS抽象类的一些特性,构造方法中用自身初始化了线程变量,赋值了task变量。在上一节addWorker方法中,最终调用start方法完成了worker线程的启动,所以run方法是核心方法,看下一节。

runWorker方法

还是先看runWorker方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//获取worker线程需要执行的task任务
w.firstTask = null;//然后赋空
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环执行,任务不为null或者阻塞队列还存在任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
}... finally {
afterExecute(task, thrown);
}
} finally {
task = null;//及时赋空,方便下次getTask
w.completedTasks++;// 增加给worker完成的任务数量
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。

getTask方法

这个方法是在worker线程中的run方法中调用的,循环判断自己的task任务为空的时候,主动去阻塞队列中取任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
...
try {
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持非阻塞等待(poll)和阻塞等待(take)。前者就是等一会不等了,后者是一直等直到取出元素。

小结

到此无论是主动提交任务给新建线程执行,还是已有的线程自己到阻塞队列取任务执行,都应该清楚了然了。
从数据结构的角度来看,线程池主要使用了阻塞队列(BlockingQueue)和HashSet集合构成。
从任务提交的流程角度来看,对于使用线程池的外部来说,线程池的机制是这样的:

  1. 如果正在运行的线程数 < coreSize,马上创建线程执行该task,不排队等待;
  2. 如果正在运行的线程数 >= coreSize,把该task放入队列;
  3. 如果队列已满 && 正在运行的线程数 < maximumPoolSize,创建新的线程执行该task;
  4. 如果队列已满 && 正在运行的线程数 >= maximumPoolSize,线程池调用handler的reject方法拒绝本次提交。
    从worker线程自己的角度来看,当worker的task执行结束之后,循环往阻塞队列中取出任务执行。

参考

线程池 part 1 简介
JDK1.8源码分析之ThreadPoolExecutor(一)

-EOF-