AQS和ReentrantLock
什么是AQS
一、AQS的核心思想和功能
AQS通过一个FIFO(先进先出)的等待队列来管理多线程对资源的访问。它允许开发者通过继承AQS来创建具体的同步器,并利用AQS提供的模板方法来实现同步逻辑。
功能:AQS主要提供了两部分功能,一是操作state变量(表示资源是否被占用),二是实现排队和阻塞机制。AQS并没有实现任何同步接口,它只是提供方法,调用这些方法可以实现锁和同步器。
也就是说 AQS是一种思想,一种规范
二、AQS的内部机制
状态变量:AQS维护了一个volatile int state变量,用于表示共享资源的状态。state的访问方式有三种:getState()、setState()和compareAndSetState()。
等待队列:AQS还维护了一个FIFO线程等待队列,当多线程争用资源被阻塞时,它们会进入这个队列等待。
CLH队列:AQS基于CLH(Craig, Landin, and Hagersten)队列实现,这是一个虚拟的双向队列,即不存在队列实例,仅存在节点之间的关联关系。AQS将每一条请求共享资源的线程封装成一个CLH锁队列的节点(Node),来实现锁的分配。
三、AQS的同步逻辑
获取锁:
当线程尝试获取锁时,会先调用tryAcquire方法(这是一个抽象方法,由AQS的子类来实现)。
如果获取锁失败,线程会被封装成一个节点,并加入到等待队列中。
然后,线程会在队列中自旋地尝试获取锁,直到成功。
释放锁:
当线程释放锁时,会调用tryRelease方法(这也是一个抽象方法,由AQS的子类来实现)。
释放锁后,AQS会唤醒等待队列中的下一个节点,让它尝试获取锁。
所以说,实现AQS的关键就是维护:state变量+一个FIFO线程等待队列
先学会使用 ReentrantLock
使用 ReentrantLock 实现生产者消费者模式
public class Test {
public static final ReentrantLock lock = new ReentrantLock();//锁
public static LinkedList<Integer> list = new LinkedList<>();//缓冲区
public static void main(String[] args) throws InterruptedException {
Condition notEmpty = lock.newCondition();//缓冲区不满
Condition notFull = lock.newCondition();//缓冲区
Producer producer = new Producer(lock, list, notEmpty, notFull);
Producer producer1 = new Producer(lock, list, notEmpty, notFull);
Consumer consumer = new Consumer(lock, list, notEmpty, notFull);
Consumer consumer1 = new Consumer(lock, list, notEmpty, notFull);
Thread thread = new Thread(producer);
thread.start();
Thread thread1 = new Thread(producer1);
thread1.start();
Thread thread2 = new Thread(consumer);
thread2.start();
Thread thread3 = new Thread(consumer1);
thread3.start();
thread.join();
thread1.join();
thread2.join();
thread3.join();
}
}
class Producer implements Runnable {
private ReentrantLock lock;
private LinkedList<Integer> list;
private Condition notEmpty;//缓冲区不空的信号
private Condition notFull;//缓冲区不满的信号
public Producer(ReentrantLock lock, LinkedList<Integer> list, Condition notEmpty, Condition notFull) {
this.lock = lock;
this.list = list;
this.notEmpty = notEmpty;
this.notFull = notFull;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
if (list.size() == 10) {
Thread.sleep(500);
System.out.println("缓存池已经满了");
//生产者等待缓存区不满的信号
notFull.await();//这段代码需要倒着看,Producer await notFull--》生产者等待缓存区不满的信号
} else {
Thread.sleep(500);
int i = new Random().nextInt(100);
list.add(i);
System.out.println("生产者" + Thread.currentThread().getName() + "生产了" + i);
//生产者通知消费者,缓冲区有元素了
notEmpty.signalAll();//这段代码需要倒着看,Producer signalAll notEmpty--》生产者发出缓冲区不空的信号
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
private ReentrantLock lock;
private LinkedList<Integer> list;
private Condition notEmpty;
private Condition notFull;
public Consumer(ReentrantLock lock, LinkedList<Integer> list, Condition notEmpty, Condition notFull) {
this.lock = lock;
this.list = list;
this.notEmpty = notEmpty;
this.notFull = notFull;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
if (list.size() == 0) {
Thread.sleep(500);
System.out.println("缓存池已经空了");
//消费者等待缓存区不空的信号
notEmpty.await();//这段代码需要倒着看,Consumer await notEmpty--》消费者等待缓存区不空的信号
} else {
Thread.sleep(500);
Integer i = list.removeFirst();
System.out.println("消费者" + Thread.currentThread().getName() + "消费了" + i);
//消费者通知生产者,缓冲区有空间了
notFull.signalAll();//这段代码需要倒着看,Consumer signalAll notFull--》消费者发出缓冲区不满的信号
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
}
ReentrantLock 是如何维护这两个变量的?
第一个线程
lock.lock();
⬇️
public void lock() {
sync.lock();
}
⬇️非公平的实现是这样的
final void lock() {
if (compareAndSetState(0, 1))//进来就去抢锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
公平的是这样的
final void lock() {
acquire(1);
}
⬇️
if (compareAndSetState(0, 1))就是将 state 将0改成1
第二个线程
lock.lock();
⬇️
public void lock() {
sync.lock();
}
⬇️
final void lock() {
if (compareAndSetState(0, 1))//进来就去抢锁,但是抢不到
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//所以肯定走这里
}
⬇️我们想一下,抢锁失败了,那就需要加入到等待队列了
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
⬇️
这段代码展示了,即便是在并发条件下,也能入队
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
⬇️
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false; // 记录线程是否被中断的标志
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点,那么就再次的尝试获取锁(即tryAcquire返回true)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
// 如果当前线程不应该立即挂起(即还有其他线程在竞争锁),或者前驱节点状态不满足挂起条件
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
⬇️shouldParkAfterFailedAcquire的作用是,将当前线程的前驱节点,也就是线程二的前驱节点,也就是线程1的waitstate改成-1
⬇️parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//进入阻塞了
return Thread.interrupted();
}
lock.unlock();
⬇️
public void unlock() {
sync.release(1);
}
⬇️
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
⬇️tryRelease() 释放锁就没有必要用cas了,因为已经拿到锁了,不会有竞争的,将state改成0
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
⬇️出队并唤醒下一个等待线程,不过这里注意一下,唤醒等待队列中的线程这句话并不严谨,首先这里面从头到尾其实没有一个具体的队列,是通过一个节点一个节点的prev和next维护的一个链表而已,唤醒下一个等待线程需要先从头节点找到下一个节点,然后对下一个节点进行唤醒,然后将下一个节点设置成头节点(也就是前面说的线程2对应的节点设置成头节点),原先的头节点也就是线程1对应的节点,将prev和next引用都去干净,就会被垃圾回收器干掉
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);//将state变成0
Node s = node.next;//拿到后续节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒后续节点对应的线程
}
原文地址:https://blog.csdn.net/m0_56356631/article/details/143041885
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!