自学内容网 自学内容网

AQS和ReentrantLock

什么是AQS

一、AQS的核心思想和功能

AQS通过一个FIFO(先进先出)的等待队列来管理多线程对资源的访问。它允许开发者通过继承AQS来创建具体的同步器,并利用AQS提供的模板方法来实现同步逻辑。
功能:AQS主要提供了两部分功能,一是操作state变量(表示资源是否被占用),二是实现排队和阻塞机制。AQS并没有实现任何同步接口,它只是提供方法,调用这些方法可以实现锁和同步器。

也就是说 AQS是一种思想,一种规范

二、AQS的内部机制

状态变量:AQS维护了一个volatile int state变量,用于表示共享资源的状态。state的访问方式有三种:getState()、setState()和compareAndSetState()。
等待队列:AQS还维护了一个FIFO线程等待队列,当多线程争用资源被阻塞时,它们会进入这个队列等待。
CLH队列:AQS基于CLH(Craig, Landin, and Hagersten)队列实现,这是一个虚拟的双向队列,即不存在队列实例,仅存在节点之间的关联关系。AQS将每一条请求共享资源的线程封装成一个CLH锁队列的节点(Node),来实现锁的分配。

三、AQS的同步逻辑

获取锁:
当线程尝试获取锁时,会先调用tryAcquire方法(这是一个抽象方法,由AQS的子类来实现)。
如果获取锁失败,线程会被封装成一个节点,并加入到等待队列中。
然后,线程会在队列中自旋地尝试获取锁,直到成功。
释放锁:
当线程释放锁时,会调用tryRelease方法(这也是一个抽象方法,由AQS的子类来实现)。
释放锁后,AQS会唤醒等待队列中的下一个节点,让它尝试获取锁。

所以说,实现AQS的关键就是维护:state变量+一个FIFO线程等待队列

先学会使用 ReentrantLock

使用 ReentrantLock 实现生产者消费者模式
public class Test {


    public static final ReentrantLock lock = new ReentrantLock();//锁
    public static LinkedList<Integer> list = new LinkedList<>();//缓冲区

    public static void main(String[] args) throws InterruptedException {
        Condition notEmpty = lock.newCondition();//缓冲区不满
        Condition notFull = lock.newCondition();//缓冲区

        Producer producer = new Producer(lock, list, notEmpty, notFull);
        Producer producer1 = new Producer(lock, list, notEmpty, notFull);


        Consumer consumer = new Consumer(lock, list, notEmpty, notFull);
        Consumer consumer1 = new Consumer(lock, list, notEmpty, notFull);

        Thread thread = new Thread(producer);
        thread.start();
        Thread thread1 = new Thread(producer1);
        thread1.start();


        Thread thread2 = new Thread(consumer);
        thread2.start();
        Thread thread3 = new Thread(consumer1);
        thread3.start();

        thread.join();
        thread1.join();
        thread2.join();
        thread3.join();
    }
}

class Producer implements Runnable {

    private ReentrantLock lock;
    private LinkedList<Integer> list;
    private Condition notEmpty;//缓冲区不空的信号
    private Condition notFull;//缓冲区不满的信号

    public Producer(ReentrantLock lock, LinkedList<Integer> list, Condition notEmpty, Condition notFull) {
        this.lock = lock;
        this.list = list;
        this.notEmpty = notEmpty;
        this.notFull = notFull;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                if (list.size() == 10) {
                    Thread.sleep(500);
                    System.out.println("缓存池已经满了");
                    //生产者等待缓存区不满的信号
                    notFull.await();//这段代码需要倒着看,Producer await notFull--》生产者等待缓存区不满的信号
                } else {
                    Thread.sleep(500);
                    int i = new Random().nextInt(100);
                    list.add(i);
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产了" + i);
                    //生产者通知消费者,缓冲区有元素了
                    notEmpty.signalAll();//这段代码需要倒着看,Producer signalAll notEmpty--》生产者发出缓冲区不空的信号
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }
}

class Consumer implements Runnable {

    private ReentrantLock lock;
    private LinkedList<Integer> list;
    private Condition notEmpty;
    private Condition notFull;


    public Consumer(ReentrantLock lock, LinkedList<Integer> list, Condition notEmpty, Condition notFull) {
        this.lock = lock;
        this.list = list;
        this.notEmpty = notEmpty;
        this.notFull = notFull;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                if (list.size() == 0) {
                    Thread.sleep(500);
                    System.out.println("缓存池已经空了");
                    //消费者等待缓存区不空的信号
                    notEmpty.await();//这段代码需要倒着看,Consumer await notEmpty--》消费者等待缓存区不空的信号
                } else {
                    Thread.sleep(500);
                    Integer i = list.removeFirst();
                    System.out.println("消费者" + Thread.currentThread().getName() + "消费了" + i);
                    //消费者通知生产者,缓冲区有空间了
                    notFull.signalAll();//这段代码需要倒着看,Consumer signalAll notFull--》消费者发出缓冲区不满的信号
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }
}

ReentrantLock 是如何维护这两个变量的?

第一个线程
   lock.lock();
⬇️
     public void lock() {
        sync.lock();
    }
⬇️非公平的实现是这样的
        final void lock() {
            if (compareAndSetState(0, 1))//进来就去抢锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
公平的是这样的
        final void lock() {
            acquire(1);
        }
⬇️
            if (compareAndSetState(0, 1))就是将 state 将0改成1 
第二个线程
   lock.lock();
⬇️
     public void lock() {
        sync.lock();
    }
⬇️
        final void lock() {
            if (compareAndSetState(0, 1))//进来就去抢锁,但是抢不到
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);//所以肯定走这里
        }
⬇️我们想一下,抢锁失败了,那就需要加入到等待队列了
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
⬇️
这段代码展示了,即便是在并发条件下,也能入队
private Node addWaiter(Node mode) {  
        Node node = new Node(mode);  
        for (;;) {  
            Node oldTail = tail;  
            if (oldTail != null) {  
                node.setPrevRelaxed(oldTail);  
                if (compareAndSetTail(oldTail, node)) {   
                    oldTail.next = node;  
                    return node;  
                }  
            } else {  
                initializeSyncQueue();  
            }  
        }  
    }
⬇️
final boolean acquireQueued(final Node node, int arg) {  
    boolean interrupted = false; // 记录线程是否被中断的标志  
    try {  
        for (;;) {
            // 获取当前节点的前驱节点  
            final Node p = node.predecessor();  
            // 如果前驱节点是头节点,那么就再次的尝试获取锁(即tryAcquire返回true)  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null;   
                return interrupted;  
            }  
// 如果当前线程不应该立即挂起(即还有其他线程在竞争锁),或者前驱节点状态不满足挂起条件  
            if (shouldParkAfterFailedAcquire(p, node))    
                interrupted |= parkAndCheckInterrupt();  
        }  
    } catch (Throwable t) {  
        cancelAcquire(node);  
        if (interrupted)  
            selfInterrupt();  
        throw t;  
    }  
}
⬇️shouldParkAfterFailedAcquire的作用是,将当前线程的前驱节点,也就是线程二的前驱节点,也就是线程1的waitstate改成-1
⬇️parkAndCheckInterrupt
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//进入阻塞了
        return Thread.interrupted();
    }
                lock.unlock();
⬇️
    public void unlock() {
        sync.release(1);
    }
⬇️
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
⬇️tryRelease() 释放锁就没有必要用cas了,因为已经拿到锁了,不会有竞争的,将state改成0
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
⬇️出队并唤醒下一个等待线程,不过这里注意一下,唤醒等待队列中的线程这句话并不严谨,首先这里面从头到尾其实没有一个具体的队列,是通过一个节点一个节点的prev和next维护的一个链表而已,唤醒下一个等待线程需要先从头节点找到下一个节点,然后对下一个节点进行唤醒,然后将下一个节点设置成头节点(也就是前面说的线程2对应的节点设置成头节点),原先的头节点也就是线程1对应的节点,将prev和next引用都去干净,就会被垃圾回收器干掉
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);//将state变成0
        Node s = node.next;//拿到后续节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒后续节点对应的线程
    }

原文地址:https://blog.csdn.net/m0_56356631/article/details/143041885

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!