线程池原理

为什么要用线程池?

使用线程池来管理线程,使用线程池管理线程主要有如下好处:

  1. 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  2. 提升系统响应速度。通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度;

  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,有可能导致系统资源不足而产生阻塞的情况,还会降低系统的稳定性,因此,需要使用线程池来管理线程。

线程池状态

程池是有状态的,线程池的开启到关闭的过程就是线程池状态的一个流转的过程。
线程池共有五种状态:

状态 含义
RUNNING 运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务,执行 shutdown 方法可进入 SHUTDOWN 状态,执行 shutdownNow 方法可进入 STOP 状态
SHUTDOWN 待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务,当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态
STOP 停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务, 当工作线程数为0时,进入 TIDYING 状态
TIDYING 整理状态,此时任务都已经执行完毕,并且也没有工作线程, 执行 terminated 方法后进入 TERMINATED 状态
TERMINATED 终止状态,此时线程池完全终止了,并完成了所有资源的释放

如何创建线程池

《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。
  1. FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  1. SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  1. CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

ThreadPoolExecutor提供了构造函数:

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

重要属性

线程池状态和工作线程数量

首先线程池是有状态的,不同状态下线程池的行为是不一样的。

另外线程池肯定是需要线程去执行具体的任务的,所以在线程池中就封装了一个内部类 Worker 作为工作线程,每个 Worker 中都维持着一个 Thread。

线程池的重点之一就是控制线程资源合理高效的使用,所以必须控制工作线程的个数,所以需要保存当前线程池中工作线程的个数。

看到这里,你是否觉得需要用两个变量来保存线程池的状态和线程池中工作线程的个数呢?但是在 ThreadPoolExecutor 中只用了一个 AtomicInteger 型的变量就保存了这两个属性的值,那就是 ctl。

ctl 的高3位用来表示线程池的状态(runState),低29位用来表示工作线程的个数(workerCnt)

为什么要用3位来表示线程池的状态呢?
原因是线程池一共有5种状态,而2位只能表示出4种情况,所以至少需要3位才能表示得了5种状态。

核心线程数和最大线程数

核心线程数:corePoolSize 用来表示线程池中的核心线程的数量,也可以称为可闲置的线程数量。

线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程
核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉

最大线程数:maximumPoolSize 用来表示线程池中最多能够创建的线程数量。

最大线程数 = 核心线程数 + 非核心线程数

现在我们有一个疑问,既然已经有了标识工作线程的个数的变量了,为什么还要有核心线程数、最大线程数呢?

其实你这样想就能够理解了,创建线程是有代价的,不能每次要执行一个任务时就创建一个线程,但是也不能在任务非常多的时候,只有少量的线程在执行,这样任务是来不及处理的,而是应该创建合适的足够多的线程来及时的处理任务。随着任务数量的变化,当任务数明显很小时,原本创建的多余的线程就没有必要再存活着了,因为这时使用少量的线程就能够处理的过来了,所以说真正工作的线程的数量,是随着任务的变化而变化的。

那核心线程数和最大线程数与工作线程个数的关系是什么呢?


工作线程的个数可能从0到最大线程数之间变化,当执行一段时间之后可能维持在 corePoolSize,但也不是绝对的,取决于核心线程是否允许被超时回收。

非核心线程存活时间-keepAliveTime

一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉。
如果设置allowCoreThreadTimeOut = true,则会作用于核心线程。

创建线程的工厂

线程该如何来创建呢?这个任务就交给了线程工厂 ThreadFactory 来完成。

缓存任务的阻塞队列

当线程池接收到一个任务时,如果工作线程数没有达到corePoolSize,那么就会新建一个线程,并绑定该任务,直到工作线程的数量达到 corePoolSize 前都不会重用之前的线程。当工作线程数达到corePoolSize了,这时又接收到新任务时,会将任务存放在一个阻塞队列中等待核心线程去执行。如果队列满了,则新建非核心线程执行任务。

为什么不直接创建更多的线程来执行新任务呢?
原因是核心线程中很可能已经有线程执行完自己的任务了,或者有其他线程马上就能处理完当前的任务,并且接下来就能投入到新的任务中去,所以阻塞队列是一种缓冲的机制,给核心线程一个机会让他们充分发挥自己的能力。另外一个值得考虑的原因是,创建线程毕竟是比较昂贵的,不可能一有任务要执行就去创建一个新的线程。

常用的workQueue类型
  1. SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它。如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。

  2. LinkedBlockingQueue:这个队列接收到任务的时候,由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize。

  3. ArrayBlockingQueue:可以限定队列的长度。

  4. DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务

拒绝策略

如果是有界的阻塞队列,那就存在队列满的情况,也存在工作线程的数据已经达到最大线程数的时候。如果这时候再有新的任务提交时,显然线程池已经心有余而力不足了,因为既没有空余的队列空间来存放该任务,也无法创建新的线程来执行该任务了,所以这时我们就需要有一种拒绝策略,即 handler。

拒绝策略是一个 RejectedExecutionHandler 类型的变量,用户可以自行指定拒绝的策略,如果不指定的话,线程池将使用默认的拒绝策略:抛出异常。

AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
CallerRunsPolicy:只用调用者所在的线程来执行任务;
DiscardPolicy:不处理直接丢弃掉任务;
DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务

工作流程


整个过程可以拆分成以下几个部分:

提交任务

当向线程池提交一个新的任务时,线程池有三种处理情况,分别是:创建一个工作线程来执行该任务、将任务加入阻塞队列、拒绝该任务。

提交任务的过程也可以拆分成以下几个部分:

  • 当工作线程数小于核心线程数时,直接创建新的核心工作线程

  • 当工作线程数大于核心线程数时,就需要尝试将任务添加到阻塞队列中去

  • 如果能够加入成功,说明队列还没有满,那么需要做以下的二次验证来保证添加进去的任务能够成功被执行

    1、验证当前线程池的运行状态,如果是非RUNNING状态,则需要将任务从阻塞队列中移除,然后拒绝该任务

    2、验证当前线程池中的工作线程的个数,如果为0,则需要主动添加一个空工作线程来执行刚刚添加到阻塞队列中的任务

  • 如果加入失败,则说明队列已经满了,那么这时就需要创建新的“临时”工作线程来执行任务

    1、 如果创建成功,则直接执行该任务

    2、 如果创建失败,则说明工作线程数已经等于最大线程数了,则只能拒绝该任务了

整个过程可以用下面这张图来表示:

创建工作线程

创建工作线程需要做一系列的判断,需要确保当前线程池可以创建新的线程之后,才能创建。

首先,当线程池的状态是 SHUTDOWN 或者 STOP 时,则不能创建新的线程。

另外,当线程工厂创建线程失败时,也不能创建新的线程。

还有就是当前工作线程的数量与核心线程数、最大线程数进行比较,如果前者大于后者的话,也不允许创建。

除此之外,会尝试通过 CAS 来自增工作线程的个数,如果自增成功了,则会创建新的工作线程,即 Worker 对象。

然后加锁进行二次验证是否能够创建工作线程,最后如果创建成功,则会启动该工作线程。

启动工作线程

当工作线程创建成功后,也就是 Worker 对象已经创建好了,这时就需要启动该工作线程,让线程开始干活了,Worker 对象中关联着一个 Thread,所以要启动工作线程的话,只要通过 worker.thread.start() 来启动该线程即可。

启动完了之后,就会执行 Worker 对象的 run 方法,而不是 start()。因为 Worker 实现了 Runnable 接口,所以本质上 Worker 也是一个线程。

通过线程 start 开启之后就会调用到 Runnable 的 run 方法,在 worker 对象的 run 方法中,调用了 runWorker(this) 方法,也就是把当前对象传递给了 runWorker 方法,让他来执行。

获取任务并执行

在 runWorker 方法被调用之后,就是执行具体的任务了,首先需要拿到一个可以执行的任务,而 Worker 对象中默认绑定了一个任务,如果该任务不为空的话,那么就是直接执行。

执行完了之后,就会去阻塞队列中获取任务来执行,而获取任务的过程,需要考虑当前工作线程的个数。

  • 如果工作线程数大于核心线程数,那么就需要通过 poll 来获取,因为这时需要对闲置的线程进行回收;

  • 如果工作线程数小于等于核心线程数,那么就可以通过 take 来获取了,因此这时所有的线程都是核心线程,不需要进行回收,前提是没有设置 allowCoreThreadTimeOut

ThreadPoolExecutor的execute方法

execute方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取ctl值
*/
c = ctl.get();
}
/*
* 如果当前线程池是运行状态并且任务添加到队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
// 这时需要移除该command
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}

这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

execute方法的执行示意图:

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);

/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这里的t.start()这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

线程池的关闭

关闭线程池,可以通过shutdown和shutdownNow这两个方法。它们的原理都是遍历线程池中所有的线程,然后依次中断线程。shutdown和shutdownNow还是有不一样的地方:

  1. shutdownNow首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表;

  2. shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程

可以看出shutdown方法会将正在执行的任务继续执行完,而shutdownNow会直接中断正在执行的任务。调用了这两个方法的任意一个,isShutdown方法都会返回true,当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回true。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

如何合理配置线程池参数

CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。

IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNcpu。
混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。并且,阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。

执行execute()方法和submit()方法的区别是什么呢?

1)execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;

2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

那么什么是“非核心线程”呢?

是不是先创建的线程就是核心线程,后创建的就是非核心线程呢?

其实核心线程跟创建的先后没有关系,而是跟工作线程的个数有关,如果当前工作线程的个数大于核心线程数,那么所有的线程都可能是“非核心线程”,都有被回收的可能。

一个线程执行完了一个任务后,会去阻塞队列里面取新的任务,在取到任务之前它就是一个闲置的线程。

取任务的方法有两种,一种是通过 take() 方法一直阻塞直到取出任务,另一种是通过 poll(keepAliveTime,timeUnit) 方法在一定时间内取出任务或者超时,如果超时这个线程就会被回收,请注意核心线程一般不会被回收。

那么怎么保证核心线程不会被回收呢?还是跟工作线程的个数有关,每一个线程在取任务的时候,线程池会比较当前的工作线程个数与核心线程数:

  • 如果工作线程数小于当前的核心线程数,则使用第一种方法取任务,也就是没有超时回收,这时所有的工作线程都是“核心线程”,他们不会被回收;

  • 如果大于核心线程数,则使用第二种方法取任务,一旦超时就回收,所以并没有绝对的核心线程,只要这个线程没有在存活时间内取到任务去执行就会被回收

核心线程一般不会被回收,但是也不是绝对的,如果我们设置了允许核心线程超时被回收的话,那么就没有核心线程这种说法了,所有的线程都会通过 poll(keepAliveTime, timeUnit) 来获取任务,一旦超时获取不到任务,就会被回收,一般很少会这样来使用,除非该线程池需要处理的任务非常少,并且频率也不高,不需要将核心线程一直维持着。

为什是调用任务的 run() 而不是 start()

假设这里是调用的 Runnable 的 start 方法,那会发生什么事情。

如果我们往一个核心、最大线程数为 2 的线程池里丢了 1000 个任务,那么它会额外的创建 1000 个线程,同时每个任务都是异步执行的,一下子就执行完毕了。

从而没法做到由这两个 Worker 线程来调度这 1000 个任务,而只有当做一个同步阻塞的 run() 方法调用时才能满足这个要求。

说说几种常见的线程池及使用场景

  • newFixedThreadPool(固定大小的线程池)
  • newSingleThreadExecutor(单线程线程池)
  • newCachedThreadPool(可缓存线程的线程池)
  • newScheduledThreadPool

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  1. 线程池特点:
  • 核心线程数和最大线程数大小一样
  • keepAliveTime为0
  • 阻塞队列是LinkedBlockingQueue

它是固定大小的线程池,其核心线程数和最大线程数大小一样。并且阻塞队列用的是LinkedBlockingQueue,也就是说线程最大数这个参数失效了基本,所以不会出现非核心线程的存在,所以也可以认为keepAliveTime参数是一个摆设。除非allowCoreThreadTimeOut方法的调用。

  1. 该线程池的工作机制是:
  • 线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务

  • 线程数等于核心线程数后,将任务加入阻塞队列。由于队列容量非常大(Integer.MAX_VALUE),可以一直加加加。(当线程池中的任务比较特殊时,比如关于数据库的长时间的IO操作,可能导致OOM)

  • 执行完任务的线程反复去队列中取任务执行

  1. 适用场景:
    FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程即可。一般Ncpu+1

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  1. 线程池特点:
  • 核心线程数和最大线程数大小一样且都是1
  • keepAliveTime为0
  • 阻塞队列是LinkedBlockingQueue
  1. 该线程池的工作机制是:
  • 线程池中没有线程时,新建一个线程执行任务
  • 有一个线程以后,将任务加入阻塞队列,不停加加加。 也有能导致OOM。
  • 唯一的这一个线程不停地去队列里取任务执行
  1. 适用场景:
    SingleThreadExecutor适用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  1. 线程池特点:
  • 核心线程数为0,且最大线程数为Integer.MAX_VALUE,可能会导致OOM
  • 阻塞队列是SynchronousQueue

SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

  1. 该线程池的工作机制是:
  • 没有核心线程,直接向SynchronousQueue中提交任务
  • 如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
  • 执行完任务的线程有60秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就拜拜
  1. 适用场景:
    CachedThreadPool 用于并发执行大量短期的小任务。

newScheduledThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}


public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
  1. 线程池特点:
  • 最大线程数为Integer.MAX_VALUE,可能会导致OOM
  • 阻塞队列是DelayedWorkQueue

ScheduledThreadPoolExecutor 添加任务提供了另外两个方法:

  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

两种方法的内部实现都是创建了一个ScheduledFutureTask对象封装了任务的延迟执行时间及执行周期,并调用decorateTask()方法转成RunnableScheduledFuture对象,然后添加到延迟队列中。
DelayQueue:中封装了一个优先级队列,这个队列会对队列中的ScheduledFutureTask 进行排序,两个任务的执行 time 不同时,time 小的先执行;否则比较添加到队列中的ScheduledFutureTask的顺序号 sequenceNumber ,先提交的先执行。

  1. 该线程池的工作机制是:
  • 调用上面两个方法添加一个任务
  • 线程池中的线程从 DelayQueue 中取任务
  • 然后执行任务

具体执行步骤:

  • 线程从 DelayQueue 中获取 time 大于等于当前时间的 ScheduledFutureTask,DelayQueue.take()
  • 执行完后修改这个 task 的 time 为下次被执行的时间
  • 然后再把这个 task 放回队列中-DelayQueue.add()
  1. 适用场景:
    ScheduledThreadPoolExecutor用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。

单机上一个线程正在处理服务,如果忽然断电了怎么办(正在处理和阻塞队列里的请求怎么处理)

阻塞队列持久化,正在处理事务控制。断电之后正在处理的回滚,日志恢复该次操作。服务器重启后阻塞队列中的数据再加载

参考资料