c语言做网站后台服务,科技网站建设方案,推广型的网站怎么做,做网站框架浏览时怎么变长图解java.util.concurrent并发包源码系列——Condition条件等待队列深入详解 Condition的作用Condition的原理Condition源码Condition的定义和Condition对象的获取await方法addConditionWaiter方法unlinkCancelledWaiters方法 fullyRelease方法isOnSyncQueue方法checkInterrupt… 图解java.util.concurrent并发包源码系列——Condition条件等待队列深入详解 Condition的作用Condition的原理Condition源码Condition的定义和Condition对象的获取await方法addConditionWaiter方法unlinkCancelledWaiters方法 fullyRelease方法isOnSyncQueue方法checkInterruptWhileWaiting方法reportInterruptAfterWait signal方法signalAll方法 总结 往期文章
人人都能看懂的图解java.util.concurrent并发包源码系列 ThreadPoolExecutor线程池图解java.util.concurrent并发包源码系列原子类、CAS、AtomicLong、AtomicStampedReference一套带走图解java.util.concurrent并发包源码系列——LongAdder图解java.util.concurrent并发包源码系列——深入理解AQS看完可以吊打面试官图解java.util.concurrent并发包源码系列——深入理解ReentrantLock看完可以吊打面试官图解java.util.concurrent并发包源码系列——深入理解ReentrantReadWriteLock读写锁看完可以吊打面试官
Condition的作用
Condition是Java并发包提供的一个条件等待队列工具类它具有让已获取到锁的线程当所需资源不满足的时候主动释放锁进入条件等待队列的能力与Object的wait方法作用类似。
我们可以通过ReentrantLock的newCondition方法或者ReentrantReadWriteLock中WriteLock的newCondition方法获取Condition对象。
ReentrantLock与Condition的关系相当于是synchronized关键字和Object#wait方法的关系。我们在调用Object的wait方法之前必须先获取到synchronized锁的。相对应的在调用Condition的await方法前必须要先获取到ReentrantLock的锁。
我们看一个生产者消费者的例子了解Condition的具体作用。
/*** 生产者消费者例子* Created by huangjunyi on 2023/8/11.*/
public class ProviderConsumerDemo {private ReentrantLock reentrantLock;private Condition notEmpty;private Condition notFull;private LinkedListInteger queue;private int size;private int capacity;private Provider provider;private Consumer consumer;public ProviderConsumerDemo(int capacity) {this.reentrantLock new ReentrantLock();this.notEmpty reentrantLock.newCondition();this.notFull reentrantLock.newCondition();this.queue new LinkedList();this.capacity capacity;this.size 0;this.provider new Provider();this.consumer new Consumer();}class Provider {public void push(Integer num) {try {reentrantLock.lock();// 如果queue已经满了生产者在notFull条件队列中等待while (size capacity) notFull.await();queue.addLast(num);size;// 唤醒在notEmpty条件队列中等待的消费者notEmpty.signal();} catch (Exception e) {e.printStackTrace();throw new RuntimeException(生产者发生异常);} finally {reentrantLock.unlock();}}}class Consumer {public Integer pull() {try {reentrantLock.lock();// 如果queue已经空了消费者在notEmpty条件队列中等待while (size 0) notEmpty.await();Integer num queue.removeFirst();size--;// 唤醒在notFull条件队列中等待的生成者notFull.signal();return num;} catch (Exception e) {e.printStackTrace();throw new RuntimeException(消费者发生异常);} finally {reentrantLock.unlock();}}}public Provider provider() {return this.provider;}public Consumer consumer() {return this.consumer;}public static void main(String[] args) throws InterruptedException {ProviderConsumerDemo providerConsumerDemo new ProviderConsumerDemo(5);Provider provider providerConsumerDemo.provider();Consumer consumer providerConsumerDemo.consumer();Thread providerThread new Thread(() - {int num 0;for (int i 0; i 10000; i) {provider.push(num);}});providerThread.start();Thread consumerThread new Thread(() - {for (int i 0; i 10000; i) {Integer num consumer.pull();System.out.println(num);}});consumerThread.start();providerThread.join();consumerThread.join();}}notEmpty和notFull是Condition类型的条件等待队列通过调用ReentrantLock的newCondition()方法生成。
当生产者想要往queue中放入元素时发现queue的容量已经满了那么就会调用notFull的await方法在条件队列中进行等待。当生产者成功往queue中放入元素后就会调用notEmpty的signal()方法唤醒notEmpty条件等待队列中的消费者。当消费者想要从queue中获取元素时发现queue已经空了那么就会调用notEmpty的await方法在条件队列中进行等待。当消费者成功从queue中获取到元素后就会调用notFull的signal()方法唤醒notFull条件等待队列中的生产者。
这样就实现了一个生成者消费者的功能。 Condition的原理
Condition其实就是一个用于存放因某种资源不充足而处于等待状态的线程的一个队列。比如上面的生产者线程等待queue队列的空间不满好让它能够往queue中放入它要放的元素那么就可以调用Condition的await方法把该生产者线程放入在Condition内部的队列中进行等待。
一个线程被转移到Condition时会被封装为一个Node节点放入到Condition内部的队列中这与AQS的逻辑是相似的。不同点是AQS队列是双向链表而Condition队列是单向链表。然后Node节点的waitStatus属性固定是CONDITION-2。
Condition条件等待队列有一个firstWaiter头指针和一个lastWaiter尾指针。
当前线程调用Condition的await方法时必须是已经获取到锁的然后它需要释放锁释放锁的时候会唤醒AQS队列中的下一个节点如当前线程没有获取到锁就调用Condition的await方法在尝试释放锁时就会抛异常。
其他线程做完自己的操作之后如果它自己的操作会使得某个Condition队列中的线程所等待的资源又充足时可以调用这个Condition的signal方法唤醒Condition队列中的一个线程或者调用Condition的signalAll方法唤醒Condition队列中的所有线程。
比如上面例子的消费者由于它的消费使得queue又有空间了那么它可以唤醒在Condition队列等待queue有空间的生产者线程。 Condition源码
Condition的定义和Condition对象的获取
Condition接口
public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();
}可以看到Condition接口除了普通的等待和唤醒方法还提供了不响应中断默认响应中断和带超时机制的等待方法。
而在AQS的内部就定义了一个实现了Condition接口的内部类
public class ConditionObject implements Condition, java.io.Serializable {private transient Node firstWaiter;private transient Node lastWaiter;
}可以看到ConditionObject内部带了头指针firstWaiter和尾指针lastWaiter。 通过ReentrantLock的newCondition方法可以获取到ConditionObject对象。
ReentrantLock#newCondition public Condition newCondition() {return sync.newCondition();}ReentrantLock.Sync#newCondition final ConditionObject newCondition() {return new ConditionObject();}await方法 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 封装为Node节点放入Condition队列中Node node addConditionWaiter();// 释放锁int savedState fullyRelease(node);int interruptMode 0;// 如果当前节点不在AQS同步队列中那么就一直循环parkwhile (!isOnSyncQueue(node)) {// park挂起当前线程LockSupport.park(this);// 唤醒后检查是否被中断记录一下中断标记interruptMode方便后续处理if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}// 在AQS的同步队列等待重新获取锁如果在此期间再次被中断则次记录标记interruptModeif (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;// 清除条件等待队列中被取消的节点if (node.nextWaiter ! null)unlinkCancelledWaiters();// 对中断的统一处理if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);}首先要把当前线程封装成一个Node然后把该Node放入到Condition的条件队列中。Node入队列后当前线程要释放所有获取的锁。释放完所有的锁后就一直while循环检查当前线程对应的节点是否已经被挪到了AQS的同步队列当中如果已经挪入了其他线程调用Condition的signal方法会把Condition队列中的一个节点挪到AQS同步队列中那么跳出循环否则就把当前线程挂起。每次线程醒来时都要检查一下自己是否被中断了如果是要记录一个中断标记interruptMode方便后续处理。节点被挪入到AQS同步队列后当前线程就要等待重新获取锁。获取到锁后再检查一下自己是否被中断了如果是则更新一下中断标记。获取到锁后会尝试清除一遍条件队列中已被中断的节点。最后对中断的情况进行处理 addConditionWaiter方法
addConditionWaiter方法的作用是往条件等待队列添加一个节点。 private Node addConditionWaiter() {Node t lastWaiter;// 如果尾节点不是CONDITION状态的那么清理一遍队列然后再次获取尾节点if (t ! null t.waitStatus ! Node.CONDITION) {unlinkCancelledWaiters();t lastWaiter;}// 创建一个新节点waitStatus状态值为CONDITIONNode node new Node(Thread.currentThread(), Node.CONDITION);// t尾节点为null表示队列为空当前节点作为头节点if (t null)firstWaiter node;// 队列不为空就入队列尾部elset.nextWaiter node;lastWaiter node;return node;}addConditionWaiter方法的大体思路就是拿到一个waitStatus状态值为CONDITION的节点或者null然后把当前线程封装为一个Node节点waitStatus设置为CONDITION然后放入队列尾部队列不为空或者设置为队列头节点队列为空。 unlinkCancelledWaiters方法
unlinkCancelledWaiters()方法的作用是清理条件等待队列。每个进入条件等待队列中的节点的waitStatus属性原本都是CONDITION但是随着队列中某些节点的线程被中断等待它的waitStatus属性就不是CONDITION了那么这些waitStatus属性不是CONDITION的节点是不需要的自然要清理出队列。 private void unlinkCancelledWaiters() {// t指针指向后面一个节点Node t firstWaiter;// trail指针指向前面一个节点Node trail null;while (t ! null) {Node next t.nextWaiter;// t指针发现waitStatus不为CONDITION的节点if (t.waitStatus ! Node.CONDITION) {t.nextWaiter null;if (trail null)// 要断连的节点是头节点那么更新头节点为t指针指向的节点的下一个节点firstWaiter next;else// trail指针指向的节点的nextWaiter指针指向t指针指向的节点的下一个节点trail.nextWaiter next;if (next null)// 没有后续节点了把trail指针当前所指的节点设置为尾节点lastWaiter trail;}elsetrail t;t next;}}两个指针一前一后地遍历队列t是后一个节点的指针trail是前一个节点的指针。当t指针遇到waitStatus属性不为CONDITION的节点的时候就把trail节点指向t节点的下一个节点next这样t指针指向的节点自然断连出队列后续会被GC回收。而如果要断连的节点刚好是头节点那么就要更新头节点为要断连的节点的下一个节点。 fullyRelease方法
fullyRelease方法用于进入了条件等待队列的节点对应的线程释放锁资源。 final int fullyRelease(Node node) {boolean failed true;try {int savedState getState();// 调用AQS的release方法一口气全部释放if (release(savedState)) {failed false;// 释放了多少返回多少后面会再次获取return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus Node.CANCELLED;}}int savedState getState(); 获取state遍历然后再调用 release(savedState) 一口气把自己获取的所有锁资源全部释放。然后释放了多少方法的返回值就是多少后续重新获取锁时就获取多少。 isOnSyncQueue方法
isOnSyncQueue方法是用于判断当前node节点是否在AQS同步队列中 final boolean isOnSyncQueue(Node node) {// waitStatus 属性还是CONDITION 或者prev指针是空代表不在AQS同步队列中因为AQS同步队列中的节点是没有CONDITION 状态的而且AQS队列是一个双向链表prev是不会为null的if (node.waitStatus Node.CONDITION || node.prev null)return false;// node的next指针不为null那么node肯定再AQS队列中因为只有再AQS同步队列中才会用next指针指向下一个节点在Condition条件队列中用的是nextWaiter指针if (node.next ! null) return true;// node的prev指针不为null也不代表当前节点就在AQS队列中因为把一个节点放入AQS同步队列是通过CAS完成的但是CAS有可能会失败所以这里调用findNodeFromTail方法从尾部开始寻找确保node节点确实已经在AQS同步队列中了。return findNodeFromTail(node);}首先判断当前节点如果状态是CONDITION 那么肯定不在AQS同步队列中因为AQS同步队列中的节点是没有CONDITION 状态的。 如果当前节点的prev指针是null那么也不可能在AQS队列中。因为AQS同步队列是一个双向链表而node节点是从链表尾部进去的如果它再AQS同步队列中prev指针是不可能为null的。 如果node节点的next指针不为null那么肯定再AQS同步队列中。因为只有AQS同步队列中的节点才会用next指针记录下一个节点在Condition条件等待队列中的节点是用nextWaiter指针指向下一个节点的。 当这里还不能确定当前节点已经在AQS队列中即使当前节点的prev指针不为null但是一个节点进入AQS队列是通过CAS放进去的而CAS是有可能失败的所以还要调findNodeFromTail方法从尾部开始寻找确保当前节点已经进入了AQS同步队列。 private boolean findNodeFromTail(Node node) {Node t tail;for (;;) {if (t node)// 找到了返回truereturn true;if (t null)// 没有了返回falsereturn false;// 通过prev指针网球遍历t t.prev;}}findNodeFromTail方法就是从尾部开始通过prev指针往前遍历直到找到为止或者遍历完毕。 checkInterruptWhileWaiting方法
checkInterruptWhileWaiting方法用于检查当前线程是否被中断也就是取消等待。如果当前线程被中断了要设置对应的标志方便后续处理。 private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}Thread.interrupted()获取当前线程中断标志位这个方法被调用后当前线程的Thread对象的中断标志位就会被复位。
transferAfterCancelledWait(node) final boolean transferAfterCancelledWait(Node node) {// compareAndSetWaitStatus尝试把当前节点的waitStatus状态改为0// 如果修改成功表示当前节点处于Condition条件等待队列中被中断。// 如果CAS不成功表示当前节点的waitStatus状态不为CONDITION当前节点不在Condition队列中也不是在条件队列中被中断那么当前节点就是在signal之后被中断。if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}// 到这里说明当前节点node是在signal之后被中断的确保节点已经在AQS队列中之后返回false。while (!isOnSyncQueue(node))Thread.yield();return false;}这里中断分两种情况一种当前节点还是CONDITION状态在Condition队列中被中断另一种情况就是被其他线程调用了signal方法修改了当前node节点的waitStatus然后当前node节点才被中断。
分这两种情况是因为后面对于这两种情况的处理是不同的如果是在Condition队列中被中断的那么后续要抛出中断异常。如果是被别的线程调用了signal方法修改了当前node节点的waitStatus后才中断当前线程由于这里通过Thread.interrupted()检查当前线程是否被中断时把当前线程的中断标志位复位了这里只需要把中断标志位重新置位即可。 reportInterruptAfterWait private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {// 在Condition条件队列中被打断的抛异常if (interruptMode THROW_IE)throw new InterruptedException();// 被别的线程signal之后才被打断的中断标志位重新置位else if (interruptMode REINTERRUPT)selfInterrupt();}static void selfInterrupt() {Thread.currentThread().interrupt();}如果是在Condition条件队列中被打断的那么前面记录的中断标记interruptMode就是THROW_IE这里要抛出异常。如果是被别的线程signal之后才被打断的中断标志位重新置位即可。两个分支没进那么就是没有被中断。 signal方法
signal方法用于唤醒等待在Condition条件队列中的线程。 public final void signal() {// 非持有锁的线程调signal方法会抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)doSignal(first);}首先判断如果当前线程不是持有锁的线程那么会抛出一个异常。如果当前线程是持有锁的线程那么会调用doSignal方法。 private void doSignal(Node first) {do {if ( (firstWaiter first.nextWaiter) null)lastWaiter null;first.nextWaiter null;// 调用transferForSignal(first)方法转移当前节点到AQS同步队列中成功转移一次就退出循环} while (!transferForSignal(first) (first firstWaiter) ! null);}doSignal方法里面就是拿到Condition条件队列中的头节点转移到AQS同步队列中转移成功就退出循环方法结束。 final boolean transferForSignal(Node node) {// 转移前先修改节点状态if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 节点如AQS同步队列Node p enq(node);// enq(node)会返回node入队后的前驱节点// 如果前驱节点的waitStatus属性是CANCELLED状态或者CAS修改前驱节点waitStatus属性不成功那么就直接唤醒node节点的线程// 否则node节点的线程将会由AQS中该节点的前驱节点的线程释放锁后唤醒int ws p.waitStatus;if (ws 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}signalAll方法
signalAll方法与signal方法功能相同区别是signal方法只转一个节点到AQS队列中而signalAll方法则是转移所有节点到AQS队列中。 public final void signalAll() {// 当前线程没有获取锁抛异常if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first firstWaiter;if (first ! null)doSignalAll(first);}如果发现当选线程没有获取到锁就调用了signalAll方法那么也是抛一个异常。否则就调用doSignalAll方法。 private void doSignalAll(Node first) {lastWaiter firstWaiter null;do {Node next first.nextWaiter;first.nextWaiter null;transferForSignal(first);first next;} while (first ! null);}可以看到也是一个do-while循环但是这个循环不是转移成功一个节点就退出而是直到没有节点可以转移为止。 总结
Condition的源码到此就分析完毕了Condition的核心逻辑就是把调用了await方法的线程封装为Node节点放入到条件队列中释放掉该线程获取的所有锁资源然后挂起等待其他线程调用signal方法或者signalAll方法把他移入AQS同步队列中然后将其唤醒。