线程虽有不少了解,但是线程池却一直是处于一个模糊的概念,只知道大概,而并不知道具体的实现方式,本文打算从源码的角度分析JDK1.8中线程池实现的方式,我觉得看再多的文章阐述,都不如自己动手操作一遍,或者是翻开源码看看,这样底层数据结构是实现方法都能一目了然了。
ThreadPoolExecutor是一个灵活的,稳定的线程池,允许各种定制,本文主要基于ThreadPoolExecutor的源码进行分析。
ThreadPoolExecutor使用案例
在认识ThreadPoolExecutor之前我尝试写了一个简单的使用案例,就是构造一个线程池,然后循环丢给它一些任务,代码如下:
基本上是按照下列两行进行循环,打印出来的结果如下:
ThreadPoolExecutor继承关系
下图描述的是线程池API的一部分。广义上的完整线程池可能还包括Thread/Runnable、Timer/TimerTask等部分。
其中红线框住的部分使我们主要关心的部分,Executor唯一定义了一个execute的接口,至于Executors还实现了若干线程池,如下图所示:
ThreadPoolExecutor构造方法分析
ThreadPoolExecutor重载了若干个构造方法,从以上案例触发则是通过下面的代码流程构造出线程池:
corePoolSize表示线程池的基本大小,maximumPoolSize表示线程池的最大大小,workQueue为还没来得及执行的任务队列,这个主要是由阻塞队列BlockingQueue来维持这样的一个队列,线程每次执行完一个任务会到队列里面取出一个任务,keepAliveTime就是线程如果是一直没事干则可以存活的时间,超过这个时间就会被回收。这些变量定义为ThreadPoolExecutor的属性变量,如下所示:
不清楚为什么用阻塞队列的可以,看这篇文章,里面介绍了阻塞队列和非阻塞队列的关系。
线程池的五种状态
下面流程中会经常判断线程池的状态,所以此处先声明一下。
- RUNNING在ThreadPoolExecutor被实例化的时候就是这个状态。
- SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成。
- STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程。
- TIDYING 线程池为空,就会到达这个状态,执行terminated()方法。
- TERMINATED terminated()执行完毕,就会到达这个状态。
代码常量如下:12345private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor提交任务
构造Runnable执行任务这个应该不必多说,用过线程的应该明白。我们进入submit这个环节。会调用ThreadPoolExecutor的父类抽象类AbstractExecutorService的submit方法,代码如下:
execute方法
上一步的submit方法最终是调用线程池的execute方法,以下是方法具体实现:
总的来说,execute方法主要完成了两件事,一件就是将新任务入队,另外一件就是根据线程池的状态随时添加新线程。
addWorker方法
废话不多说直接上代码:
除去上述方法的细节,addWorker方法主要做的事情就是创建一个worker线程,放入HashSet中,然后启动线程。到这里其实,提交任务到任务在线程池执行的一个大概流程基本清晰,然而当线程池里的线程主动去阻塞队列取任务的流程我们还尚不得知。
Worker类
Worker在JDK1.8中是ThreadPoolExecutor的内部类,名如其类,它就是一个worker,一个工人,专门负责执行任务的工人。
类的主要结构如下:
这里的Worker不再仅仅是实现Runnable的run方法的简单类,也继承了AQS抽象类的一些特性,构造方法中用自身初始化了线程变量,赋值了task变量。在上一节addWorker方法中,最终调用start方法完成了worker线程的启动,所以run方法是核心方法,看下一节。
runWorker方法
还是先看runWorker方法的实现:
此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。
getTask方法
这个方法是在worker线程中的run方法中调用的,循环判断自己的task任务为空的时候,主动去阻塞队列中取任务。
此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持非阻塞等待(poll)和阻塞等待(take)。前者就是等一会不等了,后者是一直等直到取出元素。
小结
到此无论是主动提交任务给新建线程执行,还是已有的线程自己到阻塞队列取任务执行,都应该清楚了然了。
从数据结构的角度来看,线程池主要使用了阻塞队列(BlockingQueue)和HashSet集合构成。
从任务提交的流程角度来看,对于使用线程池的外部来说,线程池的机制是这样的:
- 如果正在运行的线程数 < coreSize,马上创建线程执行该task,不排队等待;
- 如果正在运行的线程数 >= coreSize,把该task放入队列;
- 如果队列已满 && 正在运行的线程数 < maximumPoolSize,创建新的线程执行该task;
- 如果队列已满 && 正在运行的线程数 >= maximumPoolSize,线程池调用handler的reject方法拒绝本次提交。
从worker线程自己的角度来看,当worker的task执行结束之后,循环往阻塞队列中取出任务执行。
参考
线程池 part 1 简介
JDK1.8源码分析之ThreadPoolExecutor(一)
-EOF-