自学内容网 自学内容网

剖析源码,带你看懂JUC线程池运行机制

1.线程池基础

1.1 概念

借助池化思想,预先构建指定数目的线程用以处理系统任务,其中若干个线程形成一个线程池。池中通常会维护一定数目的线程来处理系统任务,也会根据执行任务的繁忙程度,动态扩增/缩减线程数目,具备动态调整处理系统任务速度的能力。

1.2 作用

1.避免线程频繁创建和销毁,提前创建指定数目的线程用以线程复用降低资源开销以及提升系统响应速度
2.便于对线程资源限制和统一管理,还可以根据任务情况动态实现增减线程提升资源利用率。【这个作用面试经常问】

1.3 创建方式

JDK中创建线程池使用下面这个构造方法来实现的,除了工作窃取线程ForkJoinPool

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

corePoolSize:核心线程数。 //线程池中一直存在的线程》
maximumPoolSize: 最大线程数。//线程池中最多允许线程
keepAliveTime: 线程空闲存活时间。 //多余空闲线程回收最长等待时间
unit: 时间单位。
workQueue: 工作队列【实现BlokingQueue接口】。 // 存待处理任务
threadFactory: 线程工厂实例
handler:任务队列满了的拒绝策略

1.3.1 构造方法实现

比如new ThreadPoolExecutor() 实现,自定义设置参数,功能灵活,适用于复杂业务场景自定义实现。对使用者要求较高。

1.3.2 Executors类静态方法实现

如下图所示,使用简单,可调整参数少,提供了很多默认实现,适用于场景业务简单。

// 固定线程池,最大和最小线程数相等都是 nThreads
Executors.newFixedThreadPool(int nThreads)
// parallelism 并行处理数,可以不传,默认就是
// 这个Runtime.getRuntime().availableProcessors() CPU处理核数。
Executors.newWorkStealingPool(int parallelism)
// 最大线程数为Integer.MAX_VALUE,最小线程为0,使用很少。
// 线程太多CPU直接跑慢。
Executors.newCachedThreadPool()
// 定时任务线程池,最大线程数为Integer.MAX_VALUE。
Executors.newScheduledThreadPool(int corePoolSize)
// 单个线程构成的线程池
Executors.newSingleThreadExecutor()

1 .4 类型

1.4.1 按线程池数目

1.单线程线程池 newSingleThreadExecutor
2.固定线程线程池 newFixedThreadPool
3.无限线程线程池 newCachedThreadPool

1.4.2 按线程池功能
  1. 普通线程池ThreadPool
    // 上面三种都是普通线程池,通常用于处理IO密集任务,比如大量http请求等
  2. 工作窃取ForkJoinPool
    // 主要用来处理CPU密集型任务,使用少量的线程并行处理大量的计算型任务。
  3. 定时任务线程池scheduledThreadPool
    // 处理延时任务或者是定期执行的任务。比如日志定期删除,缓存定期删除,消息定时发送等

2.线程池工作原理

多线程处理任务大致流程是,首先要构建线程池,其次提交任务给线程池,线程池接收任务,则自动会创建工作线程并运行,来处理任务,如果任务太多,处理不过来则会通过新建工作线程或者是将任务放如工作队列中。其中最为重要的几个方法分别是,提交任务execute,新建工作线程addWorker,工作线程运行 runWorker 这三个方法。

2.1 任务提交 execute

通过JDK21源码来剖析执行机制

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl使用AtomicInteger 存储的对象,高3为存储线程状态,低29为存储状态下响应线程数  
        int c = ctl.get();
        // 按位与操作计算c低29位的值 得到当前线程数
        // 如果当前线程池小于核心线程池则新建线程成功则结束,未成功继续执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程正在运行 让工作队列加任务,且添加成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 二次检查
            int recheck = ctl.get();
            // 线程池不是运行状态 则删除任务。有可能存在使用者直接shutdownNow/shutdown 方法直接停止线程池。
            if (! isRunning(recheck) && remove(command))
            // 拒绝策略。
                reject(command);
            // 如果当前线程为0,可能存在重新启动情况线程刚好为空。
            else if (workerCountOf(recheck) == 0)
            // 新增工作线程
                addWorker(null, false);
        }
        // 工作队列满了,新增工作线程,新增失败则执行拒绝策略。
        else if (!addWorker(command, false))
            reject(command);
    }

2.2 增加工作线程addWorker方法

JDK21源码分析,

/**
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
**/
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            // 当线程池处为空并且工作队列为空,此时不增加线程池。
            // 可结合上面注释代码看懂。
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
            // core变量为false 情况下,超过最大线程数直接失败。
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // cas 机制,只有一个线程成功    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 
                c = ctl.get();  // Re-read ctl
                // cas 失败 重试。
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        // 创建一个工作线程Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 获取锁 可能存在多个线程调用了execute方法。
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
// 判断是否是运行状态或者是重启状态
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 如果不是新建线程状态抛异常
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 增加至workers工作线程hashSet实现。
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        // 这工作线程数目则重新赋值。这个是计算当前线程池实际运行的最大线程池数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                // 底层还是调用Thread.start方法来开启线程
                    container.start(t);
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

2.3 工作线程运行原理runWorker方法

通过源码来分析:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        // 当前任务或者工作队列任务不为空。循环处理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                 // 任务执行前逻辑,这个默认实现为空。
                 // JUC大佬采用模板模式,先给开发人员占个位置,让开发者可自定义实现。
                    beforeExecute(wt, task);
                    try {
                    // 执行任务
                        task.run();
                        // 执行后逻辑,方法体为空。交由开发人员自定制实现
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                // 任务清空 GC回收
                    task = null;
                    // 线程处理任务数加一
                    w.completedTasks++;
                    //  释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        // 工作线程回收,如果回收后,发现当前线程小于核心线程,会继续创建一个线程加入线程池中。
            processWorkerExit(w, completedAbruptly);
        }
    }

3.扩展补充

3.1 拒绝策略

JUC默认四种策略
AbortPolicy: 直接抛异常。默认实现策略。
DiscardPolicy: 默默丢弃,不抛异常。
DiscardOldestPolicy: 使用poll丢弃一个任务。
CallerRunsPolicy: 交由调用者所在线程执行。

自定义拒绝策略:只需要实现RejectedExecutionHandler 这个接口重写rejectedExecution 方法接口即可。一个简单示例如下:
比如com.zaxxer.hikari.pool.HikariPool 实现了自定义注册策略
在这里插入图片描述

3.3shutdown 和shutdownNow 的区别

shutdown
执行后,线程池不会再接收新任务,但是处于工作队列的任务还是要执行完,方法立刻返回,不会等待队列任务完成返回。【异步】
shutdownNow
线程池不接受新任务,工作队列任务直接丢弃,正在执行任务会直接中断,立刻返回,返回值为队列里面被丢弃的任务列表。

3.4 与ForkJoinPool区别

1.ForkJoinPool 每个线程单独有个队列,ThreadPoolExecutor中只有一个队列。
2.ForkJoinPool 采用双端队列实现,执行任务采用LIFO顺序,窃取任务从尾部获取任务,有效减少任务竞争,当任务只剩一个时候,允许线程之间CAS来竞争获取锁。ThreadPoolExecutor采用FIFO排队方式执行任务。
3.ThreadPoolExecutor适用于IO密集型,ForkJoinPool 适用于CPU密集型,最好是非阻塞任务。

4.小结

本文通过源码分析深度剖析了JUC下线程池运行机制,并对线程池类型和使用做了简单介绍,希望能给大家带来些许帮助,如文中存在错误,可以在评论区交流指正。


原文地址:https://blog.csdn.net/weixin_43629719/article/details/143679878

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!