专门做电子书的网站,免费网站建站工具,苏州网站建设选苏州梦易行,银行网站维护是做哪些转自#xff1a;
Java 7 种阻塞队列详解 - 云社区 - 腾讯云队列#xff08;Queue#xff09;是一种经常使用的集合。Queue 实际上是实现了一个先进先出#xff08;FIFO#xff1a;First In First Out#xff09;的有序表。和 List、Set ...https://cloud.tencent.com/de…转自
Java 7 种阻塞队列详解 - 云社区 - 腾讯云队列Queue是一种经常使用的集合。Queue 实际上是实现了一个先进先出FIFOFirst In First Out的有序表。和 List、Set ...https://cloud.tencent.com/developer/article/1706970 队列和阻塞队列
队列
队列Queue是一种经常使用的集合。Queue 实际上是实现了一个先进先出FIFOFirst In First Out的有序表。和 List、Set 一样都继承自 Collection。它和 List 的区别在于List可以在任意位置添加和删除元素而Queue 只有两个操作
把元素添加到队列末尾从队列头部取出元素。
超市的收银台就是一个队列 我们常用的 LinkedList 就可以当队列使用实现了 Dequeue 接口还有 ConcurrentLinkedQueue他们都属于非阻塞队列。
阻塞队列
阻塞队列顾名思义首先它是一个队列而一个阻塞队列在数据结构中所起的作用大致如下 线程 1 往阻塞队列中添加元素而线程 2 从阻塞队列中移除元素
当阻塞队列是空时从队列中获取元素的操作将会被阻塞。当阻塞队列是满时从队列中添加元素的操作将会被阻塞。
试图从空的阻塞队列中获取元素的线程将会阻塞直到其他的线程往空的队列插入新的元素同样试图往已满的阻塞队列添加新元素的线程同样也会阻塞直到其他的线程从列中移除一个或多个元素或者完全清空队列后继续新增。 类似我们去海底捞排队海底捞爆满情况下阻塞队列相当于用餐区用餐区满了的话就阻塞在候客区等着可以用餐的话 put 一波去用餐吃完就 take 出去。 为什么要用阻塞队列有什么好处吗
在多线程领域所谓阻塞是指在某些情况下会挂起线程即阻塞一旦条件满足被挂起的线程又会自动被唤醒。
那为什么需要 BlockingQueue 呢
好处是我们不需要关心什么时候需要阻塞线程什么时候需要唤醒线程因为这些 BlockingQueue 都包办了。
在 concurrent 包发布以前多线程环境下我们每个程序员都必须自己去实现这些细节尤其还要兼顾效率和线程安全这会给我们的程序带来不小的复杂性。现在有了阻塞队列我们的操作就从手动挡换成了自动挡。
Java 里的阻塞队列 Collection的子类除了我们熟悉的 List 和 Set还有一个 Queue阻塞队列 BlockingQueue 继承自 Queue。
BlockingQueue 是个接口需要使用它的实现之一来使用 BlockingQueuejava.util.concurrent 包下具有以下 BlockingQueue 接口的实现类
JDK 提供了 7 个阻塞队列。分别是
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列DelayQueue一个使用优先级队列实现的无界阻塞队列SynchronousQueue一个不存储元素的阻塞队列LinkedTransferQueue一个由链表结构组成的无界阻塞队列实现了继承于 BlockingQueue 的 TransferQueueLinkedBlockingDeque一个由链表结构组成的双向阻塞队列
BlockingQueue 核心方法
相比 Queue 接口BlockingQueue 有四种形式的 API。 方法类型 抛出异常 返回特殊值 一直阻塞 超时退出 插入 add(e) offer(e) put(e) offer(e,time,unit) 移除取出 remove() poll() take() poll(time,unit) 检查 element() peek() 不可用 不可用
以 ArrayBlockingQueue 为例来看下 Java 阻塞队列提供的常用方法
抛出异常当阻塞队列满时再往队列里 add 插入元素会抛出 java.lang.IllegalStateException: Queue full 异常当队列为空时从队列里 remove 移除元素时会抛出 NoSuchElementException 异常 。element()返回队列头部的元素如果队列为空则抛出一个 NoSuchElementException 异常返回特殊值offer()插入方法成功返回 true失败返回 falsepoll()移除方法成功返回出队列的元素队列里没有则返回 nullpeek() 返回队列头部的元素如果队列为空则返回 null一直阻塞当阻塞队列满时如果生产线程继续往队列里 put 元素队列会一直阻塞生产线程直到拿到数据或者响应中断退出当阻塞队列空时消费线程试图从队列里 take 元素队列也会一直阻塞消费线程直到队列可用。超时退出当阻塞队列满时队列会阻塞生产线程一定时间如果超过一定的时间生产线程就会退出返回 false当阻塞队列空时队列会阻塞消费线程一定时间如果超过一定的时间消费线程会退出返回 nullBlockingQueue 实现类
逐个分析下这 7 个阻塞队列常用的几个顺便探究下源码。
ArrayBlockingQueue
ArrayBlockingQueue一个由数组实现的有界阻塞队列。该队列采用先进先出FIFO的原则对元素进行排序添加的。
ArrayBlockingQueue 为有界且固定其大小在构造时由构造函数来决定确认之后就不能再改变了。
ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略但是在默认情况下不保证线程公平的访问在构造时可以选择公平策略fair true。公平性通常会降低吞吐量但是减少了可变性和避免了“不平衡性”。ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别
所谓公平访问队列是指阻塞的所有生产者线程或消费者线程当队列可用时可以按照阻塞的先后顺序访问队列即先阻塞的生产者线程可以先往队列里插入元素先阻塞的消费者线程可以先从队列里获取元素可以保证先进先出避免饥饿现象。
源码解读
public class ArrayBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {// 通过数组来实现的队列final Object[] items;//记录队首元素的下标int takeIndex;//记录队尾元素的下标int putIndex;//队列中的元素个数int count;//通过ReentrantLock来实现同步final ReentrantLock lock;//有2个条件对象分别表示队列不为空和队列不满的情况private final Condition notEmpty;private final Condition notFull;//迭代器transient Itrs itrs;//offer方法用于向队列中添加数据public boolean offer(E e) {// 可以看出添加的数据不支持null值checkNotNull(e);final ReentrantLock lock this.lock;//通过重入锁来实现同步lock.lock();try {//如果队列已经满了的话直接就返回false不会阻塞调用这个offer方法的线程if (count items.length)return false;else {//如果队列没有满就调用enqueue方法将元素添加到队列中enqueue(e);return true;}} finally {lock.unlock();}}//多了个等待时间的 offer方法public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos unit.toNanos(timeout);final ReentrantLock lock this.lock;//获取可中断锁lock.lockInterruptibly();try {while (count items.length) {if (nanos 0)return false;//等待设置的时间nanos notFull.awaitNanos(nanos);}//如果等待时间过了队列有空间的话就会调用enqueue方法将元素添加到队列enqueue(e);return true;} finally {lock.unlock();}}//将数据添加到队列中的具体方法private void enqueue(E x) {// assert lock.getHoldCount() 1;// assert items[putIndex] null;final Object[] items this.items;items[putIndex] x;//通过循环数组实现的队列当数组满了时下标就变成0了if (putIndex items.length)putIndex 0;count;//激活因为notEmpty条件而阻塞的线程比如调用take方法的线程notEmpty.signal();}//将数据从队列中取出的方法private E dequeue() {// assert lock.getHoldCount() 1;// assert items[takeIndex] ! null;final Object[] items this.items;SuppressWarnings(unchecked)E x (E) items[takeIndex];//将对应的数组下标位置设置为null释放资源items[takeIndex] null;if (takeIndex items.length)takeIndex 0;count--;if (itrs ! null)itrs.elementDequeued();//激活因为notFull条件而阻塞的线程比如调用put方法的线程notFull.signal();return x;}//put方法和offer方法不一样的地方在于如果队列是满的话它就会把调用put方法的线程阻塞直到队列里有空间public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock this.lock;//因为后面调用了条件变量的await()方法而await()方法会在中断标志设置后抛出InterruptedException异常后退出// 所以在加锁时候先看中断标志是不是被设置了如果设置了直接抛出InterruptedException异常就不用再去获取锁了lock.lockInterruptibly();try {while (count items.length)//如果队列满的话就阻塞等待直到notFull的signal方法被调用也就是队列里有空间了notFull.await();//队列里有空间了执行添加操作enqueue(e);} finally {lock.unlock();}}//poll方法用于从队列中取数据不会阻塞当前线程public E poll() {final ReentrantLock lock this.lock;lock.lock();try {//如果队列为空的话会直接返回null否则调用dequeue方法取数据return (count 0) ? null : dequeue();} finally {lock.unlock();}}//有等待时间的 poll 重载方法public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos unit.toNanos(timeout);final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count 0) {if (nanos 0)return null;nanos notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}//take方法也是用于取队列中的数据但是和poll方法不同的是它有可能会阻塞当前的线程public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {//当队列为空时就会阻塞当前线程while (count 0)notEmpty.await();//直到队列中有数据了调用dequeue方法将数据返回return dequeue();} finally {lock.unlock();}}//返回队首元素public E peek() {final ReentrantLock lock this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}//获取队列的元素个数加了锁所以结果是准确的public int size() {final ReentrantLock lock this.lock;lock.lock();try {return count;} finally {lock.unlock();}}// 此外还有一些其他方法//返回队列剩余空间还能加几个元素public int remainingCapacity() {final ReentrantLock lock this.lock;lock.lock();try {return items.length - count;} finally {lock.unlock();}}// 判断队列中是否存在当前元素opublic boolean contains(Object o){}// 返回一个按正确顺序包含队列中所有元素的数组public Object[] toArray(){}// 自动清空队列中的所有元素public void clear(){}// 移除队列中所有可用元素并将他们加入到给定的 Collection 中 public int drainTo(Collection? super E c){}// 返回此队列中按正确顺序进行迭代的包含所有元素的迭代器public IteratorE iterator()
}
LinkedBlockingQueue
LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
如果不是特殊业务LinkedBlockingQueue 使用时切记要定义容量 new LinkedBlockingQueue(capacity)
防止过度膨胀。
源码解读
public class LinkedBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {private static final long serialVersionUID -6903933977591709194L;// 基于链表实现肯定要有结点类典型的单链表结构static class NodeE {E item;NodeE next;Node(E x) { item x; }}//容量private final int capacity;//当前队列元素数量private final AtomicInteger count new AtomicInteger();// 头节点不存数据transient NodeE head;// 尾节点便于入队private transient NodeE last;// take锁出队锁只有takepoll方法会持有private final ReentrantLock takeLock new ReentrantLock();// 出队等待条件// 当队列无元素时take锁会阻塞在notEmpty条件上等待其它线程唤醒private final Condition notEmpty takeLock.newCondition();// 入队锁只有putoffer会持有private final ReentrantLock putLock new ReentrantLock();// 入队等待条件// 当队列满了时put锁会会阻塞在notFull上等待其它线程唤醒private final Condition notFull putLock.newCondition();//同样提供三个构造器public LinkedBlockingQueue(int capacity) {if (capacity 0) throw new IllegalArgumentException();// 初始化head和last指针为空值节点this.capacity capacity;last head new NodeE(null);}public LinkedBlockingQueue() {// 如果没传容量就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(Collection? extends E c) {}//入队public void put(E e) throws InterruptedException {// 不允许null元素if (e null) throw new NullPointerException();//规定给当前put方法预留一个本地变量int c -1;// 新建一个节点NodeE node new NodeE(e);final ReentrantLock putLock this.putLock;final AtomicInteger count this.count;// 使用put锁加锁putLock.lockInterruptibly();try {// 如果队列满了就阻塞在notFull条件上// 等待被其它线程唤醒while (count.get() capacity) {notFull.await();}// 队列不满了就入队enqueue(node);// 队列长度加1c count.getAndIncrement();// 如果现队列长度小于容量// 就再唤醒一个阻塞在notFull条件上的线程// 这里为啥要唤醒一下呢// 因为可能有很多线程阻塞在notFull这个条件上的// 而取元素时只有取之前队列是满的才会唤醒notFull// 为什么队列满的才唤醒notFull呢// 因为唤醒是需要加putLock的这是为了减少锁的次数// 所以这里索性在放完元素就检测一下未满就唤醒其它notFull上的线程// 说白了这也是锁分离带来的代价if (c 1 capacity)notFull.signal();} finally {// 释放锁putLock.unlock();}// 如果原队列长度为0现在加了一个元素后立即唤醒notEmpty条件if (c 0)signalNotEmpty();}private void signalNotEmpty() {final ReentrantLock takeLock this.takeLock;// 加take锁takeLock.lock();try {// 唤醒notEmpty条件notEmpty.signal();} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}private void enqueue(NodeE node) {// 直接加到last后面last last.next node;}public boolean offer(E e) {//用带过期时间的说明}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e null) throw new NullPointerException();//转换为纳秒long nanos unit.toNanos(timeout);int c -1;final ReentrantLock putLock this.putLock;final AtomicInteger count this.count;//获取入队锁支持等待锁的过程中被中断putLock.lockInterruptibly();try {//队列满了再看看有没有超时while (count.get() capacity) {if (nanos 0)//等待时间超时return false;//进行等待awaitNanos(long nanos)是AQS中的方法//在等待过程中如果被唤醒或超时则继续当前循环//如果被中断则抛出中断异常nanos notFull.awaitNanos(nanos);}//进入队尾enqueue(new NodeE(e));c count.getAndIncrement();//说明当前元素后面还能再插入一个//就唤醒一个入队条件队列中阻塞的线程if (c 1 capacity)notFull.signal();} finally {putLock.unlock();}//节点数量为0说明队列是空的if (c 0)//唤醒一个出队条件队列阻塞的线程signalNotEmpty();return true;}public E take() throws InterruptedException {E x;int c -1;final AtomicInteger count this.count;final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {// 如果队列无元素则阻塞在notEmpty条件上while (count.get() 0) {notEmpty.await();}// 否则出队x dequeue();// 获取出队前队列的长度c count.getAndDecrement();// 如果取之前队列长度大于1则唤醒notEmptyif (c 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果取之前队列长度等于容量// 则唤醒notFullif (c capacity)signalNotFull();return x;}private E dequeue() {NodeE h head;NodeE first h.next;h.next h; // help GChead first;E x first.item;first.item null;return x;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x null;int c -1;long nanos unit.toNanos(timeout);final AtomicInteger count this.count;final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {while (count.get() 0) {//队列为空且已经超时直接返回空if (nanos 0)return null;//等待过程中可能被唤醒超时中断nanos notEmpty.awaitNanos(nanos);}//进行出队操作x dequeue();c count.getAndDecrement();if (c 1)notEmpty.signal();} finally {takeLock.unlock();}//如果出队前队列是满的则唤醒一个被take()阻塞的线程if (c capacity)signalNotFull();return x;}public E poll() {//}public E peek() {if (count.get() 0)return null;final ReentrantLock takeLock this.takeLock;takeLock.lock();try {NodeE first head.next;if (first null)return null;elsereturn first.item;} finally {takeLock.unlock();}}void unlink(NodeE p, NodeE trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item null;trail.next p.next;if (last p)last trail;if (count.getAndDecrement() capacity)notFull.signal();}public boolean remove(Object o) {if (o null) return false;fullyLock();try {for (NodeE trail head, p trail.next;p ! null;trail p, p p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}public boolean contains(Object o) {}static final class LBQSpliteratorE implements SpliteratorE {}
}
LinkedBlockingQueue 与 ArrayBlockingQueue 对比
ArrayBlockingQueue 入队出队采用一把锁导致入队出队相互阻塞效率低下LinkedBlockingQueue 入队出队采用两把锁入队出队互不干扰效率较高二者都是有界队列如果长度相等且出队速度跟不上入队速度都会导致大量线程阻塞LinkedBlockingQueue 如果初始化不传入初始容量则使用最大 int 值如果出队速度跟不上入队速度会导致队列特别长占用大量内存
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。(虽说是无界队列但是由于资源耗尽的话也会OutOfMemoryError无法添加元素)
默认情况下元素采用自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则或者初始化 PriorityBlockingQueue 时指定构造参数 Comparator 来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue 是基于最小二叉堆实现使用基于 CAS 实现的自旋锁来控制队列的动态扩容保证了扩容操作不会阻塞 take 操作的执行。
DelayQueue
DelayQueue 是一个使用优先级队列实现的延迟无界阻塞队列。
队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景
缓存系统的设计可以用 DelayQueue 保存缓存元素的有效期使用一个线程循环查询 DelayQueue一旦能从 DelayQueue 中获取元素时表示缓存有效期到了。定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间一旦从 DelayQueue 中获取到任务就开始执行从比如 Timer 就是使用 DelayQueue 实现的。
SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列也即是单个元素的队列。
每一个 put 操作必须等待一个 take 操作否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素非常适合于传递性场景, 比如在一个线程中使用的数据传递给另外一个线程使用SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
Coding
synchronousQueue 是一个没有数据缓冲的阻塞队列生产者线程对其的插入操作 put() 必须等待消费者的移除操作 take()反过来也一样。
对应 peek, contains, clear, isEmpty ... 等方法其实是无效的。
但是 poll() 和 offer() 就不会阻塞举例来说就是 offer 的时候如果有消费者在等待那么就会立马满足返回 true如果没有就会返回 false不会等待消费者到来。
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueueString queue new SynchronousQueue();//System.out.println(queue.offer(aaa)); //false//System.out.println(queue.poll()); //nullSystem.out.println(queue.add(bbb)); //IllegalStateException: Queue fullnew Thread(()-{try {System.out.println(Thread 1 put a);queue.put(a);System.out.println(Thread 1 put b);queue.put(b);System.out.println(Thread 1 put c);queue.put(c);} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(()-{try {TimeUnit.SECONDS.sleep(2);System.out.println(Thread 2 get:queue.take());TimeUnit.SECONDS.sleep(2);System.out.println(Thread 2 get:queue.take());TimeUnit.SECONDS.sleep(2);System.out.println(Thread 2 get:queue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
Thread 1 put a
Thread 2 get:a
Thread 1 put b
Thread 2 get:b
Thread 1 put c
Thread 2 get:c
源码解读
不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作SynchronousQueue直接使用CAS实现线程的安全访问。
synchronousQueue 提供了两个构造器公平与否内部是通过 Transferer 来实现的具体分为两个Transferer分别是 TransferStack 和 TransferQueue。
TransferStack非公平竞争模式使用的数据结构是后进先出栈(LIFO Stack)
TransferQueue公平竞争模式则使用先进先出队列FIFO Queue
性能上两者是相当的一般情况下FIFO 通常可以支持更大的吞吐量但 LIFO 可以更大程度的保持线程的本地化。
private transient volatile TransfererE transferer;public SynchronousQueue() {this(false);
}public SynchronousQueue(boolean fair) {transferer fair ? new TransferQueueE() : new TransferStackE();
}
分析 TransferQueue 的实现
//构造函数中会初始化一个出队的节点并且首尾都指向这个节点
TransferQueue() {QNode h new QNode(null, false); // initialize to dummy node.head h;tail h;
}
//队列节点,
static final class QNode {volatile QNode next; // next node in queuevolatile Object item; // CASed to or from nullvolatile Thread waiter; // to control park/unparkfinal boolean isData;QNode(Object item, boolean isData) {this.item item;this.isData isData;}// 设置next和item的值用于进行并发更新, cas 无锁操作boolean casNext(QNode cmp, QNode val) {return next cmp UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}boolean casItem(Object cmp, Object val) {return item cmp UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void tryCancel(Object cmp) {UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);}boolean isCancelled() {return item this;}boolean isOffList() {return next this;}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? k QNode.class;itemOffset UNSAFE.objectFieldOffset(k.getDeclaredField(item));nextOffset UNSAFE.objectFieldOffset(k.getDeclaredField(next));} catch (Exception e) {throw new Error(e);}}
}
从 put() 方法和 take() 方法可以看出最终调用的都是 TransferQueue 的 transfer() 方法。
public void put(E e) throws InterruptedException {if (e null) throw new NullPointerException();if (transferer.transfer(e, false, 0) null) {Thread.interrupted();throw new InterruptedException();}
}public E take() throws InterruptedException {E e transferer.transfer(null, false, 0);if (e ! null)return e;Thread.interrupted();throw new InterruptedException();
}//transfer方法用于提交数据或者是获取数据
E transfer(E e, boolean timed, long nanos) {QNode s null; // constructed/reused as needed//如果e不为null就说明是添加数据的入队操作boolean isData (e ! null);for (;;) {QNode t tail;QNode h head;if (t null || h null) // saw uninitialized valuecontinue; // spin//如果当前操作和 tail 节点的操作是一样的或者头尾相同表明队列中啥都没有。if (h t || t.isData isData) { // empty or same-modeQNode tn t.next;// 如果 t 和 tail 不一样说明tail 被其他的线程改了重来if (t ! tail) // inconsistent readcontinue;// 如果 tail 的 next 不是空。就需要将 next 追加到 tail 后面了if (tn ! null) { // lagging tail// 使用 CAS 将 tail.next 变成 tail,advanceTail(t, tn);continue;}// 时间到了不等待返回 null插入失败获取也是失败的if (timed nanos 0) // cant waitreturn null;if (s null)s new QNode(e, isData);if (!t.casNext(null, s)) // failed to link incontinue;advanceTail(t, s); // swing tail and waitObject x awaitFulfill(s, e, timed, nanos);if (x s) { // wait was cancelledclean(t, s);return null;}if (!s.isOffList()) { // not already unlinkedadvanceHead(t, s); // unlink if headif (x ! null) // and forget fieldss.item s;s.waiter null;}return (x ! null) ? (E)x : e;} else { // complementary-modeQNode m h.next; // node to fulfillif (t ! tail || m null || h ! head)continue; // inconsistent readObject x m.item;if (isData (x ! null) || // m already fulfilledx m || // m cancelled!m.casItem(x, e)) { // lost CASadvanceHead(h, m); // dequeue and retrycontinue;}advanceHead(h, m); // successfully fulfilledLockSupport.unpark(m.waiter);return (x ! null) ? (E)x : e;}}
}
LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。
LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时如果队列不为空则直接取走数据若队列为空那就生成一个节点节点元素为null入队然后消费者线程被等待在这个节点上后面生产者线程入队时发现有一个元素为null的节点生产者线程就不入队了直接就将元素填充到该节点并唤醒该节点等待的线程被唤醒的消费者线程取走元素从调用的方法返回。我们称这种节点操作为“匹配”方式。
队列实现了 TransferQueue 接口重写了 tryTransfer 和 transfer 方法这组方法和 SynchronousQueue 公平模式的队列类似具有匹配的功能
LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。
所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口在多线程同时入队时也就减少了一半的竞争。相比其他的阻塞队列LinkedBlockingDeque 多了 addFirstaddLastofferFirstofferLastpeekFirstpeekLast 等方法以 First 单词结尾的方法表示插入获取peek或移除双端队列的第一个元素。以 Last 单词结尾的方法表示插入获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast移除方法 remove 等效于 removeFirst。
在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀默认容量也是 Integer.MAX_VALUE。另外双向阻塞队列可以运用在“工作窃取”模式中。
阻塞队列使用场景
我们常用的生产者消费者模式就可以基于阻塞队列实现
线程池中活跃线程数达到 corePoolSize 时线程池将会将后续的 task 提交到 BlockingQueue 中
生产者消费者模式
JDK API文档的 BlockingQueue 给出了一个典型的应用 面试题一个初始值为 0 的变量两个线程对齐交替操作一个1一个-15 轮 public class ProdCounsume_TraditionDemo {public static void main(String[] args) {ShareData shareData new ShareData();new Thread(() - {for (int i 0; i 5; i) {shareData.increment();}}, T1).start();new Thread(() - {for (int i 0; i 5; i) {shareData.decrement();}}, T1).start();}
}//线程操作资源类
class ShareData {private int num 0;private Lock lock new ReentrantLock();private Condition condition lock.newCondition();public void increment() {lock.lock();try {while (num ! 0) {//等待不能生产condition.await();}//干活num;System.out.println(Thread.currentThread().getName() \t num);//唤醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() {lock.lock();try {while (num 0) {//等待不能生产condition.await();}//干活num--;System.out.println(Thread.currentThread().getName() \t num);//唤醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}
线程池
线程池的核心方法 ThreadPoolExecutor用 BlockingQueue 存放任务的阻塞队列被提交但尚未被执行的任务
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
线程池在内部实际也是构建了一个生产者消费者模型将线程和任务两者解耦并不直接关联从而良好的缓冲任务复用线程。 不同的线程池实现用的是不同的阻塞队列newFixedThreadPool 和 newSingleThreadExecutor 用的是LinkedBlockingQueuenewCachedThreadPool 用的是 SynchronousQueue。 文章持续更新可以微信搜「 JavaKeeper 」第一时间阅读无套路领取 500 本电子书和 30 视频教学和源码本文 GitHub github.com/JavaKeeper 已经收录Javaer 开发、面试必备技能兵器谱有你想要的。 参考与感谢
Home - 廖雪峰的官方网站
SynchronousQueue源码 并发编程之 SynchronousQueue 核心源码分析 - 掘金