自学内容网 自学内容网

Java并发 - 线程池

线程的创建是消耗资源的, 而线程间的调度需要频繁依赖CPU的切换,不能频繁创建线程是站在线程池的角度来看的, 如果说创建线程所消耗的资源,在每次使用完后就销毁, 那么下一次使用的时候又得创建,这样造成资源频繁浪费

总体设计

下面是线程池相关类的继承体系

在这里插入图片描述

主要介绍以下几个类:

  1. Executor接口只提供了一个任务执行的抽象。
  2. ExecutorService才是线程池的一个抽象,提供了一些线程池相关的操作
  3. AbstractExecutorService 抽象类,提供一些可供子类复用的方法
  4. ScheduledExecutorService 定义任务相关的实现

常见线程池

FixedThreadPool

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

这种线程池内部没有核心线程,线程的数量是有没限制的。在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程闲置状态超过了60s就会销毁。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SingleThreadPool

有且仅有一个工作线程执行任务,所有任务按照加入的顺序执行,即遵循队列的入队出队规则

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

ThreadPoolExecutor

最常用的线程池,支持参数配置,提供了四个构造函数,可通过构造函数来进行线程池的配置,下面来解释下各个参数:

核心参数

int corePoolSize 该线程池中核心线程数最大值

核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(处于闲置状态)的话,并且不干活超过一定时间(时长由下面参数决定),就会被销毁掉。

int maximumPoolSize

该线程池中线程总数最大值,线程总数 = 核心线程数 + 非核心线程数。

核心线程数 VS 最大线程数
核心线程数定义了线程池中最小线程数量,即使这些线程处于空闲状态,也不会被销毁。
最大线程数定义了线程池中允许的最大线程数量,最大线程数等于核心线程数 + 临时线程数,最大线程数主要是提供了一种机制来应对突发的高并发请求,当有大量任务的时候,可以创建线程数量的上限。

long keepAliveTime

该线程池中非核心线程闲置超时时长。一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,如果设置allowCoreThreadTimeOut = true,则会作用于核心线程

TimeUnit unit

TimeUnit是一个枚举类型,表示参数keepAliveTime的单位,包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天

BlockingQueue workQueue

该线程池中的任务队列,维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

ThreadFactory threadFactory

这是一个线程创建工厂接口,new它的时候需要实现Thread newThread(Runnable r)方法

RejectedExecutionHandler handler 拒绝策略

当提交的任务无法被执行时,线程池会采取拒绝策略。拒绝策略用于处理任务的拒绝情况,以确保系统的稳定性和可靠性。以下是一些常见的拒绝策略:

  1. AbortPolicy(默认策略)
    该策略会直接抛出 RejectedExecutionException,终止当前的任务提交。
    适用场景: 当希望立即知道任务被拒绝时可以使用此策略。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(queueCapacity),
    new ThreadPoolExecutor.AbortPolicy() // 默认策略
);
  1. CallerRunsPolicy
    描述: 该策略会使调用 execute 方法的线程自己执行被拒绝的任务。这意味着任务将会在调用者的线程中执行,而不是在池中的线程中执行。
    适用场景: 适用于希望减轻任务提交压力的场景。
new ThreadPoolExecutor.CallerRunsPolicy()
  1. DiscardPolicy
    描述: 该策略会默默地丢弃被拒绝的任务。这意味着该任务将被忽略,且不会引发任何异常。
    适用场景: 当你不关心被拒绝的任务时,可以使用此策略。
new ThreadPoolExecutor.DiscardPolicy()
  1. DiscardOldestPolicy
    描述: 该策略会丢弃任务队列中最旧的任务,然后尝试提交当前任务。如果队列中没有任务,提交将成功。
    适用场景: 适用于希望优先保留最新任务的场景。
new ThreadPoolExecutor.DiscardOldestPolicy()
  1. 自定义拒绝策略
    描述: 你可以实现 RejectedExecutionHandler 接口来自定义拒绝策略,以满足特定需求。
public class CustomRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 自定义逻辑,例如记录日志、重试任务等
        System.out.println("Task rejected: " + r.toString());
    }
}

// 使用自定义拒绝策略
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                       new ArrayBlockingQueue<>(queueCapacity),
                       new CustomRejectHandler());

工作原理

生产者消费者模型

线程池的工作过程是经典的生产者消费者模型,生产者提交任务,消费者线程获取任务进行消费
在这里插入图片描述

创建线程池

使用下面的代码创建线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

刚创建出来的线程池中只有一个构造时传入的阻塞队列,里面并没有线程

在这里插入图片描述

如果想要在执行任务之前创建好核心线程数,可以调用ThreadPoolExecutor#prestartCoreThread或者ThreadPoolExecutor#prestartAllCoreThreads方法来预先创建线程,默认是没有线程的。

// 创建单个核心线程
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}
// 启动所有核心线程
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

提交任务

任务提交方式

ThreadPoolExecutor提供了两个主要的方法来提交任务到线程池执行:

  1. void execute(Runnable command) :Executor接口的方法,没有返回值,也不抛出任何已检查的异常
  2. <T> Future<T> submit(Callable<T> task):返回一个结果,并可以抛出已检查的异常
  3. Future<?> submit(Runnable task)
  4. <T> Future<T> submit(Runnable task, T result)

这两个方法的主要区别体现在它们如何处理任务和返回结果上。所有的submit方法内部都是调用execute方法进行任务提交

public Future<?> submit(Runnable task) { // Callable的重载方法也一样
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
     if (task == null) throw new NullPointerException();
     RunnableFuture<T> ftask = newTaskFor(task);
     execute(ftask);
     return ftask;
 }

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    // FutureTask
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

submit和execute的区别

异常处理:

  1. 对于 execute() 方法,如果任务在执行过程中抛出了异常,并且你没有为任务设置特定的异常处理器(通过 Thread.UncaughtExceptionHandler),那么这个异常将传递给线程池的默认未捕获异常处理器(如果有的话),否则它将被忽略。
  2. 对于 submit() 方法,如果 Callable 任务在执行过程中抛出了异常,这个异常将被封装到一个 ExecutionException 中,并在调用Future.get()时才抛出。

任务取消

  1. 对于通过execute()提交的任务,你可以使用Future.cancel(boolean mayInterruptIfRunning)方法来尝试取消任务,但这个方法并不保证能够成功取消任务。此外由于execute()不返回 Future 对象,无法直接获取该 Future 对象来调用 cancel() 方法。
  2. 对于通过 submit() 提交的任务,你可以获取返回的 Future 对象并调用其 cancel() 方法来尝试取消任务。如果任务已经开始执行,并且 mayInterruptIfRunning 参数为 true,那么线程将尝试中断任务。

任务提交流程

线程通过execute方法提交了一个任务的流程如下:

  1. 首先会去判断当前线程池的线程数是否小于核心线程数,如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,当任务执行完之后,创建的这个线程由于是核心线程不会退出,而是会去阻塞队列中获取任务。接下来如果又提交了一个任务,也会按照上述的步骤去判断是否小于核心线程数,如果小于,还是会创建线程来执行任务,执行完之后也会从阻塞队列中获取任务。这里有个细节,就是提交任务的时候,就算有线程池里的线程从阻塞队列中获取不到任务,如果线程池里的线程数还是小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。
  2. 如果线程池里的线程数不再小于核心线程数,那么此时就会尝试将任务放入阻塞队列中,入队成功之后,核心线程中阻塞的线程就可以获取到任务了。
  3. 随着任务越来越多,队列已经满了,此时会判断当前线程池里的线程数是否小于最大线程数,也就是入参时的 maximumPoolSize 参数。如果小于最大线程数,那么也会创建非核心线程来执行提交的任务,所以就算队列中有任务,新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务执行,所以先提交的任务不一定先执行。
  4. 假如线程数已经达到最大线程数量,此时就会执行拒绝策略,也就是构造线程池的时候传入的RejectedExecutionHandler对象来处理这个任务。

在这里插入图片描述

execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 工作线程数量是否小于核心线程
    if (workerCountOf(c) < corePoolSize) {
    // 添加核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 判断线程池是否在运行,并且工作队列是否已满
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
        // 执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
        // 如果队列为空,添加非核心线程执行
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
    // 执行拒绝策略
        reject(command);
}
addWorker

firstTask 表示要添加的任务执行逻辑,第二个参数表示是否是核心线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:  // 这个是Java的 goto 语法
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry; // 退出循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建Worker
        final Thread t = w.thread;
        if (t != null) {
        // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // 重新检查,保证可见性

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            // 启动线程,执行run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker队列

ThreadPoolExecutor中存有所有Worker的集合,这里使用的是非线程安全的HashSet,需要配合ThreadPoolExecutor中的mainLock锁使用

private final HashSet<Worker> workers = new HashSet<Worker>();

Worker类是对线程和任务的一个封装,实现了 Runnable, 它基于AQS队列来实现同步

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
}

创建Worker时就会调用ThreadFactory创建线程

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

这里可以看到 Worker 才是作为 Thread 的 Runnable 来使用的,也就是说 Thread#start 执行的是 Worker#run 方法

线程运行 runWoker

创建Worker时的Runable会作为第一个任务被线程执行

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
// 运行Worker的这个线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    // 创建worker时如果任务为null,则调用getTask()获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
    // 执行到这里表示任务队列中没了任务,或者线程池关闭了
    // 此时需要将worker从缓存冲清除
        processWorkerExit(w, completedAbruptly);
    }
}

为什么 runWoker 要加锁?

Worker和线程之间都是一对一的关系,除了下面这里,runWorker访问的都是Woker自身的数据,而且下图这里也都是读操作,为什么要加锁呢?

在这里插入图片描述

原因就得从这个条件入手了,它是判断当前线程池是否停止或者当前线程是否被中断

(runStateAtLeast(ctl.get(), STOP) ||
       (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
&& !wt.isInterrupted()

如果条件满足,就会中断当前Worker这个线程,并且这里是Worker这个线程自己中断自己,中断后下面还是会继续执行的

在这里插入图片描述

这里主要是针对线程池关闭的场景进行并发控制,后面再讨论

中断后线程还是会继续执行

可以根据下面的代码试一下:

public class Test2 {
    static final AtomicReference<Thread> ar = new AtomicReference<>();
    public static void main(String[] args) {
        Thread t = new Thread(new Runnable() {
            
            public void run() {
                System.out.println(ar.get() == Thread.currentThread());
                Thread.currentThread().interrupt();
                System.out.println("after Thread.currentThread().interrupt()");
            }
        });
        ar.set(t);
        t.start();
while (true) {}
    }
}

获取任务

线程需要从任务队列中不断地取任务执行,实现生产者和消费者模型。这部分由getTask方法实现,其执行流程如下图所示:

在这里插入图片描述

代码实现如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // wc > corePoolSize 表示是非核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

销毁工作线程

工作线程Worker会不断接收新任务去执行,当接收不到任务的时候,就会开始被回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

此方法做了以下几件事

  1. 将Worker从保存的Worker集合中移除
  2. 切换线程池的状态至TERMINATED,一是SHUTDOWN并且线程池和任务队列为空,二是STOP并且线程池为空

线程池关闭

线程池关闭有以下2种方式:

  1. shutdown:停止接收新任务,但会继续执行已经提交的任务(即正在运行和排队的任务)。当所有任务完成后,线程池会完全关闭。
  2. shutdownNow:该方法会立即停止接受新任务,并尝试中断正在执行的任务。会返回一个列表,包含所有未开始执行的任务。对于正在运行的任务,线程池会尝试中断这些任务,但是否成功取决于任务的实现(任务必须检查中断状态并处理)。

shutdown/shutdownNow

两者具体实现如下,主要区别在于interruptIdleWorkers和interruptWorkers方法

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // 校验是否有关闭线程池的权限,这里主要通过 SecurityManager 校验
    // 当前线程与每个 Worker 线程的 “modifyThread” 权限
        checkShutdownAccess();
        // 修改状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 关闭所有空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

interruptIdleWorkers 和 interruptWorkers

interruptIdleWorkers 意为中断所有空闲的线程,interruptWorkers 就是直接中断所有Worker,不管空闲与否了

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 中断可能正在等待任务的线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

// shutdownNow 调用: 直接调用Worker的interruptIfStarted方法
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

为什么 shutdown 调用 interruptIdleWorkers 过程加 3 次锁?

从 interruptIdleWorkers 方法可以看到有两次嵌套的锁操作

从外面shutdown可以看到还有一层嵌套的锁

在这里插入图片描述

首先是最外面加锁,是操作 mainLock,这点其实在源码中有说明

在这里插入图片描述

We also hold mainLock on shutdown and shutdownNow, for the sake of ensuring workers set is stable while separately checking permission to interrupt and actually interrupting.

就是保证在执行shutdown时没有其它线程来操作workers这个集合,也就是关闭时不允许继续添加任务了

那么为什么interruptIdleWorkers方法里面还要操作 mainLock 加锁呢?因为 interruptIdleWorkers 这个方法不只在 shutdown 方法里调用,由于 ReentrantLock 可重入,因此在shutdown这个调用里影响不大

在这里插入图片描述

而 tryTerminate 方法被调用的地方就多了,不仅是线程池关闭,添加任务,删除任务时都会调用

在这里插入图片描述

最后就是为什么要在Worker这个AQS队列上继续加锁

先看一下Worker的AQS相关的方法

protected boolean isHeldExclusively() {
    return getState() != 0;
}

// 参数为unused,从命名也可以知道该参数未被使用
protected boolean tryAcquire(int unused) {
// 通过CAS改变将状态由0改变为1
    if (compareAndSetState(0, 1)) {
    // 设置当前线程独占
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

tryLock 在源码中只有在 interruptIdleWorkers 方法里才有调用,而 lock 方法只有在 runWorker 方法里才调用

在这里插入图片描述

tryLock方法的作用在于判断Worker的state是不是0,如果是0,说明这个Worker还没有调用lock方法,那么此时它是一个IdleWorker,也就是可以中断。如果tryLock失败,说明Worker已经执行了lock()方法。此时,Worker在While循环中不断获取阻塞队列的任务执行,并且不能在shutdown()方法中中断。

因此,Worker的状态管理实际上通过状态值(0或1)判断Worker是否处于空闲状态。如果Worker处于空闲状态,则可以在线程池关闭时中断它,否则它必须保持在while循环中才能进入阻塞队列,此时任务将被执行,并且在队列中的任务为空之前不会被释放。

如果Worker正在执行任务,因为是个 while 循环,等到任务执行完成后,会再次调用 getTask 方法,getTask 方法里面会先判断线程池状态,这个时候就能感知到线程池关闭了,返回 null,这个 worker 也就默默的退出了。

在这里插入图片描述

这里我们也可以看到shutdown和shutdownNow的区别,如下图所示:

在这里插入图片描述

shutdownNow 是怎么直接中断Worker,并返回未执行的任务的

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

Worker在创建时候状态为 -1,此时调用interruptIfStarted是中断不了的,只能中断 0 或 1 的状态

在这里插入图片描述

那么什么时候不是-1呢?如果不是 -1,也就是lock了一次,或者调用了 unlock。但是前面说过 lock 方法只在 runWorker 里调用,并且调用了lock说明该Worker正在执行,那不是说明 shutdownNow 中断的都是正在执行的任务了?正在执行任务的线程是不应该被中断的

其实runWorker方法提前调用了一次unlock方法,目的就是将状态从-1改为0,可以理解为线程的就绪态,此时是可以被中断的

在这里插入图片描述

问题总结

为什么 Worker 要实现AQS?

这点在Woker类的注释上有说明:
Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping. This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).

因为是要一个不能重入的互斥锁,setCorePoolSize 时通过 interruptIdleWorkers 实现减少当前的Worker功能,如果锁是可以重入的,那么可能会中断当前正在执行的Worker

参考资料:

  1. https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
  2. https://www.cnblogs.com/thisiswhy/p/15493027.html

原文地址:https://blog.csdn.net/qq_40926260/article/details/142930007

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!