杭州网站优化培训,电商运营方案,凡客诚品是品牌吗,陕西住房和城乡建设部网站1. 什么是阻塞队列#xff1f;阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是#xff1a;在队列为空时#xff0c;获取元素的线程会等待队列变为非空。当队列满时#xff0c;存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景…1. 什么是阻塞队列阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是在队列为空时获取元素的线程会等待队列变为非空。当队列满时存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景生产者是往队列里添加元素的线程消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器而消费者也只从容器里拿元素。阻塞队列提供了四种处理方法:方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用抛出异常是指当阻塞队列满时候再往队列里插入元素会抛出IllegalStateException(Queue full)异常。当队列为空时从队列里获取元素时会抛出NoSuchElementException异常 。返回特殊值插入方法会返回是否成功成功则返回true。移除方法则是从队列里拿出一个元素如果没有则返回null一直阻塞当阻塞队列满时如果生产者线程往队列里put元素队列会一直阻塞生产者线程直到拿到数据或者响应中断退出。当队列空时消费者线程试图从队列里take元素队列也会阻塞消费者线程直到队列可用。超时退出当阻塞队列满时队列会阻塞生产者线程一段时间如果超过一定的时间生产者线程就会退出。2. Java里的阻塞队列JDK7提供了7个阻塞队列。分别是ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。PriorityBlockingQueue 一个支持优先级排序的×××阻塞队列。DelayQueue一个使用优先级队列实现的×××阻塞队列。SynchronousQueue一个不存储元素的阻塞队列。LinkedTransferQueue一个由链表结构组成的×××阻塞队列。LinkedBlockingDeque一个由链表结构组成的双向阻塞队列。ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列所谓公平访问队列是指阻塞的所有生产者线程或消费者线程当队列可用时可以按照阻塞的先后顺序访问队列即先阻塞的生产者线程可以先往队列里插入元素先阻塞的消费者线程可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列ArrayBlockingQueue fairQueue new ArrayBlockingQueue(1000,true);访问者的公平性是使用可重入锁实现的代码如下public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity 0)throw new IllegalArgumentException();this.items new Object[capacity];lock new ReentrantLock(fair);notEmpty lock.newCondition();notFull lock.newCondition();}LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。PriorityBlockingQueue是一个支持优先级的×××队列。默认情况下元素采取自然顺序排列也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。DelayQueue是一个支持延时获取元素的×××阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景缓存系统的设计可以用DelayQueue保存缓存元素的有效期使用一个线程循环查询DelayQueue一旦能从DelayQueue中获取元素时表示缓存有效期到了。定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间一旦从DelayQueue中获取到任务就开始执行从比如TimerQueue就是使用DelayQueue实现的。队列中的Delayed必须实现compareTo来指定元素的顺序。比如让延时时间最长的放在队列的末尾。实现代码如下public int compareTo(Delayed other) {if (other this) // compare zero ONLY if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask x (ScheduledFutureTask)other;long diff time - x.time;if (diff 0)return -1;else if (diff 0)return 1;else if (sequenceNumber x.sequenceNumber)return -1;elsereturn 1;}long d (getDelay(TimeUnit.NANOSECONDS) -other.getDelay(TimeUnit.NANOSECONDS));return (d 0) ? 0 : ((d 0) ? -1 : 1);}如何实现Delayed接口我们可以参考ScheduledThreadPoolExecutor里ScheduledFutureTask类。这个类实现了Delayed接口。首先在对象创建的时候使用time记录前对象什么时候可以使用代码如下ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time ns;this.period period;this.sequenceNumber sequencer.getAndIncrement();}然后使用getDelay可以查询当前元素还需要延时多久代码如下public long getDelay(TimeUnit unit) {return unit.convert(time - now(), TimeUnit.NANOSECONDS);}通过构造函数可以看出延迟时间参数ns的单位是纳秒自己设计的时候最好使用纳秒因为getDelay时可以指定任意单位一旦以纳秒作为单位而延时的时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时getDelay会返回负数。如何实现延时队列延时队列的实现很简单当消费者从队列里获取元素时如果元素没有达到延时时间就阻塞当前线程。long delay first.getDelay(TimeUnit.NANOSECONDS);if (delay 0)return q.poll();else if (leader ! null)available.await();SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作否则不能继续添加元素。SynchronousQueue可以看成是一个传球手负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素非常适合于传递性场景,比如在一个线程中使用的数据传递给另外一个线程使用SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。LinkedTransferQueue是一个由链表结构组成的×××阻塞TransferQueue队列。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时)transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素transfer方法会将元素存放在队列的tail节点并等到该元素被消费者消费了才返回。transfer方法的关键代码如下Node pred tryAppend(s, haveData);return awaitMatch(s, pred, e, (how TIMED), nanos);第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程并执行其他线程。tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收方法立即返回。而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法则是试图把生产者传入的元素直接传给消费者但是如果没有消费者消费该元素则等待指定的时间再返回如果超时还没消费元素则返回false如果在超时时间内消费了元素则返回true。LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口在多线程同时入队时也就减少了一半的竞争。相比其他的阻塞队列LinkedBlockingDeque多了addFirstaddLastofferFirstofferLastpeekFirstpeekLast等方法以First单词结尾的方法表示插入获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法表示插入获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast移除方法remove等效于removeFirst。但是take方法却等同于takeFirst不知道是不是Jdk的bug使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。3. 阻塞队列的实现原理如果队列是空的消费者会一直等待当生产者添加元素时候消费者是如何知道当前队列有元素的呢如果让你来设计阻塞队列你会如何设计让生产者和消费者能够高效率的进行通讯呢让我们先来看看JDK是如何实现的。使用通知模式实现。所谓通知模式就是当生产者往满的队列里添加元素时会阻塞住生产者当消费者消费了一个队列中的元素后会通知生产者当前队列可用。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现代码如下private final Condition notFull;private final Condition notEmpty;public ArrayBlockingQueue(int capacity, boolean fair) {//省略其他代码notEmpty lock.newCondition();notFull lock.newCondition();}public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count items.length)notFull.await();insert(e);} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {while (count 0)notEmpty.await();return extract();} finally {lock.unlock();}}private void insert(E x) {items[putIndex] x;putIndex inc(putIndex);count;notEmpty.signal();}当我们往队列里插入一个元素时如果队列不可用阻塞生产者主要通过LockSupport.park(this);来实现public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node addConditionWaiter();int savedState fullyRelease(node);int interruptMode 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;if (node.nextWaiter ! null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);}继续进入源码发现调用setBlocker先保存下将要阻塞的线程然后调用unsafe.park阻塞当前线程。public static void park(Object blocker) {Thread t Thread.currentThread();setBlocker(t, blocker);unsafe.park(false, 0L);setBlocker(t, null);}unsafe.park是个native方法代码如下public native void park(boolean isAbsolute, long time);park这个方法会阻塞当前线程只有以下四种情况中的一种发生时该方法才会返回。与park对应的unpark执行或已经执行时。注意已经执行是指unpark先执行然后再执行的park。线程被中断时。如果参数中的time不是零等待了指定的毫秒数时。发生异常现象时。这些异常事先无法确定。我们继续看一下JVM是如何实现park方法的park在不同的操作系统使用不同的方式实现在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法代码如下void os::PlatformEvent::park() {int v ;for (;;) {v _Event ;if (Atomic::cmpxchg (v-1, _Event, v) v) break ;}guarantee (v 0, invariant) ;if (v 0) {// Do this the hard way by blocking ...int status pthread_mutex_lock(_mutex);assert_status(status 0, status, mutex_lock);guarantee (_nParked 0, invariant) ; _nParked ;while (_Event 0) {status pthread_cond_wait(_cond, _mutex);// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...// Treat this the same as if the wait was interruptedif (status ETIME) { status EINTR; }assert_status(status 0 || status EINTR, status, cond_wait);}-- _nParked ;// In theory we could move the ST of 0 into _Event past the unlock(),// but then wed need a MEMBAR after the ST._Event 0 ;status pthread_mutex_unlock(_mutex);assert_status(status 0, status, mutex_unlock);}guarantee (_Event 0, invariant) ;}}pthread_cond_wait是一个多线程的条件变量函数cond是condition的缩写字面意思可以理解为线程在等待一个条件发生这个条件是一个全局变量。这个方法接收两个参数一个共享变量_cond一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。当队列满时生产者往阻塞队列里插入一个元素生产者线程会进入WAITING (parking)状态。我们可以使用jstack dump阻塞的生产者线程看到这点main prio5 tid0x00007fc83c000000 nid0x10164e000 waiting on condition [0x000000010164d000]java.lang.Thread.State: WAITING (parking)at sun.misc.Unsafe.park(Native Method)- parking to wait for 0x0000000140559fe8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)4. 参考资料