剖析源码,带你看懂JUC线程池运行机制
文章目录
1.线程池基础
1.1 概念
借助池化思想,预先构建指定数目的线程用以处理系统任务,其中若干个线程形成一个线程池。池中通常会维护一定数目的线程来处理系统任务,也会根据执行任务的繁忙程度,动态扩增/缩减线程数目,具备动态调整处理系统任务速度的能力。
1.2 作用
1.避免线程频繁创建和销毁,提前创建指定数目的线程用以线程复用
,降低资源开销
以及提升系统响应速度
。
2.便于对线程资源限制和统一管理
,还可以根据任务情况动态实现增减线程
,提升资源利用率
。【这个作用面试经常问】
1.3 创建方式
JDK中创建线程池使用下面这个构造方法来实现的,除了工作窃取线程ForkJoinPool
。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程数。 //线程池中一直存在的线程》
maximumPoolSize: 最大线程数。//线程池中最多允许线程
keepAliveTime: 线程空闲存活时间。 //多余空闲线程回收最长等待时间
unit: 时间单位。
workQueue: 工作队列【实现BlokingQueue接口】。 // 存待处理任务
threadFactory: 线程工厂实例
handler:任务队列满了的拒绝策略
1.3.1 构造方法实现
比如new ThreadPoolExecutor() 实现,自定义设置参数,功能灵活,适用于复杂业务场景自定义实现。对使用者要求较高。
1.3.2 Executors类静态方法实现
如下图所示,使用简单,可调整参数少,提供了很多默认实现,适用于场景业务简单。
// 固定线程池,最大和最小线程数相等都是 nThreads
Executors.newFixedThreadPool(int nThreads)
// parallelism 并行处理数,可以不传,默认就是
// 这个Runtime.getRuntime().availableProcessors() CPU处理核数。
Executors.newWorkStealingPool(int parallelism)
// 最大线程数为Integer.MAX_VALUE,最小线程为0,使用很少。
// 线程太多CPU直接跑慢。
Executors.newCachedThreadPool()
// 定时任务线程池,最大线程数为Integer.MAX_VALUE。
Executors.newScheduledThreadPool(int corePoolSize)
// 单个线程构成的线程池
Executors.newSingleThreadExecutor()
1 .4 类型
1.4.1 按线程池数目
1.单线程线程池 newSingleThreadExecutor
2.固定线程线程池 newFixedThreadPool
3.无限线程线程池 newCachedThreadPool
1.4.2 按线程池功能
- 普通线程池ThreadPool
// 上面三种都是普通线程池,通常用于处理IO密集任务,比如大量http请求等- 工作窃取ForkJoinPool
// 主要用来处理CPU密集型任务,使用少量的线程并行处理大量的计算型任务。- 定时任务线程池scheduledThreadPool
// 处理延时任务或者是定期执行的任务。比如日志定期删除,缓存定期删除,消息定时发送等
2.线程池工作原理
多线程处理任务大致流程是,首先要构建线程池,其次提交任务给线程池,线程池接收任务,则自动会创建工作线程并运行,来处理任务,如果任务太多,处理不过来则会通过新建工作线程或者是将任务放如工作队列中。其中最为重要的几个方法分别是,提交任务
execute
,新建工作线程addWorker
,工作线程运行runWorker
这三个方法。
2.1 任务提交 execute
通过JDK21源码来剖析执行机制
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl使用AtomicInteger 存储的对象,高3为存储线程状态,低29为存储状态下响应线程数
int c = ctl.get();
// 按位与操作计算c低29位的值 得到当前线程数
// 如果当前线程池小于核心线程池则新建线程成功则结束,未成功继续执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程正在运行 让工作队列加任务,且添加成功
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查
int recheck = ctl.get();
// 线程池不是运行状态 则删除任务。有可能存在使用者直接shutdownNow/shutdown 方法直接停止线程池。
if (! isRunning(recheck) && remove(command))
// 拒绝策略。
reject(command);
// 如果当前线程为0,可能存在重新启动情况线程刚好为空。
else if (workerCountOf(recheck) == 0)
// 新增工作线程
addWorker(null, false);
}
// 工作队列满了,新增工作线程,新增失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
2.2 增加工作线程addWorker方法
JDK21源码分析,
/**
* RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
**/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
// 当线程池处为空并且工作队列为空,此时不增加线程池。
// 可结合上面注释代码看懂。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// core变量为false 情况下,超过最大线程数直接失败。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// cas 机制,只有一个线程成功
if (compareAndIncrementWorkerCount(c))
break retry;
//
c = ctl.get(); // Re-read ctl
// cas 失败 重试。
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个工作线程Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取锁 可能存在多个线程调用了execute方法。
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 判断是否是运行状态或者是重启状态
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 如果不是新建线程状态抛异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 增加至workers工作线程hashSet实现。
workers.add(w);
workerAdded = true;
int s = workers.size();
// 这工作线程数目则重新赋值。这个是计算当前线程池实际运行的最大线程池数
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
// 底层还是调用Thread.start方法来开启线程
container.start(t);
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.3 工作线程运行原理runWorker方法
通过源码来分析:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 当前任务或者工作队列任务不为空。循环处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前逻辑,这个默认实现为空。
// JUC大佬采用模板模式,先给开发人员占个位置,让开发者可自定义实现。
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 执行后逻辑,方法体为空。交由开发人员自定制实现
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 任务清空 GC回收
task = null;
// 线程处理任务数加一
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 工作线程回收,如果回收后,发现当前线程小于核心线程,会继续创建一个线程加入线程池中。
processWorkerExit(w, completedAbruptly);
}
}
3.扩展补充
3.1 拒绝策略
JUC默认四种策略:
AbortPolicy: 直接抛异常。默认实现策略。
DiscardPolicy: 默默丢弃,不抛异常。
DiscardOldestPolicy: 使用poll丢弃一个任务。
CallerRunsPolicy: 交由调用者所在线程执行。
自定义拒绝策略:只需要实现RejectedExecutionHandler 这个接口重写rejectedExecution 方法接口即可。一个简单示例如下:
比如com.zaxxer.hikari.pool.HikariPool 实现了自定义注册策略
3.3shutdown 和shutdownNow 的区别
shutdown:
执行后,线程池不会再接收新任务,但是处于工作队列的任务还是要执行完,方法立刻返回,不会等待队列任务完成返回。【异步】
shutdownNow:
线程池不接受新任务,工作队列任务直接丢弃,正在执行任务会直接中断,立刻返回,返回值为队列里面被丢弃的任务列表。
3.4 与ForkJoinPool区别
1.ForkJoinPool 每个线程单独有个队列,ThreadPoolExecutor中只有一个队列。
2.ForkJoinPool 采用双端队列实现,执行任务采用LIFO顺序,窃取任务从尾部获取任务,有效减少任务竞争,当任务只剩一个时候,允许线程之间CAS来竞争获取锁。ThreadPoolExecutor采用FIFO排队方式执行任务。
3.ThreadPoolExecutor适用于IO密集型,ForkJoinPool 适用于CPU密集型,最好是非阻塞任务。
4.小结
本文通过源码分析深度剖析了JUC下线程池运行机制,并对线程池类型和使用做了简单介绍,希望能给大家带来些许帮助,如文中存在错误,可以在评论区交流指正。
原文地址:https://blog.csdn.net/weixin_43629719/article/details/143679878
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!