自学内容网 自学内容网

什么是AQS(抽象队列同步器)?

AQSAbstractQueuedSynchronizer的简称,即抽象队列同步器,从字面上可以这样理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)的队列存储数据;
  • 同步:实现了同步的功能

那 AQS 有什么用呢?

        AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,比如我们后面会细讲的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等,都是基于 AQS 的。

AQS 的数据结构

        AQS 内部使用了一个 volatile 的变量 state 来作为资源的标识

/**
 * The synchronization state.
 */
private volatile int state;

同时定义了几个获取和改变 state 的 protected 方法,子类可以覆盖这些方法来实现自己的逻辑:

getState()
setState()
compareAndSetState()

        这三种操作均是原子操作,其中 compareAndSetState 的实现依赖于Unsafe的 compareAndSwapInt() 方法。

        AQS 内部使用了一个先进先出(FIFO)的双端队列,并使用了两个引用 head 和 tail 用于标识队列的头部和尾部。其数据结构如下图所示

 但它并不直接储存线程,而是储存拥有线程的 Node 节点。

AQS 的 Node 节点 

 资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能有一个线程获取
  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定

         一般情况下,子类只需要根据需求实现其中一种模式就可以,当然也有同时实现两种模式的同步类

AQS 中关于这两种资源共享模式的定义源码均在内部类 Node 中。我们来看看 Node 的结构:

static final class Node {
    // 标记一个结点(对应的线程)在共享模式下等待
    static final Node SHARED = new Node();
    // 标记一个结点(对应的线程)在独占模式下等待
    static final Node EXCLUSIVE = null;

    // waitStatus的值,表示该结点(对应的线程)已被取消
    static final int CANCELLED = 1;
    // waitStatus的值,表示后继结点(对应的线程)需要被唤醒
    static final int SIGNAL = -1;
    // waitStatus的值,表示该结点(对应的线程)在等待某一条件
    static final int CONDITION = -2;
    /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
    static final int PROPAGATE = -3;

    // 等待状态,取值范围,-3,-2,-1,0,1
    volatile int waitStatus;
    volatile Node prev; // 前驱结点
    volatile Node next; // 后继结点
    volatile Thread thread; // 结点对应的线程
    Node nextWaiter; // 等待队列里下一个等待条件的结点


    // 判断共享模式的方法
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 其它方法忽略,可以参考具体的源码
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
    // 使用了Node的这个构造函数
    Node node = new Node(Thread.currentThread(), mode);
    // 其它代码省略
}

这里面的 waitStatus 是用来标记当前节点的状态的,它有以下几种状态:

  • CANCELLED:表示当前节点(对应的线程)已被取消。当等待超时或被中断,会触发进入为此状态,进入该状态后节点状态不再变化;
  • SIGNAL:后面节点等待当前节点唤醒;
  • CONDITION:Condition中使用,当前线程阻塞在Condition,如果其他线程调用了Condition的signal方法,这个节点将从等待队列转移到同步队列队尾,等待获取同步锁;
  • PROPAGATE:共享模式,前置节点唤醒后面节点后,唤醒操作无条件传播下去;
  • 0:中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。

通过 Node 我们可以实现两种队列: 

1)一是通过 prev 和 next 实现 CLH(Craig, Landin, and Hagersten)队列(线程同步队列、双向队列)。

        在 CLH 锁中,每个等待的线程都会有一个关联的 Node,每个 Node 有一个 prev 和 next 指针。当一个线程尝试获取锁并失败时,它会将自己添加到队列的尾部并自旋,等待前一个节点的线程释放锁。类似下面这样。

public class CLHLock {
    private volatile Node tail;
    private ThreadLocal<Node> myNode = ThreadLocal.withInitial(Node::new);
    private ThreadLocal<Node> myPred = new ThreadLocal<>();

    public void lock() {
        Node node = myNode.get();
        node.locked = true;
        // 把自己放到队尾,并取出前面的节点
        Node pred = tail;
        myPred.set(pred);
        while (pred.locked) {
            // 自旋等待
        }
    }

    public void unlock() {
        Node node = myNode.get();
        node.locked = false;
        myNode.set(myPred.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

2)二是通过 nextWaiter 实现 Condition上的等待线程队列(单向队列),这个 Condition 主要用在 ReentrantLock类中。


原文地址:https://blog.csdn.net/qq_62636650/article/details/140559870

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!