韩家英设计公司官网,快速排名软件seo系统,wordpress 管理员登陆,襄阳网站seo诊断文章目录 再谈多线程并发与并行顺序执行并发执行并行执行 再谈锁机制重量级锁轻量级锁偏向锁锁消除和锁粗化 JMM内存模型Java内存模型重排序volatile关键字happens-before原则 多线程编程核心锁框架Lock和Condition接口可重入锁公平锁与非公平锁 读写锁锁降级和锁升级 队列同步… 文章目录 再谈多线程并发与并行顺序执行并发执行并行执行 再谈锁机制重量级锁轻量级锁偏向锁锁消除和锁粗化 JMM内存模型Java内存模型重排序volatile关键字happens-before原则 多线程编程核心锁框架Lock和Condition接口可重入锁公平锁与非公平锁 读写锁锁降级和锁升级 队列同步器AQS底层实现公平锁一定公平吗Condition实现原理自行实现锁类 原子类原子类介绍ABA问题及解决方案 并发容器传统容器线程安全吗并发容器介绍阻塞队列 并发编程进阶线程池线程池的使用执行带返回值的任务执行定时任务线程池实现原理 并发工具类计数器锁 CountDownLatch循环屏障 CyclicBarrier信号量 Semaphore数据交换 ExchangerFork/Join框架 资料来自
itbaima 再谈多线程 JUC相对于Java应用层的学习难度更大开篇推荐掌握的预备知识JavaSE多线程部分必备、操作系统、JVM**推荐**、计算机组成原理。掌握预备知识会让你的学习更加轻松其中JavaSE多线程部分要求必须掌握否则无法继续学习本教程我们不会再去重复教学JavaSE阶段的任何知识了。 各位小伙伴一定要点击收藏按钮收藏 学会 还记得我们在JavaSE中学习的多线程吗让我们来回顾一下
在我们的操作系统之上可以同时运行很多个进程并且每个进程之间相互隔离互不干扰。我们的CPU会通过时间片轮转算法为每一个进程分配时间片并在时间片使用结束后切换下一个进程继续执行通过这种方式来实现宏观上的多个程序同时运行。
由于每个进程都有一个自己的内存空间进程之间的通信就变得非常麻烦比如要共享某些数据而且执行不同进程会产生上下文切换非常耗时那么有没有一种更好地方案呢
后来线程横空出世一个进程可以有多个线程线程是程序执行中一个单一的顺序控制流程现在线程才是程序执行流的最小单元各个线程之间共享程序的内存空间也就是所在进程的内存空间上下文切换速度也高于进程。
现在有这样一个问题
public static void main(String[] args) {int[] arr new int[]{3, 1, 5, 2, 4};//请将上面的数组按升序输出
}按照正常思维我们肯定是这样
public static void main(String[] args) {int[] arr new int[]{3, 1, 5, 2, 4};//直接排序吧Arrays.sort(arr);for (int i : arr) {System.out.println(i);}
}而我们学习了多线程之后可以换个思路来实现
public static void main(String[] args) {int[] arr new int[]{3, 1, 5, 2, 4};for (int i : arr) {new Thread(() - {try {Thread.sleep(i * 1000); //越小的数休眠时间越短优先被打印System.out.println(i);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}我们接触过的很多框架都在使用多线程比如Tomcat服务器所有用户的请求都是通过不同的线程来进行处理的这样我们的网站才可以同时响应多个用户的请求要是没有多线程可想而知服务器的处理效率会有多低。
虽然多线程能够为我们解决很多问题但是如何才能正确地使用多线程如何才能将多线程的资源合理使用这都是我们需要关心的问题。
在Java 5的时候新增了java.util.concurrentJUC包其中包括大量用于多线程编程的工具类目的是为了更好的支持高并发任务让开发者进行多线程编程时减少竞争条件和死锁的问题通过使用这些工具类我们的程序会更加合理地使用多线程。而我们这一系列视频的主角正是JUC。
但是我们先不着急去看这些内容第一章我们先来补点基础知识。 并发与并行
我们经常听到并发编程那么这个并发代表的是什么意思呢而与之相似的并行又是什么意思它们之间有什么区别
比如现在一共有三个工作需要我们去完成。 顺序执行
顺序执行其实很好理解就是我们依次去将这些任务完成了 实际上就是我们同一时间只能处理一个任务所以需要前一个任务完成之后才能继续下一个任务依次完成所有任务。
并发执行
并发执行也是我们同一时间只能处理一个任务但是我们可以每个任务轮着做时间片轮转 只要我们单次处理分配的时间足够的短在宏观看来就是三个任务在同时进行。
而我们Java中的线程正是这种机制当我们需要同时处理上百个上千个任务时很明显CPU的数量是不可能赶得上我们的线程数的所以说这时就要求我们的程序有良好的并发性能来应对同一时间大量的任务处理。学习Java并发编程能够让我们在以后的实际场景中知道该如何应对高并发的情况。
并行执行
并行执行就突破了同一时间只能处理一个任务的限制我们同一时间可以做多个任务 比如我们要进行一些排序操作就可以用到并行计算只需要等待所有子任务完成最后将结果汇总即可。包括分布式计算模型MapReduce也是采用的并行计算思路。 再谈锁机制
谈到锁机制相信各位应该并不陌生了我们在JavaSE阶段通过使用synchronized关键字来实现锁这样就能够很好地解决线程之间争抢资源的情况。那么synchronized底层到底是如何实现的呢
我们知道使用synchronized一定是和某个对象相关联的比如我们要对某一段代码加锁那么我们就需要提供一个对象来作为锁本身
public static void main(String[] args) {synchronized (Main.class) {//这里使用的是Main类的Class对象作为锁}
}我们来看看它变成字节码之后会用到哪些指令 其中最关键的就是monitorenter指令了可以看到之后也有monitorexit与之进行匹配注意这里有2个monitorenter和monitorexit分别对应加锁和释放锁在执行monitorenter之前需要尝试获取锁每个对象都有一个monitor监视器与之对应而这里正是去获取对象监视器的所有权一旦monitor所有权被某个线程持有那么其他线程将无法获得管程模型的一种实现。
在代码执行完成之后我们可以看到一共有两个monitorexit在等着我们那么为什么这里会有两个呢按理说monitorenter和monitorexit不应该一一对应吗这里为什么要释放锁两次呢
首先我们来看第一个这里在释放锁之后会马上进入到一个goto指令跳转到15行而我们的15行对应的指令就是方法的返回指令其实正常情况下只会执行第一个monitorexit释放锁在释放锁之后就接着同步代码块后面的内容继续向下执行了。而第二个其实是用来处理异常的可以看到它的位置是在12行如果程序运行发生异常那么就会执行第二个monitorexit并且会继续向下通过athrow指令抛出异常而不是直接跳转到15行正常运行下去。 实际上synchronized使用的锁就是存储在Java对象头中的我们知道对象是存放在堆内存中的而每个对象内部都有一部分空间用于存储对象头信息而对象头信息中则包含了Mark Word用于存放hashCode和对象的锁信息在不同状态下它存储的数据结构有一些不同。 重量级锁
在JDK6之前synchronized一直被称为重量级锁monitor依赖于底层操作系统的Lock实现Java的线程是映射到操作系统的原生线程上切换成本较高。而在JDK6之后锁的实现得到了改进。我们先从最原始的重量级锁开始
我们说了每个对象都有一个monitor与之关联在Java虚拟机HotSpot中monitor是由ObjectMonitor实现的
ObjectMonitor() {_header NULL;_count 0; //记录个数_waiters 0,_recursions 0;_object NULL;_owner NULL;_WaitSet NULL; //处于wait状态的线程会被加入到_WaitSet_WaitSetLock 0 ;_Responsible NULL ;_succ NULL ;_cxq NULL ;FreeNext NULL ;_EntryList NULL ; //处于等待锁block状态的线程会被加入到该列表_SpinFreq 0 ;_SpinClock 0 ;OwnerIsThread 0 ;
}每个等待锁的线程都会被封装成ObjectWaiter对象进入到如下机制 ObjectWaiter首先会进入 Entry Set等着当线程获取到对象的monitor后进入 The Owner 区域并把monitor中的owner变量设置为当前线程同时monitor中的计数器count加1若线程调用wait()方法将释放当前持有的monitorowner变量恢复为nullcount自减1同时该线程进入 WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor并复位变量的值以便其他线程进入获取对象的monitor。
虽然这样的设计思路非常合理但是在大多数应用上每一个线程占用同步代码块的时间并不是很长我们完全没有必要将竞争中的线程挂起然后又唤醒并且现代CPU基本都是多核心运行的我们可以采用一种新的思路来实现锁。
在JDK1.4.2时引入了自旋锁JDK6之后默认开启它不会将处于等待状态的线程挂起而是通过无限循环的方式不断检测是否能够获取锁由于单个线程占用锁的时间非常短所以说循环次数不会太多可能很快就能够拿到锁并运行这就是自旋锁。当然仅仅是在等待时间非常短的情况下自旋锁的表现会很好但是如果等待时间太长由于循环是需要处理器继续运算的所以这样只会浪费处理器资源因此自旋锁的等待时间是有限制的默认情况下为10次如果失败那么会进而采用重量级锁机制。 在JDK6之后自旋锁得到了一次优化自旋的次数限制不再是固定的而是自适应变化的比如在同一个锁对象上自旋等待刚刚成功获得过锁并且持有锁的线程正在运行那么这次自旋也是有可能成功的所以会允许自旋更多次。当然如果某个锁经常都自旋失败那么有可能会不再采用自旋策略而是直接使用重量级锁。
轻量级锁 从JDK 1.6开始为了减少获得锁和释放锁带来的性能消耗就引入了轻量级锁。 轻量级锁的目标是在无竞争情况下减少重量级锁产生的性能消耗并不是为了代替重量级锁实际上就是赌一手同一时间只有一个线程在占用资源包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。它不像是重量级锁那样需要向操作系统申请互斥量。它的运作机制如下
在即将开始执行同步代码块中的内容时会首先检查对象的Mark Word查看锁对象是否被其他线程占用如果没有任何线程占用那么会在当前线程中所处的栈帧中建立一个名为锁记录Lock Record的空间用于复制并存储对象目前的Mark Word信息官方称为Displaced Mark Word。
接着虚拟机将使用CAS操作将对象的Mark Word更新为轻量级锁状态数据结构变为指向Lock Record的指针指向的是当前的栈帧 CASCompare And Swap是一种无锁算法我们之前在Springboot阶段已经讲解过了它并不会为对象加锁而是在执行的时候看看当前数据的值是不是我们预期的那样如果是那就正常进行替换如果不是那么就替换失败。比如有两个线程都需要修改变量i的值默认为10现在一个线程要将其修改为20另一个要修改为30如果他们都使用CAS算法那么并不会加锁访问i而是直接尝试修改i的值但是在修改时需要确认i是不是10如果是表示其他线程还没对其进行修改如果不是那么说明其他线程已经将其修改此时不能完成修改任务修改失败。 在CPU中CAS操作使用的是cmpxchg指令能够从最底层硬件层面得到效率的提升。 如果CAS操作失败了的话那么说明可能这时有线程已经进入这个同步代码块了这时虚拟机会再次检查对象的Mark Word是否指向当前线程的栈帧如果是说明不是其他线程而是当前线程已经有了这个对象的锁直接放心大胆进同步代码块即可。如果不是那确实是被其他线程占用了。
这时轻量级锁一开始的想法就是错的这时有对象在竞争资源已经赌输了所以说只能将锁膨胀为重量级锁按照重量级锁的操作执行注意锁的膨胀是不可逆的 所以轻量级锁 - 失败 - 自适应自旋锁 - 失败 - 重量级锁
解锁过程同样采用CAS算法如果对象的MarkWord仍然指向线程的锁记录那么就用CAS操作把对象的MarkWord和复制到栈帧中的Displaced Mark Word进行交换。如果替换失败说明其他线程尝试过获取该锁在释放锁的同时需要唤醒被挂起的线程。
偏向锁
偏向锁相比轻量级锁更纯粹干脆就把整个同步都消除掉不需要再进行CAS操作了。它的出现主要是得益于人们发现某些情况下某个锁频繁地被同一个线程获取这种情况下我们可以对轻量级锁进一步优化。
偏向锁实际上就是专门为单个线程而生的当某个线程第一次获得锁时如果接下来都没有其他线程获取此锁那么持有锁的线程将不再需要进行同步操作。
可以从之前的MarkWord结构中看到偏向锁也会通过CAS操作记录线程的ID如果一直都是同一个线程获取此锁那么完全没有必要在进行额外的CAS操作。当然如果有其他线程来抢了那么偏向锁会根据当前状态决定是否要恢复到未锁定或是膨胀为轻量级锁。
如果我们需要使用偏向锁可以添加-XX:UseBiased参数来开启。
所以最终的锁等级为未锁定 偏向锁 轻量级锁 重量级锁
值得注意的是如果对象通过调用hashCode()方法计算过对象的一致性哈希值那么它是不支持偏向锁的会直接进入到轻量级锁状态因为Hash是需要被保存的而偏向锁的Mark Word数据结构无法保存Hash值如果对象已经是偏向锁状态再去调用hashCode()方法那么会直接将锁升级为重量级锁并将哈希值存放在monitor有预留位置保存中。 锁消除和锁粗化
锁消除和锁粗化都是在运行时的一些优化方案比如我们某段代码虽然加了锁但是在运行时根本不可能出现各个线程之间资源争夺的情况这种情况下完全不需要任何加锁机制所以锁会被消除。锁粗化则是我们代码中频繁地出现互斥同步操作比如在一个循环内部加锁这样明显是非常消耗性能的所以虚拟机一旦检测到这种操作会将整个同步范围进行扩展。 JMM内存模型
注意这里提到的内存模型和我们在JVM中介绍的内存模型不在同一个层次JVM中的内存模型是虚拟机规范对整个内存区域的规划而Java内存模型是在JVM内存模型之上的抽象模型具体实现依然是基于JVM内存模型实现的我们会在后面介绍。
Java内存模型
我们在计算机组成原理中学习过在我们的CPU中一般都会有高速缓存而它的出现是为了解决内存的速度跟不上处理器的处理速度的问题所以CPU内部会添加一级或多级高速缓存来提高处理器的数据获取效率但是这样也会导致一个很明显的问题因为现在基本都是多核心处理器每个处理器都有一个自己的高速缓存那么又该怎么去保证每个处理器的高速缓存内容一致呢 为了解决缓存一致性的问题需要各个处理器访问缓存时都遵循一些协议在读写时要根据协议来进行操作这类协议有MSI、MESIIllinois Protocol、MOSI、Synapse、Firefly及Dragon Protocol等。
而Java也采用了类似的模型来实现支持多线程的内存模型 JMMJava Memory Model内存模型规定如下
所有的变量全部存储在主内存注意这里包括下面提到的变量指的都是会出现竞争的变量包括成员变量、静态变量等而局部变量这种属于线程私有不包括在内每条线程有着自己的工作内存可以类比CPU的高速缓存线程对变量的所有操作必须在工作内存中进行不能直接操作主内存中的数据。不同线程之间的工作内存相互隔离如果需要在线程之间传递内容只能通过主内存完成无法直接访问对方的工作内存。
也就是说每一条线程如果要操作主内存中的数据那么得先拷贝到自己的工作内存中并对工作内存中数据的副本进行操作操作完成之后也需要从工作副本中将结果拷贝回主内存中具体的操作就是Save保存和Load加载操作。
那么各位肯定会好奇这个内存模型结合之前JVM所讲的内容具体是怎么实现的呢
主内存对应堆中存放对象的实例的部分。工作内存对应线程的虚拟机栈的部分区域虚拟机可能会对这部分内存进行优化将其放在CPU的寄存器或是高速缓存中。比如在访问数组时由于数组是一段连续的内存空间所以可以将一部分连续空间放入到CPU高速缓存中那么之后如果我们顺序读取这个数组那么大概率会直接缓存命中。
前面我们提到在CPU中可能会遇到缓存不一致的问题而Java中也会遇到比如下面这种情况
public class Main {private static int i 0;public static void main(String[] args) throws InterruptedException {new Thread(() - {for (int j 0; j 100000; j) i;System.out.println(线程1结束);}).start();new Thread(() - {for (int j 0; j 100000; j) i;System.out.println(线程2结束);}).start();//等上面两个线程结束Thread.sleep(1000);System.out.println(i);}
}可以看到这里是两个线程同时对变量i各自进行100000次自增操作但是实际得到的结果并不是我们所期望的那样。
那么为什么会这样呢在之前学习了JVM之后相信各位应该已经知道自增操作实际上并不是由一条指令完成的注意一定不要理解为一行代码就是一个指令完成的 包括变量i的获取、修改、保存都是被拆分为一个一个的操作完成的那么这个时候就有可能出现在修改完保存之前另一条线程也保存了但是当前线程是毫不知情的。 所以说我们当时在JavaSE阶段讲解这个问题的时候是通过synchronized关键字添加同步代码块解决的当然我们后面还会讲解另外的解决方案原子类
重排序
在编译或执行时为了优化程序的执行效率编译器或处理器常常会对指令进行重排序有以下情况
编译器重排序Java编译器通过对Java代码语义的理解根据优化规则对代码指令进行重排序。机器指令级别的重排序现代处理器很高级能够自主判断和变更机器指令的执行顺序。
指令重排序能够在不改变结果单线程的情况下优化程序的运行效率比如
public static void main(String[] args) {int a 10;int b 20;System.out.println(a b);
}我们其实可以交换第一行和第二行
public static void main(String[] args) {int b 10;int a 20;System.out.println(a b);
}即使发生交换但是我们程序最后的运行结果是不会变的当然这里只通过代码的形式演示实际上JVM在执行字节码指令时也会进行优化可能两个指令并不会按照原有的顺序进行。
虽然单线程下指令重排确实可以起到一定程度的优化作用但是在多线程下似乎会导致一些问题
public class Main {private static int a 0;private static int b 0;public static void main(String[] args) {new Thread(() - {if(b 1) {if(a 0) {System.out.println(A);}else {System.out.println(B);} }}).start();new Thread(() - {a 1;b 1;}).start();}
}上面这段代码在正常情况下按照我们的正常思维是不可能输出A的因为只要b等于1那么a肯定也是1才对因为a是在b之前完成的赋值。但是如果进行了重排序那么就有可能a和b的赋值发生交换b先被赋值为1而恰巧这个时候线程1开始判定b是不是1了这时a还没来得及被赋值为1可能线程1就已经走到打印那里去了所以是有可能输出A的。
volatile关键字
好久好久都没有认识新的关键字了今天我们来看一个新的关键字volatile开始之前我们先介绍三个词语
原子性其实之前讲过很多次了就是要做什么事情要么做完要么就不做不存在做一半的情况。可见性指当多个线程访问同一个变量时一个线程修改了这个变量的值其他线程能够立即看得到修改的值。有序性即程序执行的顺序按照代码的先后顺序执行。
我们之前说了如果多线程访问同一个变量那么这个变量会被线程拷贝到自己的工作内存中进行操作而不是直接对主内存中的变量本体进行操作下面这个操作看起来是一个有限循环但是是无限的
public class Main {private static int a 0;public static void main(String[] args) throws InterruptedException {new Thread(() - {while (a 0);System.out.println(线程结束);}).start();Thread.sleep(1000);System.out.println(正在修改a的值...);a 1; //很明显按照我们的逻辑来说a的值被修改那么另一个线程将不再循环}
}实际上这就是我们之前说的虽然我们主线程中修改了a的值但是另一个线程并不知道a的值发生了改变所以循环中依然是使用旧值在进行判断因此普通变量是不具有可见性的。
要解决这种问题我们第一个想到的肯定是加锁同一时间只能有一个线程使用这样总行了吧确实这样的话肯定是可以解决问题的
public class Main {private static int a 0;public static void main(String[] args) throws InterruptedException {new Thread(() - {while (a 0) {synchronized (Main.class){}}System.out.println(线程结束);}).start();Thread.sleep(1000);System.out.println(正在修改a的值...);synchronized (Main.class){a 1;}}
}但是除了硬加一把锁的方案我们也可以使用volatile关键字来解决此关键字的第一个作用就是保证变量的可见性。当写一个volatile变量时JMM会把该线程本地内存中的变量强制刷新到主内存中去并且这个写会操作会导致其他线程中的volatile变量缓存无效这样另一个线程修改了这个变时当前线程会立即得知并将工作内存中的变量更新为最新的版本。
那么我们就来试试看
public class Main {//添加volatile关键字private static volatile int a 0;public static void main(String[] args) throws InterruptedException {new Thread(() - {while (a 0);System.out.println(线程结束);}).start();Thread.sleep(1000);System.out.println(正在修改a的值...);a 1;}
}结果还真的如我们所说的那样当a发生改变时循环立即结束。
当然虽然说volatile能够保证可见性但是不能保证原子性要解决我们上面的i的问题以我们目前所学的知识还是只能使用加锁来完成
public class Main {private static volatile int a 0;public static void main(String[] args) throws InterruptedException {Runnable r () - {for (int i 0; i 10000; i) a;System.out.println(任务完成);};new Thread(r).start();new Thread(r).start();//等待线程执行完成Thread.sleep(1000);System.out.println(a);}
}不对啊volatile不是能在改变变量的时候其他线程可见吗那为什么还是不能保证原子性呢还是那句话自增操作是被瓜分为了多个步骤完成的虽然保证了可见性但是只要手速够快依然会出现两个线程同时写同一个值的问题比如线程1刚刚将a的值更新为100这时线程2可能也已经执行到更新a的值这条指令了已经刹不住车了所以依然会将a的值再更新为一次100
那要是真的遇到这种情况那么我们不可能都去写个锁吧后面我们会介绍原子类来专门解决这种问题。
最后一个功能就是volatile会禁止指令重排也就是说如果我们操作的是一个volatile变量它将不会出现重排序的情况也就解决了我们最上面的问题。那么它是怎么解决的重排序问题呢若用volatile修饰共享变量在编译时会在指令序列中插入内存屏障来禁止特定类型的处理器重排序 内存屏障Memory Barrier又称内存栅栏是一个CPU指令它的作用有两个 保证特定操作的顺序保证某些变量的内存可见性volatile的内存可见性其实就是依靠这个实现的 由于编译器和处理器都能执行指令重排的优化如果在指令间插入一条Memory Barrier则会告诉编译器和CPU不管什么指令都不能和这条Memory Barrier指令重排序。 屏障类型指令示例说明LoadLoadLoad1;LoadLoad;Load2保证Load1的读取操作在Load2及后续读取操作之前执行StoreStoreStore1;StoreStore;Store2在Store2及其后的写操作执行前保证Store1的写操作已刷新到主内存LoadStoreLoad1;LoadStore;Store2在Store2及其后的写操作执行前保证Load1的读操作已读取结束StoreLoadStore1;StoreLoad;Load2保证load1的写操作已刷新到主内存之后load2及其后的读操作才能执行 所以volatile能够保证之前的指令一定全部执行之后的指令一定都没有执行并且前面语句的结果对后面的语句可见。
最后我们来总结一下volatile关键字的三个特性
保证可见性不保证原子性防止指令重排
在之后我们的设计模式系列视频中还会讲解单例模式下volatile的运用。
happens-before原则
经过我们前面的讲解相信各位已经了解了JMM内存模型以及重排序等机制带来的优点和缺点综上JMM提出了happens-before先行发生原则定义一些禁止编译优化的场景来向各位程序员做一些保证只要我们是按照原则进行编程那么就能够保持并发编程的正确性。具体如下
**程序次序规则**同一个线程中按照程序的顺序前面的操作happens-before后续的任何操作。 同一个线程内代码的执行结果是有序的。其实就是可能会发生指令重排但是保证代码的执行结果一定是和按照顺序执行得到的一致程序前面对某一个变量的修改一定对后续操作可见的不可能会出现前面才把a修改为1接着读a居然是修改前的结果这也是程序运行最基本的要求。 **监视器锁规则**对一个锁的解锁操作happens-before后续对这个锁的加锁操作。 就是无论是在单线程环境还是多线程环境对于同一个锁来说一个线程对这个锁解锁之后另一个线程获取了这个锁都能看到前一个线程的操作结果。比如前一个线程将变量x的值修改为了12并解锁之后另一个线程拿到了这把锁对之前线程的操作是可见的可以得到x是前一个线程修改后的结果12所以synchronized是有happens-before规则的 **volatile变量规则**对一个volatile变量的写操作happens-before后续对这个变量的读操作。 就是如果一个线程先去写一个volatile变量紧接着另一个线程去读这个变量那么这个写操作的结果一定对读的这个变量的线程可见。 **线程启动规则**主线程A启动线程B线程B中可以看到主线程启动B之前的操作。 在主线程A执行过程中启动子线程B那么线程A在启动子线程B之前对共享变量的修改结果对线程B可见。 **线程加入规则**如果线程A执行操作join()线程B并成功返回那么线程B中的任意操作happens-before线程Ajoin()操作成功返回。**传递性规则**如果A happens-before BB happens-before C那么A happens-before C。
那么我们来从happens-before原则的角度来解释一下下面的程序结果
public class Main {private static int a 0;private static int b 0;public static void main(String[] args) {a 10;b a 1;new Thread(() - {if(b 10) System.out.println(a); }).start();}
}首先我们定义以上出现的操作
**A**将变量a的值修改为10**B**将变量b的值修改为a 1**C**主线程启动了一个新的线程并在新的线程中获取b进行判断如果大于10那么就打印a
首先我们来分析由于是同一个线程并且B是一个赋值操作且读取了A那么按照程序次序规则A happens-before B接着在B之后马上执行了C按照线程启动规则在新的线程启动之前当前线程之前的所有操作对新的线程是可见的所以 B happens-before C最后根据传递性规则由于A happens-before BB happens-before C所以A happens-before C因此在新的线程中会输出a修改后的结果10。
多线程编程核心
在前面我们了解了多线程的底层运作机制我们终于知道原来多线程环境下存在着如此之多的问题。在JDK5之前我们只能选择synchronized关键字来实现锁而JDK5之后由于volatile关键字得到了升级具体功能就是上一章所描述的所以并发框架包便出现了相比传统的synchronized关键字我们对于锁的实现有了更多的选择。 Doug Lea — JUC并发包的作者 如果IT的历史是以人为主体串接起来的话那么肯定少不了Doug Lea。这个鼻梁挂着眼镜留着德王威廉二世的胡子脸上永远挂着谦逊腼腆笑容服务于纽约州立大学Oswego分校计算机科学系的老大爷。 说他是这个世界上对Java影响力最大的一个人一点也不为过。因为两次Java历史上的大变革他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。 那么从这章开始就让我们来感受一下JUC为我们带来了什么。 锁框架
在JDK 5之后并发包中新增了Lock接口以及相关实现类用来实现锁功能Lock接口提供了与synchronized关键字类似的同步功能但需要在使用时手动获取锁和释放锁。
Lock和Condition接口
使用并发包中的锁和我们传统的synchronized锁不太一样这里的锁我们可以认为是一把真正意义上的锁每个锁都是一个对应的锁对象我只需要向锁对象获取锁或是释放锁即可。我们首先来看看此接口中定义了什么
public interface Lock {//获取锁拿不到锁会阻塞等待其他线程释放锁获取到锁后返回void lock();//同上但是等待过程中会响应中断void lockInterruptibly() throws InterruptedException;//尝试获取锁但是不会阻塞如果能获取到会返回true不能返回falseboolean tryLock();//尝试获取锁但是可以限定超时时间如果超出时间还没拿到锁返回false否则返回true可以响应中断boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//释放锁void unlock();//暂时可以理解为替代传统的Object的wait()、notify()等操作的工具Condition newCondition();
}这里我们可以演示一下如何使用Lock类来进行加锁和释放锁操作
public class Main {private static int i 0;public static void main(String[] args) throws InterruptedException {Lock testLock new ReentrantLock(); //可重入锁ReentrantLock类是Lock类的一个实现我们后面会进行介绍Runnable action () - {for (int j 0; j 100000; j) { //还是以自增操作为例testLock.lock(); //加锁加锁成功后其他线程如果也要获取锁会阻塞等待当前线程释放i;testLock.unlock(); //解锁释放锁之后其他线程就可以获取这把锁了注意在这之前一定得加锁不然报错}};new Thread(action).start();new Thread(action).start();Thread.sleep(1000); //等上面两个线程跑完System.out.println(i);}
}可以看到和我们之前使用synchronized相比我们这里是真正在操作一个锁对象当我们需要加锁时只需要调用lock()方法而需要释放锁时只需要调用unlock()方法。程序运行的最终结果和使用synchronized锁是一样的。
那么我们如何像传统的加锁那样调用对象的wait()和notify()方法呢并发包提供了Condition接口
public interface Condition {//与调用锁对象的wait方法一样会进入到等待状态但是这里需要调用Condition的signal或signalAll方法进行唤醒感觉就是和普通对象的wait和notify是对应的同时等待状态下是可以响应中断的void await() throws InterruptedException;//同上但不响应中断看名字都能猜到void awaitUninterruptibly();//等待指定时间如果在指定时间纳秒内被唤醒会返回剩余时间如果超时会返回0或负数可以响应中断long awaitNanos(long nanosTimeout) throws InterruptedException;//等待指定时间可以指定时间单位如果等待时间内被唤醒返回true否则返回false可以响应中断boolean await(long time, TimeUnit unit) throws InterruptedException;//可以指定一个明确的时间点如果在时间点之前被唤醒返回true否则返回false可以响应中断boolean awaitUntil(Date deadline) throws InterruptedException;//唤醒一个处于等待状态的线程注意还得获得锁才能接着运行void signal();//同上但是是唤醒所有等待线程void signalAll();
}这里我们通过一个简单的例子来演示一下
public static void main(String[] args) throws InterruptedException {Lock testLock new ReentrantLock();Condition condition testLock.newCondition();new Thread(() - {testLock.lock(); //和synchronized一样必须持有锁的情况下才能使用awaitSystem.out.println(线程1进入等待状态);try {condition.await(); //进入等待状态} catch (InterruptedException e) {e.printStackTrace();}System.out.println(线程1等待结束);testLock.unlock();}).start();Thread.sleep(100); //防止线程2先跑new Thread(() - {testLock.lock();System.out.println(线程2开始唤醒其他等待线程);condition.signal(); //唤醒线程1但是此时线程1还必须要拿到锁才能继续运行System.out.println(线程2结束);testLock.unlock(); //这里释放锁之后线程1就可以拿到锁继续运行了}).start();
}可以发现Condition对象使用方法和传统的对象使用差别不是很大。
**思考**下面这种情况跟上面有什么不同
public static void main(String[] args) throws InterruptedException {Lock testLock new ReentrantLock();new Thread(() - {testLock.lock();System.out.println(线程1进入等待状态);try {testLock.newCondition().await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(线程1等待结束);testLock.unlock();}).start();Thread.sleep(100);new Thread(() - {testLock.lock();System.out.println(线程2开始唤醒其他等待线程);testLock.newCondition().signal();System.out.println(线程2结束);testLock.unlock();}).start();
}通过分析可以得到在调用newCondition()后会生成一个新的Condition对象并且同一把锁内是可以存在多个Condition对象的实际上原始的锁机制等待队列只能有一个而这里可以创建很多个Condition来实现多等待队列而上面的例子中实际上使用的是不同的Condition对象只有对同一个Condition对象进行等待和唤醒操作才会有效而不同的Condition对象是分开计算的。
最后我们再来讲解一下时间单位这是一个枚举类也是位于java.util.concurrent包下
public enum TimeUnit {/*** Time unit representing one thousandth of a microsecond*/NANOSECONDS {public long toNanos(long d) { return d; }public long toMicros(long d) { return d/(C1/C0); }public long toMillis(long d) { return d/(C2/C0); }public long toSeconds(long d) { return d/(C3/C0); }public long toMinutes(long d) { return d/(C4/C0); }public long toHours(long d) { return d/(C5/C0); }public long toDays(long d) { return d/(C6/C0); }public long convert(long d, TimeUnit u) { return u.toNanos(d); }int excessNanos(long d, long m) { return (int)(d - (m*C2)); }},//....可以看到时间单位有很多的比如DAY、SECONDS、MINUTES等我们可以直接将其作为时间单位比如我们要让一个线程等待3秒钟可以像下面这样编写
public static void main(String[] args) throws InterruptedException {Lock testLock new ReentrantLock();new Thread(() - {testLock.lock();try {System.out.println(等待是否未超时testLock.newCondition().await(1, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}testLock.unlock();}).start();
}当然Lock类的tryLock方法也是支持使用时间单位的各位可以自行进行测试。TimeUnit除了可以作为时间单位表示以外还可以在不同单位之间相互转换
public static void main(String[] args) throws InterruptedException {System.out.println(60秒 TimeUnit.SECONDS.toMinutes(60) 分钟);System.out.println(365天 TimeUnit.DAYS.toSeconds(365) 秒);
}也可以更加便捷地使用对象的wait()方法
public static void main(String[] args) throws InterruptedException {synchronized (Main.class) {System.out.println(开始等待);TimeUnit.SECONDS.timedWait(Main.class, 3); //直接等待3秒System.out.println(等待结束);}
}我们也可以直接使用它来进行休眠操作
public static void main(String[] args) throws InterruptedException {TimeUnit.SECONDS.sleep(1); //休眠1秒钟
}可重入锁
前面我们讲解了锁框架的两个核心接口那么我们接着来看看锁接口的具体实现类我们前面用到了ReentrantLock它其实是锁的一种叫做可重入锁那么这个可重入代表的是什么意思呢简单来说就是同一个线程可以反复进行加锁操作
public static void main(String[] args) throws InterruptedException {ReentrantLock lock new ReentrantLock();lock.lock();lock.lock(); //连续加锁2次new Thread(() - {System.out.println(线程2想要获取锁);lock.lock();System.out.println(线程2成功获取到锁);}).start();lock.unlock();System.out.println(线程1释放了一次锁);TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println(线程1再次释放了一次锁); //释放两次后其他线程才能加锁
}可以看到主线程连续进行了两次加锁操作此操作是不会被阻塞的在当前线程持有锁的情况下继续加锁不会被阻塞并且加锁几次就必须要解锁几次否则此线程依旧持有锁。我们可以使用getHoldCount()方法查看当前线程的加锁次数
public static void main(String[] args) throws InterruptedException {ReentrantLock lock new ReentrantLock();lock.lock();lock.lock();System.out.println(当前加锁次数lock.getHoldCount()是否被锁lock.isLocked());TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println(当前加锁次数lock.getHoldCount()是否被锁lock.isLocked());TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println(当前加锁次数lock.getHoldCount()是否被锁lock.isLocked());
}可以看到当锁不再被任何线程持有时值为0并且通过isLocked()方法查询结果为false。
实际上如果存在线程持有当前的锁那么其他线程在获取锁时是会暂时进入到等待队列的我们可以通过getQueueLength()方法获取等待中线程数量的预估值
public static void main(String[] args) throws InterruptedException {ReentrantLock lock new ReentrantLock();lock.lock();Thread t1 new Thread(lock::lock), t2 new Thread(lock::lock);;t1.start();t2.start();TimeUnit.SECONDS.sleep(1);System.out.println(当前等待锁释放的线程数lock.getQueueLength());System.out.println(线程1是否在等待队列中lock.hasQueuedThread(t1));System.out.println(线程2是否在等待队列中lock.hasQueuedThread(t2));System.out.println(当前线程是否在等待队列中lock.hasQueuedThread(Thread.currentThread()));
}我们可以通过hasQueuedThread()方法来判断某个线程是否正在等待获取锁状态。
同样的Condition也可以进行判断
public static void main(String[] args) throws InterruptedException {ReentrantLock lock new ReentrantLock();Condition condition lock.newCondition();new Thread(() - {lock.lock();try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}lock.unlock();}).start();TimeUnit.SECONDS.sleep(1);lock.lock();System.out.println(当前Condition的等待线程数lock.getWaitQueueLength(condition));condition.signal();System.out.println(当前Condition的等待线程数lock.getWaitQueueLength(condition));lock.unlock();
}通过使用getWaitQueueLength()方法能够查看同一个Condition目前有多少线程处于等待状态。
公平锁与非公平锁
前面我们了解了如果线程之间争抢同一把锁会暂时进入到等待队列中那么多个线程获得锁的顺序是不是一定是根据线程调用lock()方法时间来定的呢我们可以看到ReentrantLock的构造方法中是这样写的
public ReentrantLock() {sync new NonfairSync(); //看名字貌似是非公平的
}其实锁分为公平锁和非公平锁默认我们创建出来的ReentrantLock是采用的非公平锁作为底层锁机制。那么什么是公平锁什么又是非公平锁呢
公平锁多个线程按照申请锁的顺序去获得锁线程会直接进入队列去排队永远都是队列的第一位才能得到锁。非公平锁多个线程去获取锁的时候会直接去尝试获取获取不到再去进入等待队列如果能获取到就直接获取到锁。
简单来说公平锁不让插队都老老实实排着非公平锁让插队但是排队的人让不让你插队就是另一回事了。
我们可以来测试一下公平锁和非公平锁的表现情况
public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();
}这里我们选择使用第二个构造方法可以选择是否为公平锁实现
public static void main(String[] args) throws InterruptedException {ReentrantLock lock new ReentrantLock(false);Runnable action () - {System.out.println(线程 Thread.currentThread().getName() 开始获取锁...);lock.lock();System.out.println(线程 Thread.currentThread().getName() 成功获取锁);lock.unlock();};for (int i 0; i 10; i) { //建立10个线程new Thread(action, Ti).start();}
}这里我们只需要对比将在1秒后开始获取锁...和成功获取锁的顺序是否一致即可如果是一致那说明所有的线程都是按顺序排队获取的锁如果不是那说明肯定是有线程插队了。
运行结果可以发现在公平模式下确实是按照顺序进行的而在非公平模式下一般会出现这种情况线程刚开始获取锁马上就能抢到并且此时之前早就开始的线程还在等待状态很明显的插队行为。
那么接着下一个问题公平锁在任何情况下都一定是公平的吗有关这个问题我们会留到队列同步器中再进行讨论。 读写锁
除了可重入锁之外还有一种类型的锁叫做读写锁当然它并不是专门用作读写操作的锁它和可重入锁不同的地方在于可重入锁是一种排他锁当一个线程得到锁之后另一个线程必须等待其释放锁否则一律不允许获取到锁。而读写锁在同一时间是可以让多个线程获取到锁的它其实就是针对于读写场景而出现的。
读写锁维护了一个读锁和一个写锁这两个锁的机制是不同的。
读锁在没有任何线程占用写锁的情况下同一时间可以有多个线程加读锁。写锁在没有任何线程占用读锁的情况下同一时间只能有一个线程加写锁。
读写锁也有一个专门的接口
public interface ReadWriteLock {//获取读锁Lock readLock();//获取写锁Lock writeLock();
}此接口有一个实现类ReentrantReadWriteLock实现的是ReadWriteLock接口不是Lock接口它本身并不是锁注意我们操作ReentrantReadWriteLock时不能直接上锁而是需要获取读锁或是写锁再进行锁操作
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.readLock().lock();new Thread(lock.readLock()::lock).start();
}这里我们对读锁加锁可以看到可以多个线程同时对读锁加锁。
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.readLock().lock();new Thread(lock.writeLock()::lock).start();
}有读锁状态下无法加写锁反之亦然
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.writeLock().lock();new Thread(lock.readLock()::lock).start();
}并且ReentrantReadWriteLock不仅具有读写锁的功能还保留了可重入锁和公平/非公平机制比如同一个线程可以重复为写锁加锁并且必须全部解锁才真正释放锁
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.writeLock().lock();lock.writeLock().lock();new Thread(() - {lock.writeLock().lock();System.out.println(成功获取到写锁);}).start();System.out.println(释放第一层锁);lock.writeLock().unlock();TimeUnit.SECONDS.sleep(1);System.out.println(释放第二层锁);lock.writeLock().unlock();
}通过之前的例子来验证公平和非公平
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock(true);Runnable action () - {System.out.println(线程 Thread.currentThread().getName() 将在1秒后开始获取锁...);lock.writeLock().lock();System.out.println(线程 Thread.currentThread().getName() 成功获取锁);lock.writeLock().unlock();};for (int i 0; i 10; i) { //建立10个线程new Thread(action, Ti).start();}
}可以看到结果是一致的。
锁降级和锁升级
锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下虽然其他线程不能加读锁但是线程自己是可以加读锁的
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.writeLock().lock();lock.readLock().lock();System.out.println(成功加读锁);
}那么如果我们在同时加了写锁和读锁的情况下释放写锁是否其他的线程就可以一起加读锁了呢
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.writeLock().lock();lock.readLock().lock();new Thread(() - {System.out.println(开始加读锁);lock.readLock().lock();System.out.println(读锁添加成功);}).start();TimeUnit.SECONDS.sleep(1);lock.writeLock().unlock(); //如果释放写锁会怎么样
}可以看到一旦写锁被释放那么主线程就只剩下读锁了因为读锁可以被多个线程共享所以这时第二个线程也添加了读锁。而这种操作就被称之为锁降级注意不是先释放写锁再加读锁而是持有写锁的情况下申请读锁再释放写锁
注意在仅持有读锁的情况下去申请写锁属于锁升级ReentrantReadWriteLock是不支持的
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock new ReentrantReadWriteLock();lock.readLock().lock();lock.writeLock().lock();System.out.println(所升级成功);
}可以看到线程直接卡在加写锁的那一句了。
队列同步器AQS
**注意**难度巨大如果对锁的使用不是很熟悉建议之后再来看
前面我们了解了可重入锁和读写锁那么它们的底层实现原理到底是什么样的呢又是大家看到就想跳过的套娃解析环节。
比如我们执行了ReentrantLock的lock()方法那它的内部是怎么在执行的呢
public void lock() {sync.lock();
}可以看到它的内部实际上啥都没做而是交给了Sync对象在进行并且不只是这个方法其他的很多方法都是依靠Sync对象在进行
public void unlock() {sync.release(1);
}那么这个Sync对象是干什么的呢可以看到公平锁和非公平锁都是继承自Sync而Sync是继承自AbstractQueuedSynchronizer简称队列同步器
abstract static class Sync extends AbstractQueuedSynchronizer {//...
}static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}所以要了解它的底层到底是如何进行操作的还得看队列同步器我们就先从这里下手吧
底层实现
AbstractQueuedSynchronizer下面称为AQS是实现锁机制的基础它的内部封装了包括锁的获取、释放、以及等待队列。
一个锁排他锁为例的基本功能就是获取锁、释放锁、当锁被占用时其他线程来争抢会进入等待队列AQS已经将这些基本的功能封装完成了其中等待队列是核心内容等待队列是由双向链表数据结构实现的每个等待状态下的线程都可以被封装进结点中并放入双向链表中而对于双向链表是以队列的形式进行操作的它像这样 AQS中有一个head字段和一个tail字段分别记录双向链表的头结点和尾结点而之后的一系列操作都是围绕此队列来进行的。我们先来了解一下每个结点都包含了哪些内容
//每个处于等待状态的线程都可以是一个节点并且每个节点是有很多状态的
static final class Node {//每个节点都可以被分为独占模式节点或是共享模式节点分别适用于独占锁和共享锁static final Node SHARED new Node();static final Node EXCLUSIVE null;//等待状态这里都定义好了//唯一一个大于0的状态表示已失效可能是由于超时或中断此节点被取消。static final int CANCELLED 1;//此节点后面的节点被挂起进入等待状态static final int SIGNAL -1; //在条件队列中的节点才是这个状态static final int CONDITION -2;//传播一般用于共享锁static final int PROPAGATE -3;volatile int waitStatus; //等待状态值volatile Node prev; //双向链表基操volatile Node next;volatile Thread thread; //每一个线程都可以被封装进一个节点进入到等待队列Node nextWaiter; //在等待队列中表示模式条件队列中作为下一个结点的指针final boolean isShared() {return nextWaiter SHARED;}final Node predecessor() throws NullPointerException {Node p prev;if (p null)throw new NullPointerException();elsereturn p;}Node() {}Node(Thread thread, Node mode) {this.nextWaiter mode;this.thread thread;}Node(Thread thread, int waitStatus) {this.waitStatus waitStatus;this.thread thread;}
}在一开始的时候head和tail都是nullstate为默认值0
private transient volatile Node head;private transient volatile Node tail;private volatile int state;不用担心双向链表不会进行初始化初始化是在实际使用时才开始的先不管我们接着来看其他的初始化内容
//直接使用Unsafe类进行操作
private static final Unsafe unsafe Unsafe.getUnsafe();
//记录类中属性的在内存中的偏移地址方便Unsafe类直接操作内存进行赋值等直接修改对应地址的内存
private static final long stateOffset; //这里对应的就是AQS类中的state成员字段
private static final long headOffset; //这里对应的就是AQS类中的head头结点成员字段
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;static { //静态代码块在类加载的时候就会自动获取偏移地址try {stateOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(state));headOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(head));tailOffset unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField(tail));waitStatusOffset unsafe.objectFieldOffset(Node.class.getDeclaredField(waitStatus));nextOffset unsafe.objectFieldOffset(Node.class.getDeclaredField(next));} catch (Exception ex) { throw new Error(ex); }
}//通过CAS操作来修改头结点
private final boolean compareAndSetHead(Node update) {//调用的是Unsafe类的compareAndSwapObject方法通过CAS算法比较对象并替换return unsafe.compareAndSwapObject(this, headOffset, null, update);
}//同上省略部分代码
private final boolean compareAndSetTail(Node expect, Node update) {private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {private static final boolean compareAndSetNext(Node node, Node expect, Node update) {可以发现队列同步器由于要使用到CAS算法所以直接使用了Unsafe工具类Unsafe类中提供了CAS操作的方法Java无法实现底层由C实现所有对AQS类中成员字段的修改都有对应的CAS操作封装。
现在我们大致了解了一下它的底层运作机制我们接着来看这个类是如何进行使用的它提供了一些可重写的方法根据不同的锁类型和机制可以自由定制规则并且为独占式和非独占式锁都提供了对应的方法以及一些已经写好的模板方法模板方法会调用这些可重写的方法使用此类只需要将可重写的方法进行重写并调用提供的模板方法从而实现锁功能学习过设计模式会比较好理解一些
我们首先来看可重写方法
//独占式获取同步状态查看同步状态是否和参数一致如果返没有问题那么会使用CAS操作设置同步状态并返回true
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}//独占式释放同步状态
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}//共享式获取同步状态返回值大于0表示成功否则失败
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}//共享式释放同步状态
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}//是否在独占模式下被当前线程占用锁是否被当前线程持有
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}可以看到这些需要重写的方法默认是直接抛出UnsupportedOperationException也就是说根据不同的锁类型我们需要去实现对应的方法我们可以来看一下ReentrantLock此类是全局独占式的中的公平锁是如何借助AQS实现的
static final class FairSync extends Sync {private static final long serialVersionUID -3000897897090466540L;//加锁操作调用了模板方法acquire//为了防止各位绕晕请时刻记住lock方法一定是在某个线程下为了加锁而调用的并且同一时间可能会有其他线程也在调用此方法final void lock() {acquire(1);}...
}我们先看看加锁操作干了什么事情这里直接调用了AQS提供的模板方法acquire()我们来看看它在AQS类中的实现细节
ReservedStackAccess //这个是JEP 270添加的新注解它会保护被注解的方法通过添加一些额外的空间防止在多线程运行的时候出现栈溢出下同
public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式Node.EXCLUSIVEselfInterrupt();
}首先会调用tryAcquire()方法这里是由FairSync类实现的如果尝试加独占锁失败返回false了说明可能这个时候有其他线程持有了此独占锁所以当前线程得先等着那么会调用addWaiter()方法将线程加入等待队列中
private Node addWaiter(Node mode) {Node node new Node(Thread.currentThread(), mode);// 先尝试使用CAS直接入队如果这个时候其他线程也在入队就是不止一个线程在同一时间争抢这把锁就进入enq()Node pred tail;if (pred ! null) {node.prev pred;if (compareAndSetTail(pred, node)) {pred.next node;return node;}}//此方法是CAS快速入队失败时调用enq(node);return node;
}private Node enq(final Node node) {//自旋形式入队可以看到这里是一个无限循环for (;;) {Node t tail;if (t null) { //这种情况只能说明头结点和尾结点都还没初始化if (compareAndSetHead(new Node())) //初始化头结点和尾结点tail head;} else {node.prev t;if (compareAndSetTail(t, node)) {t.next node;return t; //只有CAS成功的情况下才算入队成功如果CAS失败那说明其他线程同一时间也在入队并且手速还比当前线程快刚好走到CAS操作的时候其他线程就先入队了那么这个时候node.prev就不是我们预期的节点了而是另一个线程新入队的节点所以说得进下一次循环再来一次CAS这种形式就是自旋}}}
}在了解了addWaiter()方法会将节点加入等待队列之后我们接着来看addWaiter()会返回已经加入的节点acquireQueued()在得到返回的节点时也会进入自旋状态等待唤醒也就是开始进入到拿锁的环节了
ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {final Node p node.predecessor();if (p head tryAcquire(arg)) { //可以看到当此节点位于队首(node.prev head)时会再次调用tryAcquire方法获取锁如果获取成功会返回此过程中是否被中断的值setHead(node); //新的头结点设置为当前结点p.next null; // 原有的头结点没有存在的意义了failed false; //没有失败return interrupted; //直接返回等待过程中是否被中断} //依然没获取成功if (shouldParkAfterFailedAcquire(p, node) //将当前节点的前驱节点等待状态设置为SIGNAL如果失败将直接开启下一轮循环直到成功为止如果成功接着往下parkAndCheckInterrupt()) //挂起线程进入等待状态等待被唤醒如果在等待状态下被中断那么会返回true直接将中断标志设为true否则就是正常唤醒继续自旋interrupted true;}} finally {if (failed)cancelAcquire(node);}
}private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //通过unsafe类操作底层挂起线程会直接进入阻塞状态return Thread.interrupted();
}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws pred.waitStatus;if (ws Node.SIGNAL)return true; //已经是SIGNAL直接trueif (ws 0) { //不能是已经取消的节点必须找到一个没被取消的do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node; //直接抛弃被取消的节点} else {//不是SIGNAL先CAS设置为SIGNAL这里没有返回true因为CAS不一定成功需要下一轮再判断一次compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; //返回false马上开启下一轮循环
}所以acquire()中的if条件如果为true那么只有一种情况就是等待过程中被中断了其他任何情况下都是成功获取到独占锁所以当等待过程被中断时会调用selfInterrupt()方法
static void selfInterrupt() {Thread.currentThread().interrupt();
}这里就是直接向当前线程发送中断信号了。
上面提到了LockSupport类它是一个工具类我们也可以来玩一下这个park和unpark:
public static void main(String[] args) throws InterruptedException {Thread t Thread.currentThread(); //先拿到主线程的Thread对象new Thread(() - {try {TimeUnit.SECONDS.sleep(1);System.out.println(主线程可以继续运行了);LockSupport.unpark(t);//t.interrupt(); 发送中断信号也可以恢复运行} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println(主线程被挂起);LockSupport.park();System.out.println(主线程继续运行);
}这里我们就把公平锁的lock()方法实现讲解完毕了让我猜猜已经晕了对吧越是到源码越考验个人的基础知识掌握基础不牢地动山摇接着我们来看公平锁的tryAcquire()方法
static final class FairSync extends Sync {//可重入独占锁的公平实现ReservedStackAccessprotected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread(); //先获取当前线程的Thread对象int c getState(); //获取当前AQS对象状态独占模式下0为未占用大于0表示已占用if (c 0) { //如果是0那就表示没有占用现在我们的线程就要来尝试占用它if (!hasQueuedPredecessors() //等待队列是否不为空且当前线程没有拿到锁其实就是看看当前线程有没有必要进行排队如果没必要排队就说明可以直接获取锁compareAndSetState(0, acquires)) { //CAS设置状态如果成功则说明成功拿到了这把锁失败则说明可能这个时候其他线程在争抢并且还比你先抢到setExclusiveOwnerThread(current); //成功拿到锁会将独占模式所有者线程设定为当前线程这个方法是父类AbstractOwnableSynchronizer中的就表示当前这把锁已经是这个线程的了return true; //占用锁成功返回true}}else if (current getExclusiveOwnerThread()) { //如果不是0那就表示被线程占用了这个时候看看是不是自己占用的如果是由于是可重入锁可以继续加锁int nextc c acquires; //多次加锁会将状态值进行增加状态值就是加锁次数if (nextc 0) //加到int值溢出了throw new Error(Maximum lock count exceeded);setState(nextc); //设置为新的加锁次数return true;}return false; //其他任何情况都是加锁失败}
}在了解了公平锁的实现之后是不是感觉有点恍然大悟的感觉虽然整个过程非常复杂但是只要理清思路还是比较简单的。
加锁过程已经OK我们接着来看它的解锁过程unlock()方法是在AQS中实现的
public void unlock() {sync.release(1); //直接调用了AQS中的release方法参数为1表示解锁一次state值-1
}ReservedStackAccess
public final boolean release(int arg) {if (tryRelease(arg)) { //和tryAcquire一样也得子类去重写释放锁操作Node h head; //释放锁成功后获取新的头结点if (h ! null h.waitStatus ! 0) //如果新的头结点不为空并且不是刚刚建立的结点初始状态下status为默认值0而上面在进行了shouldParkAfterFailedAcquire之后会被设定为SIGNAL状态值为-1unparkSuccessor(h); //唤醒头节点下一个节点中的线程return true;}return false;
}private void unparkSuccessor(Node node) {// 将等待状态waitStatus设置为初始值0int ws node.waitStatus;if (ws 0)compareAndSetWaitStatus(node, ws, 0);//获取下一个结点Node s node.next;if (s null || s.waitStatus 0) { //如果下一个结点为空或是等待状态是已取消那肯定是不能通知unpark的这时就要遍历所有节点再另外找一个符合unpark要求的节点了s null;for (Node t tail; t ! null t ! node; t t.prev) //这里是从队尾向前因为enq()方法中的t.next node是在CAS之后进行的而 node.prev t 是CAS之前进行的所以从后往前一定能够保证遍历所有节点if (t.waitStatus 0)s t;}if (s ! null) //要是找到了就直接unpark要是还是没找到那就算了LockSupport.unpark(s.thread);
}那么我们来看看tryRelease()方法是怎么实现的具体实现在Sync中
ReservedStackAccess
protected final boolean tryRelease(int releases) {int c getState() - releases; //先计算本次解锁之后的状态值if (Thread.currentThread() ! getExclusiveOwnerThread()) //因为是独占锁那肯定这把锁得是当前线程持有才行throw new IllegalMonitorStateException(); //否则直接抛异常boolean free false;if (c 0) { //如果解锁之后的值为0表示已经完全释放此锁free true;setExclusiveOwnerThread(null); //将独占锁持有线程设置为null}setState(c); //状态值设定为creturn free; //如果不是0表示此锁还没完全释放返回false是0就返回true
}综上我们来画一个完整的流程图 这里我们只讲解了公平锁有关非公平锁和读写锁还请各位观众根据我们之前的思路自行解读。
公平锁一定公平吗
前面我们讲解了公平锁的实现原理那么我们尝试分析一下在并发的情况下公平锁一定公平吗
我们再次来回顾一下tryAcquire()方法的实现
ReservedStackAccess
protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();if (c 0) {if (!hasQueuedPredecessors() //注意这里公平锁的机制是一开始会查看是否有节点处于等待compareAndSetState(0, acquires)) { //如果前面的方法执行后发现没有等待节点就直接进入占锁环节了setExclusiveOwnerThread(current);return true;}}else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;
}所以hasQueuedPredecessors()这个环节容不得半点闪失否则会直接破坏掉公平性假如现在出现了这样的情况
线程1已经持有锁了这时线程2来争抢这把锁走到hasQueuedPredecessors()判断出为 false线程2继续运行然后线程2肯定获取锁失败因为锁这时是被线程1占有的因此就进入到等待队列中
private Node enq(final Node node) {for (;;) {Node t tail;if (t null) { // 线程2进来之后肯定是要先走这里的因为head和tail都是nullif (compareAndSetHead(new Node()))tail head; //这里就将tail直接等于head了注意这里完了之后还没完这里只是初始化过程} else {node.prev t;if (compareAndSetTail(t, node)) {t.next node;return t;}}}
}private Node addWaiter(Node mode) {Node node new Node(Thread.currentThread(), mode);Node pred tail;if (pred ! null) { //由于一开始head和tail都是null所以线程2直接就进enq()了node.prev pred;if (compareAndSetTail(pred, node)) {pred.next node;return node;}}enq(node); //请看上面return node;
}而碰巧不巧这个时候线程3也来抢锁了按照正常流程走到了hasQueuedPredecessors()方法而在此方法中
public final boolean hasQueuedPredecessors() {Node t tail; // Read fields in reverse initialization orderNode h head;Node s;//这里直接判断h ! t而此时线程2才刚刚执行完 tail head所以直接就返回false了return h ! t ((s h.next) null || s.thread ! Thread.currentThread());
}因此线程3这时就紧接着准备开始CAS操作了又碰巧这时线程1释放锁了现在的情况就是线程3直接开始CAS判断而线程2还在插入节点状态结果可想而知居然是线程3先拿到了锁这显然是违背了公平锁的公平机制。
一张图就是 因此公不公平全看hasQueuedPredecessors()而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁只有在等待队列存在节点时才是真正公平的。
Condition实现原理
通过前面的学习我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的同样可以实现等待/通知模式并且同一把锁下可以创建多个Condition对象。那么我们接着来看看它又是如何实现的呢我们先从单个Condition对象进行分析
在AQS中Condition有一个实现类ConditionObject而这里也是使用了链表实现了条件队列
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID 1173984872572414699L;/** 条件队列的头结点 */private transient Node firstWaiter;/** 条件队列的尾结点 */private transient Node lastWaiter;//...这里是直接使用了AQS中的Node类但是使用的是Node类中的nextWaiter字段连接节点并且Node的status为CONDITION 我们知道当一个线程调用await()方法时会进入等待状态直到其他线程调用signal()方法将其唤醒而这里的条件队列正是用于存储这些处于等待状态的线程。
我们先来看看最关键的await()方法是如何实现的为了防止一会绕晕在开始之前我们先明确此方法的目标
只有已经持有锁的线程才可以使用此方法当调用此方法后会直接释放锁无论加了多少次锁只有其他线程调用signal()或是被中断时才会唤醒等待中的线程被唤醒后需要等待其他线程释放锁拿到锁之后才可以继续执行并且会恢复到之前的状态await之前加了几层锁唤醒后依然是几层锁
好了差不多可以上源码了
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); //如果在调用await之前就被添加了中断标记那么会直接抛出中断异常Node node addConditionWaiter(); //为当前线程创建一个新的节点并将其加入到条件队列中int savedState fullyRelease(node); //完全释放当前线程持有的锁并且保存一下state值因为唤醒之后还得恢复int interruptMode 0; //用于保存中断状态while (!isOnSyncQueue(node)) { //循环判断是否位于同步队列中如果等待状态下的线程被其他线程唤醒那么会正常进入到AQS的等待队列中之后我们会讲LockSupport.park(this); //如果依然处于等待状态那么继续挂起if ((interruptMode checkInterruptWhileWaiting(node)) ! 0) //看看等待的时候是不是被中断了break;}//出了循环之后那线程肯定是已经醒了这时就差拿到锁就可以恢复运行了if (acquireQueued(node, savedState) interruptMode ! THROW_IE) //直接开始acquireQueued尝试拿锁之前已经讲过了从这里开始基本就和一个线程去抢锁是一样的了interruptMode REINTERRUPT;//已经拿到锁了基本可以开始继续运行了这里再进行一下后期清理工作if (node.nextWaiter ! null) unlinkCancelledWaiters(); //将等待队列中不是Node.CONDITION状态的节点移除if (interruptMode ! 0) //依然是响应中断reportInterruptAfterWait(interruptMode);//OK接着该干嘛干嘛
}实际上await()方法比较中规中矩大部分操作也在我们的意料之中那么我们接着来看signal()方法是如何实现的同样的为了防止各位绕晕先明确signal的目标
只有持有锁的线程才能唤醒锁所属的Condition等待的线程优先唤醒条件队列中的第一个如果唤醒过程中出现问题接着找往下找直到找到一个可以唤醒的唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中让其参与到锁的竞争中拿到锁之后线程才能恢复运行 好了上源码
public final void signal() {if (!isHeldExclusively()) //先看看当前线程是不是持有锁的状态throw new IllegalMonitorStateException(); //不是那你不配唤醒别人Node first firstWaiter; //获取条件队列的第一个结点if (first ! null) //如果队列不为空获取到了那么就可以开始唤醒操作doSignal(first);
}private void doSignal(Node first) {do {if ( (firstWaiter first.nextWaiter) null) //如果当前节点在本轮循环没有后继节点了条件队列就为空了lastWaiter null; //所以这里相当于是直接清空first.nextWaiter null; //将给定节点的下一个结点设置为null因为当前结点马上就会离开条件队列了} while (!transferForSignal(first) //接着往下看(first firstWaiter) ! null); //能走到这里只能说明给定节点被设定为了取消状态那就继续看下一个结点
}final boolean transferForSignal(Node node) {/** 如果这里CAS失败那有可能此节点被设定为了取消状态*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//CAS成功之后结点的等待状态就变成了默认值0接着通过enq方法直接将节点丢进AQS的等待队列中相当于唤醒并且可以等待获取锁了//这里enq方法返回的是加入之后等待队列队尾的前驱节点就是原来的tailNode p enq(node);int ws p.waitStatus; //保存前驱结点的等待状态//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败可能是在ws0判断完之后马上变成了取消状态导致CAS失败if (ws 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); //直接唤醒线程return true;
}其实最让人不理解的就是倒数第二行明明上面都正常进入到AQS等待队列了应该是可以开始走正常流程了那么这里为什么还要提前来一次unpark呢
这里其实是为了进行优化而编写直接unpark会有两种情况
如果插入结点前AQS等待队列的队尾节点就已经被取消则满足wc 0如果插入node后AQS内部等待队列的队尾节点已经稳定满足tail.waitStatus 0但在执行ws 0之后!compareAndSetWaitStatus(p, ws, Node.SIGNAL)之前被取消则CAS也会失败满足compareAndSetWaitStatus(p, ws, Node.SIGNAL) false
如果这里被提前unpark那么在await()方法中将可以被直接唤醒并跳出while循环直接开始争抢锁因为前一个等待结点是被取消的状态没有必要再等它了。
所以大致流程下 只要把整个流程理清楚还是很好理解的。
自行实现锁类
既然前面了解了那么多AQS的功能那么我就仿照着这些锁类来实现一个简单的锁
要求同一时间只能有一个线程持有锁不要求可重入反复加锁无视即可
public class Main {public static void main(String[] args) throws InterruptedException {}/*** 自行实现一个最普通的独占锁* 要求同一时间只能有一个线程持有锁不要求可重入*/private static class MyLock implements Lock {/*** 设计思路* 1. 锁被占用那么exclusiveOwnerThread应该被记录并且state 1* 2. 锁没有被占用那么exclusiveOwnerThread为null并且state 0*/private static class Sync extends AbstractQueuedSynchronizer {Overrideprotected boolean tryAcquire(int arg) {if(isHeldExclusively()) return true; //无需可重入功能如果是当前线程直接返回trueif(compareAndSetState(0, arg)){ //CAS操作进行状态替换setExclusiveOwnerThread(Thread.currentThread()); //成功后设置当前的所有者线程return true;}return false;}Overrideprotected boolean tryRelease(int arg) {if(getState() 0)throw new IllegalMonitorStateException(); //没加锁情况下是不能直接解锁的if(isHeldExclusively()){ //只有持有锁的线程才能解锁setExclusiveOwnerThread(null); //设置所有者线程为nullsetState(0); //状态变为0return true;}return false;}Overrideprotected boolean isHeldExclusively() {return getExclusiveOwnerThread() Thread.currentThread();}protected Condition newCondition(){return new ConditionObject(); //直接用现成的}}private final Sync sync new Sync();Overridepublic void lock() {sync.acquire(1);}Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}Overridepublic boolean tryLock() {return sync.tryAcquire(1);}Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}Overridepublic void unlock() {sync.release(1);}Overridepublic Condition newCondition() {return sync.newCondition();}}
}到这里我们对应队列同步器AQS的讲解就先到此为止了当然AQS的全部机制并非仅仅只有我们讲解的内容一些我们没有提到的内容还请各位观众自行探索会有满满的成就感哦~ 原子类
前面我们讲解了锁框架的使用和实现原理虽然比较复杂但是收获还是很多的主要是观摩大佬写的代码这一部分我们就来讲一点轻松的。
前面我们说到如果要保证i的原子性那么我们的唯一选择就是加锁那么除了加锁之外还有没有其他更好的解决方法呢JUC为我们提供了原子类底层采用CAS算法它是一种用法简单、性能高效、线程安全地更新变量的方式。
所有的原子类都位于java.util.concurrent.atomic包下。
原子类介绍
常用基本数据类有对应的原子类封装
AtomicInteger原子更新intAtomicLong原子更新longAtomicBoolean原子更新boolean
那么原子类和普通的基本类在使用上有没有什么区别呢我们先来看正常情况下使用一个基本类型
public class Main {public static void main(String[] args) {int i 1;System.out.println(i);}
}现在我们使用int类型对应的原子类要实现同样的代码该如何编写
public class Main {public static void main(String[] args) {AtomicInteger i new AtomicInteger(1);System.out.println(i.getAndIncrement()); //如果想实现i 2这种操作可以使用 addAndGet() 自由设置delta 值}
}我们可以将int数值封装到此类中注意必须调用构造方法它不像Integer那样有装箱机制并且通过调用此类提供的方法来获取或是对封装的int值进行自增乍一看这不就是基本类型包装类嘛有啥高级的。确实还真有包装类那味但是它可不仅仅是简单的包装它的自增操作是具有原子性的
public class Main {private static AtomicInteger i new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {Runnable r () - {for (int j 0; j 100000; j)i.getAndIncrement();System.out.println(自增完成);};new Thread(r).start();new Thread(r).start();TimeUnit.SECONDS.sleep(1);System.out.println(i.get());}
}同样是直接进行自增操作我们发现使用原子类是可以保证自增操作原子性的就跟我们前面加锁一样。怎么会这么神奇我们来看看它的底层是如何实现的直接从构造方法点进去
private volatile int value;public AtomicInteger(int initialValue) {value initialValue;
}public AtomicInteger() {
}可以看到它的底层是比较简单的其实本质上就是封装了一个volatile类型的int值这样能够保证可见性在CAS操作的时候不会出现问题。
private static final Unsafe unsafe Unsafe.getUnsafe();
private static final long valueOffset;static {try {valueOffset unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField(value));} catch (Exception ex) { throw new Error(ex); }
}可以看到最上面是和AQS采用了类似的机制因为要使用CAS算法更新value的值所以得先计算出value字段在对象中的偏移地址CAS直接修改对应位置的内存即可可见Unsafe类的作用巨大很多的底层操作都要靠它来完成
接着我们来看自增操作是怎么在运行的
public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);
}可以看到这里调用了unsafe.getAndAddInt()套娃时间到我们接着看看Unsafe里面写了什么
public final int getAndAddInt(Object o, long offset, int delta) { //delta就是变化的值操作就是自增1int v;do {//volatile版本的getInt()//能够保证可见性v getIntVolatile(o, offset);} while (!compareAndSwapInt(o, offset, v, v delta)); //这里是开始cas替换int的值每次都去拿最新的值去进行替换如果成功则离开循环不成功说明这个时候其他线程先修改了值就进下一次循环再获取最新的值然后再cas一次直到成功为止return v;
}可以看到这是一个do-while循环那么这个循环在做一个什么事情呢感觉就和我们之前讲解的AQS队列中的机制差不多也是采用自旋形式来不断进行CAS操作直到成功。 可见原子类底层也是采用了CAS算法来保证的原子性包括getAndSet、getAndAdd等方法都是这样。原子类也直接提供了CAS操作方法我们可以直接使用
public static void main(String[] args) throws InterruptedException {AtomicInteger integer new AtomicInteger(10);System.out.println(integer.compareAndSet(30, 20));System.out.println(integer.compareAndSet(10, 20));System.out.println(integer);
}如果想以普通变量的方式来设定值那么可以使用lazySet()方法这样就不采用volatile的立即可见机制了。
AtomicInteger integer new AtomicInteger(1);
integer.lazySet(2);除了基本类有原子类以外基本类型的数组类型也有原子类
AtomicIntegerArray原子更新int数组AtomicLongArray原子更新long数组AtomicReferenceArray原子更新引用数组
其实原子数组和原子类型一样的不过我们可以对数组内的元素进行原子操作
public static void main(String[] args) throws InterruptedException {AtomicIntegerArray array new AtomicIntegerArray(new int[]{0, 4, 1, 3, 5});Runnable r () - {for (int i 0; i 100000; i)array.getAndAdd(0, 1);};new Thread(r).start();new Thread(r).start();TimeUnit.SECONDS.sleep(1);System.out.println(array.get(0));
}在JDK8之后新增了DoubleAdder和LongAdder在高并发情况下LongAdder的性能比AtomicLong的性能更好主要体现在自增上它的大致原理如下在低并发情况下和AtomicLong是一样的对value值进行CAS操作但是出现高并发的情况时AtomicLong会进行大量的循环操作来保证同步而LongAdder会将对value值的CAS操作分散为对数组cells中多个元素的CAS操作内部维护一个Cell[] as数组每个Cell里面有一个初始值为0的long型变量在高并发时会进行分散CAS就是不同的线程可以对数组中不同的元素进行CAS自增这样就避免了所有线程都对同一个值进行CAS只需要最后再将结果加起来即可。 使用如下
public static void main(String[] args) throws InterruptedException {LongAdder adder new LongAdder();Runnable r () - {for (int i 0; i 100000; i)adder.add(1);};for (int i 0; i 100; i)new Thread(r).start(); //100个线程TimeUnit.SECONDS.sleep(1);System.out.println(adder.sum()); //最后求和即可
}由于底层源码比较复杂这里就不做讲解了。两者的性能对比这里用到了CountDownLatch建议学完之后再来看
public class Main {public static void main(String[] args) throws InterruptedException {System.out.println(使用AtomicLong的时间消耗test2()ms);System.out.println(使用LongAdder的时间消耗test1()ms);}private static long test1() throws InterruptedException {CountDownLatch latch new CountDownLatch(100);LongAdder adder new LongAdder();long timeStart System.currentTimeMillis();Runnable r () - {for (int i 0; i 100000; i)adder.add(1);latch.countDown();};for (int i 0; i 100; i)new Thread(r).start();latch.await();return System.currentTimeMillis() - timeStart;}private static long test2() throws InterruptedException {CountDownLatch latch new CountDownLatch(100);AtomicLong atomicLong new AtomicLong();long timeStart System.currentTimeMillis();Runnable r () - {for (int i 0; i 100000; i)atomicLong.incrementAndGet();latch.countDown();};for (int i 0; i 100; i)new Thread(r).start();latch.await();return System.currentTimeMillis() - timeStart;}
}除了对基本数据类型支持原子操作外对于引用类型也是可以实现原子操作的
public static void main(String[] args) throws InterruptedException {String a Hello;String b World;AtomicReferenceString reference new AtomicReference(a);reference.compareAndSet(a, b);System.out.println(reference.get());
}JUC还提供了字段原子更新器可以对类中的某个指定字段进行原子操作注意字段必须添加volatile关键字
public class Main {public static void main(String[] args) throws InterruptedException {Student student new Student();AtomicIntegerFieldUpdaterStudent fieldUpdater AtomicIntegerFieldUpdater.newUpdater(Student.class, age);System.out.println(fieldUpdater.incrementAndGet(student));}public static class Student{volatile int age;}
}了解了这么多原子类是不是感觉要实现保证原子性的工作更加轻松了
ABA问题及解决方案
我们来想象一下这种场景 线程1和线程2同时开始对a的值进行CAS修改但是线程1的速度比较快将a的值修改为2之后紧接着又修改回1这时线程2才开始进行判断发现a的值是1所以CAS操作成功。
很明显这里的1已经不是一开始的那个1了而是被重新赋值的1这也是CAS操作存在的问题无锁虽好但是问题多多它只会机械地比较当前值是不是预期值但是并不会关心当前值是否被修改过这种问题称之为ABA问题。
那么如何解决这种ABA问题呢JUC提供了带版本号的引用类型只要每次操作都记录一下版本号并且版本号不会重复那么就可以解决ABA问题了
public static void main(String[] args) throws InterruptedException {String a Hello;String b World;AtomicStampedReferenceString reference new AtomicStampedReference(a, 1); //在构造时需要指定初始值和对应的版本号reference.attemptStamp(a, 2); //可以中途对版本号进行修改注意要填写当前的引用对象System.out.println(reference.compareAndSet(a, b, 2, 3)); //CAS操作时不仅需要提供预期值和修改值还要提供预期版本号和新的版本号
}至此有关原子类的讲解就到这里。 并发容器
简单的讲完了又该讲难一点的了。
**注意**本版块的重点在于探究并发容器是如何利用锁机制和算法实现各种丰富功能的我们会忽略一些常规功能的实现细节比如链表如何插入元素删除元素而更关注并发容器应对并发场景算法上的实现比如在多线程环境下的插入操作是按照什么规则进行的
在单线程模式下集合类提供的容器可以说是非常方便了几乎我们每个项目中都能或多或少的用到它们我们在JavaSE阶段为各位讲解了各个集合类的实现原理我们了解了链表、顺序表、哈希表等数据结构那么在多线程环境下这些数据结构还能正常工作吗
传统容器线程安全吗
我们来测试一下100个线程同时向ArrayList中添加元素会怎么样
public class Main {public static void main(String[] args) {ListString list new ArrayList();Runnable r () - {for (int i 0; i 100; i)list.add(lbwnb);};for (int i 0; i 100; i)new Thread(r).start();TimeUnit.SECONDS.sleep(1);System.out.println(list.size());}
}不出意外的话肯定是会报错的
Exception in thread Thread-0 java.lang.ArrayIndexOutOfBoundsException: 73at java.util.ArrayList.add(ArrayList.java:465)at com.test.Main.lambda$main$0(Main.java:13)at java.lang.Thread.run(Thread.java:750)
Exception in thread Thread-19 java.lang.ArrayIndexOutOfBoundsException: 1851at java.util.ArrayList.add(ArrayList.java:465)at com.test.Main.lambda$main$0(Main.java:13)at java.lang.Thread.run(Thread.java:750)
9773那么我们来看看报的什么错从栈追踪信息可以看出是add方法出现了问题
public boolean add(E e) {ensureCapacityInternal(size 1); // Increments modCount!!elementData[size] e; //这一句出现了数组越界return true;
}也就是说同一时间其他线程也在疯狂向数组中添加元素那么这个时候有可能在ensureCapacityInternal确认容量足够执行之后elementData[size] e;执行之前其他线程插入了元素导致size的值超出了数组容量。这些在单线程的情况下不可能发生的问题在多线程下就慢慢出现了。
我们再来看看比较常用的HashMap呢
public static void main(String[] args) throws InterruptedException {MapInteger, String map new HashMap();for (int i 0; i 100; i) {int finalI i;new Thread(() - {for (int j 0; j 100; j)map.put(finalI * 1000 j, lbwnb);}).start();}TimeUnit.SECONDS.sleep(2);System.out.println(map.size());
}经过测试发现虽然没有报错但是最后的结果并不是我们期望的那样实际上它还有可能导致Entry对象出现环状数据结构引起死循环。
所以在多线程环境下要安全地使用集合类我们得找找解决方案了。
并发容器介绍
怎么才能解决并发情况下的容器问题呢我们首先想到的肯定是给方法前面加个synchronzed关键字这样总不会抢了吧在之前我们可以使用Vector或是Hashtable来解决但是它们的效率实在是太低了完全依靠锁来解决问题因此现在已经很少再使它们了这里也不会再去进行讲解。
JUC提供了专用于并发场景下的容器比如我们刚刚使用的ArrayList在多线程环境下是没办法使用的我们可以将其替换为JUC提供的多线程专用集合类
public static void main(String[] args) throws InterruptedException {ListString list new CopyOnWriteArrayList(); //这里使用CopyOnWriteArrayList来保证线程安全Runnable r () - {for (int i 0; i 100; i)list.add(lbwnb);};for (int i 0; i 100; i)new Thread(r).start();TimeUnit.SECONDS.sleep(1);System.out.println(list.size());
}我们发现使用了CopyOnWriteArrayList之后再没出现过上面的问题。
那么它是如何实现的呢我们先来看看它是如何进行add()操作的
public boolean add(E e) {final ReentrantLock lock this.lock;lock.lock(); //直接加锁保证同一时间只有一个线程进行添加操作try {Object[] elements getArray(); //获取当前存储元素的数组int len elements.length;Object[] newElements Arrays.copyOf(elements, len 1); //直接复制一份数组newElements[len] e; //修改复制出来的数组setArray(newElements); //将元素数组设定为复制出来的数组return true;} finally {lock.unlock();}
}可以看到添加操作是直接上锁并且会先拷贝一份当前存放元素的数组然后对数组进行修改再将此数组替换CopyOnWrite接着我们来看读操作
public E get(int index) {return get(getArray(), index);
}因此CopyOnWriteArrayList对于读操作不加锁而对于写操作是加锁的类似于我们前面讲解的读写锁机制这样就可以保证不丢失读性能的情况下写操作不会出现问题。
接着我们来看对于HashMap的并发容器ConcurrentHashMap
public static void main(String[] args) throws InterruptedException {MapInteger, String map new ConcurrentHashMap();for (int i 0; i 100; i) {int finalI i;new Thread(() - {for (int j 0; j 100; j)map.put(finalI * 100 j, lbwnb);}).start();}TimeUnit.SECONDS.sleep(1);System.out.println(map.size());
}可以看到这里的ConcurrentHashMap就没有出现之前HashMap的问题了。因为线程之间会争抢同一把锁我们之前在讲解LongAdder的时候学习到了一种压力分散思想既然每个线程都想抢锁那我就干脆多搞几把锁让你们每个人都能拿到这样就不会存在等待的问题了而JDK7之前ConcurrentHashMap的原理也比较类似它将所有数据分为一段一段地存储先分很多段出来每一段都给一把锁当一个线程占锁访问时只会占用其中一把锁也就是仅仅锁了一小段数据而其他段的数据依然可以被其他线程正常访问。 这里我们重点讲解JDK8之后它是怎么实现的它采用了CAS算法配合锁机制实现我们先来回顾一下JDK8下的HashMap是什么样的结构 HashMap就是利用了哈希表哈希表的本质其实就是一个用于存放后续节点的头结点的数组数组里面的每一个元素都是一个头结点也可以说就是一个链表当要新插入一个数据时会先计算该数据的哈希值找到数组下标然后创建一个新的节点添加到对应的链表后面。当链表的长度达到8时会自动将链表转换为红黑树这样能使得原有的查询效率大幅度降低当使用红黑树之后我们就可以利用二分搜索的思想快速地去寻找我们想要的结果而不是像链表一样挨个去看。
又是基础不牢地动山摇环节由于ConcurrentHashMap的源码比较复杂所以我们先从最简单的构造方法开始下手 我们发现它的构造方法和HashMap的构造方法有很大的出入但是大体的结构和HashMap是差不多的也是维护了一个哈希表并且哈希表中存放的是链表或是红黑树所以我们直接来看put()操作是如何实现的只要看明白这个基本上就懂了
public V put(K key, V value) {return putVal(key, value, false);
}//有点小乱如果看着太乱可以在IDEA中折叠一下代码块不然有点难受
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException(); //键值不能为空基操int hash spread(key.hashCode()); //计算键的hash值用于确定在哈希表中的位置int binCount 0; //一会用来记录链表长度的忽略for (NodeK,V[] tab table;;) { //无限循环而且还是并发包中的类盲猜一波CAS自旋锁NodeK,V f; int n, i, fh;if (tab null || (n tab.length) 0)tab initTable(); //如果数组哈希表为空肯定是要进行初始化的然后再重新进下一轮循环else if ((f tabAt(tab, i (n - 1) hash)) null) { //如果哈希表该位置为null直接CAS插入结点作为头结即可注意这里会将f设置当前哈希表位置上的头结点if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null))) break; // 如果CAS成功直接break结束put方法失败那就继续下一轮循环} else if ((fh f.hash) MOVED) //头结点哈希值为-1这里只需要知道是因为正在扩容即可tab helpTransfer(tab, f); //帮助进行迁移完事之后再来下一次循环else { //特殊情况都完了这里就该是正常情况了V oldVal null;synchronized (f) { //在前面的循环中f肯定是被设定为了哈希表某个位置上的头结点这里直接把它作为锁加锁了防止同一时间其他线程也在操作哈希表中这个位置上的链表或是红黑树if (tabAt(tab, i) f) {if (fh 0) { //头结点的哈希值大于等于0说明是链表下面就是针对链表的一些列操作...实现细节略} else if (f instanceof TreeBin) { //肯定不大于0肯定也不是-1还判断是不是TreeBin所以不用猜了肯定是红黑树下面就是针对红黑树的情况进行操作//在ConcurrentHashMap并不是直接存储的TreeNode而是TreeBin...实现细节略}}}//根据链表长度决定是否要进化为红黑树if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD)treeifyBin(tab, i); //注意这里只是可能会进化为红黑树如果当前哈希表的长度小于64它会优先考虑对哈希表进行扩容if (oldVal ! null)return oldVal;break;}}}addCount(1L, binCount);return null;
}怎么样是不是感觉看着挺复杂其实也还好总结一下就是 我们接着来看看get()操作
public V get(Object key) {NodeK,V[] tab; NodeK,V e, p; int n, eh; K ek;int h spread(key.hashCode()); //计算哈希值if ((tab table) ! null (n tab.length) 0 (e tabAt(tab, (n - 1) h)) ! null) {// 如果头结点就是我们要找的那直接返回值就行了if ((eh e.hash) h) {if ((ek e.key) key || (ek ! null key.equals(ek)))return e.val;}//要么是正在扩容要么就是红黑树负数只有这两种情况else if (eh 0)return (p e.find(h, key)) ! null ? p.val : null;//确认无误肯定在列表里开找while ((e e.next) ! null) {if (e.hash h ((ek e.key) key || (ek ! null key.equals(ek))))return e.val;}}//没找到只能null了return null;
}综上ConcurrentHashMap的put操作实际上是对哈希表上的所有头结点元素分别加锁理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量这也是为什么treeifyBin()会优先考虑为哈希表进行扩容的原因。显然这种加锁方式比JDK7的分段锁机制性能更好。
其实这里也只是简单地介绍了一下它的运行机制ConcurrentHashMap真正的难点在于扩容和迁移操作我们主要了解的是他的并发执行机制有关它的其他实现细节这里暂时不进行讲解。
阻塞队列
除了我们常用的容器类之外JUC还提供了各种各样的阻塞队列用于不同的工作场景。
阻塞队列本身也是队列但是它是适用于多线程环境下的基于ReentrantLock实现的它的接口定义如下
public interface BlockingQueueE extends QueueE {boolean add(E e);//入队如果队列已满返回false否则返回true非阻塞boolean offer(E e);//入队如果队列已满阻塞线程直到能入队为止void put(E e) throws InterruptedException;//入队如果队列已满阻塞线程直到能入队或超时、中断为止入队成功返回true否则falseboolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;//出队如果队列为空阻塞线程直到能出队为止E take() throws InterruptedException;//出队如果队列为空阻塞线程直到能出队超时、中断为止出队成功正常返回否则返回nullE poll(long timeout, TimeUnit unit)throws InterruptedException;//返回此队列理想情况下在没有内存或资源限制的情况下可以不阻塞地入队的数量如果没有限制则返回 Integer.MAX_VALUEint remainingCapacity();boolean remove(Object o);public boolean contains(Object o);//一次性从BlockingQueue中获取所有可用的数据对象还可以指定获取数据的个数int drainTo(Collection? super E c);int drainTo(Collection? super E c, int maxElements);比如现在有一个容量为3的阻塞队列这个时候一个线程put向其添加了三个元素第二个线程接着put向其添加三个元素那么这个时候由于容量已满会直接被阻塞而这时第三个线程从队列中取走2个元素线程二停止阻塞先丢两个进去还有一个还是进不去所以说继续阻塞。 利用阻塞队列我们可以轻松地实现消费者和生产者模式还记得我们在JavaSE中的实战吗 所谓的生产者消费者模型是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲就是生产者在不断的生产消费者也在不断的消费可是消费者消费的产品是生产者生产的这就必然存在一个中间容器我们可以把这个容器想象成是一个货架当货架空的时候生产者要生产产品此时消费者在等待生产者往货架上生产产品而当货架有货物的时候消费者可以从货架上拿走商品生产者此时等待货架出现空位进而补货这样不断的循环。 通过多线程编程来模拟一个餐厅的2个厨师和3个顾客假设厨师炒出一个菜的时间为3秒顾客吃掉菜品的时间为4秒窗口上只能放一个菜。
我们来看看使用阻塞队列如何实现这里我们就使用ArrayBlockingQueue实现类
public class Main {public static void main(String[] args) throws InterruptedException {BlockingQueueObject queue new ArrayBlockingQueue(1);Runnable supplier () - {while (true){try {String name Thread.currentThread().getName();System.err.println(time()生产者 name 正在准备餐品...);TimeUnit.SECONDS.sleep(3);System.err.println(time()生产者 name 已出餐);queue.put(new Object());} catch (InterruptedException e) {e.printStackTrace();break;}}};Runnable consumer () - {while (true){try {String name Thread.currentThread().getName();System.out.println(time()消费者 name 正在等待出餐...);queue.take();System.out.println(time()消费者 name 取到了餐品。);TimeUnit.SECONDS.sleep(4);System.out.println(time()消费者 name 已经将饭菜吃完了);} catch (InterruptedException e) {e.printStackTrace();break;}}};for (int i 0; i 2; i) new Thread(supplier, Supplier-i).start();for (int i 0; i 3; i) new Thread(consumer, Consumer-i).start();}private static String time(){SimpleDateFormat format new SimpleDateFormat(HH:mm:ss);return [format.format(new Date()) ] ;}
}可以看到阻塞队列在多线程环境下的作用是非常明显的算上ArrayBlockingQueue一共有三种常用的阻塞队列
ArrayBlockingQueue有界带缓冲阻塞队列就是队列是有容量限制的装满了肯定是不能再装的只能阻塞数组实现SynchronousQueue无缓冲阻塞队列相当于没有容量的ArrayBlockingQueue因此只有阻塞的情况LinkedBlockingQueue无界带缓冲阻塞队列没有容量限制也可以限制容量也会阻塞链表实现
这里我们以ArrayBlockingQueue为例进行源码解读我们先来看看构造方法
final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity 0)throw new IllegalArgumentException();this.items new Object[capacity];lock new ReentrantLock(fair); //底层采用锁机制保证线程安全性这里我们可以选择使用公平锁或是非公平锁notEmpty lock.newCondition(); //这里创建了两个Condition都属于lock一会用于入队和出队的线程阻塞控制notFull lock.newCondition();
}接着我们来看put和offer方法是如何实现的
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock this.lock; //可以看到这里也是使用了类里面的ReentrantLock进行加锁操作lock.lock(); //保证同一时间只有一个线程进入try {if (count items.length) //直接看看队列是否已满如果没满则直接入队如果已满则返回falsereturn false;else {enqueue(e);return true;}} finally {lock.unlock();}
}public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock this.lock; //同样的需要进行加锁操作lock.lockInterruptibly(); //注意这里是可以响应中断的try {while (count items.length)notFull.await(); //可以看到当队列已满时会直接挂起当前线程在其他线程出队操作时会被唤醒enqueue(e); //直到队列有空位才将线程入队} finally {lock.unlock();}
}private E dequeue() {// assert lock.getHoldCount() 1;// assert items[takeIndex] ! null;final Object[] items this.items;SuppressWarnings(unchecked)E x (E) items[takeIndex];items[takeIndex] null;if (takeIndex items.length)takeIndex 0;count--;if (itrs ! null)itrs.elementDequeued();notFull.signal(); //出队操作会调用notFull的signal方法唤醒被挂起处于等待状态的线程return x;
}接着我们来看出队操作
public E poll() {final ReentrantLock lock this.lock;lock.lock(); //出队同样进行加锁操作保证同一时间只能有一个线程执行try {return (count 0) ? null : dequeue(); //如果队列不为空则出队否则返回null} finally {lock.unlock();}
}public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly(); //可以响应中断进行加锁try {while (count 0)notEmpty.await(); //和入队相反也是一直等直到队列中有元素之后才可以出队在入队时会唤醒此线程return dequeue();} finally {lock.unlock();}
}private void enqueue(E x) {// assert lock.getHoldCount() 1;// assert items[putIndex] null;final Object[] items this.items;items[putIndex] x;if (putIndex items.length)putIndex 0;count;notEmpty.signal(); //对notEmpty的signal唤醒操作
}可见如果各位对锁的使用非常熟悉的话那么在阅读这些源码的时候就会非常轻松了。
接着我们来看一个比较特殊的队列SynchronousQueue它没有任何容量也就是说正常情况下出队必须和入队操作成对出现我们先来看它的内部可以看到内部有一个抽象类Transferer它定义了一个transfer方法
abstract static class TransfererE {/*** 可以是put也可以是take操作** param e 如果不是空即作为生产者那么表示会将传入参数元素e交给消费者* 如果为空即作为消费者那么表示会从生产者那里得到一个元素e并返回* param 是否可以超时* param 超时时间* return 不为空就是从生产者那里返回的为空表示要么被中断要么超时。*/abstract E transfer(E e, boolean timed, long nanos);
}乍一看有点迷惑难不成还要靠这玩意去实现put和take操作吗实际上它是直接以生产者消费者模式进行的由于不需要依靠任何容器结构来暂时存放数据所以我们可以直接通过transfer方法来对生产者和消费者之间的数据进行传递。
比如一个线程put一个新的元素进入这时如果没有其他线程调用take方法获取元素那么会持续被阻塞直到有线程取出元素而transfer正是需要等生产者消费者双方都到齐了才能进行交接工作单独只有其中一方都需要进行等待。
public void put(E e) throws InterruptedException {if (e null) throw new NullPointerException(); //判空if (transferer.transfer(e, false, 0) null) { //直接使用transfer方法进行数据传递Thread.interrupted(); //为空表示要么被中断要么超时throw new InterruptedException();}
}它在公平和非公平模式下有两个实现这里我们来看公平模式下的SynchronousQueue是如何实现的
static final class TransferQueueE extends TransfererE {//头结点头结点仅作为头结点后续节点才是真正等待的线程节点transient volatile QNode head;//尾结点transient volatile QNode tail;/** 节点有生产者和消费者角色之分 */static final class QNode {volatile QNode next; // 后继节点volatile Object item; // 存储的元素volatile Thread waiter; // 处于等待的线程和之前的AQS一样的思路每个线程等待的时候都会被封装为节点final boolean isData; // 是生产者节点还是消费者节点公平模式下Transferer的实现是TransferQueue是以先进先出的规则的进行的内部有一个QNode类来保存等待的线程。
好了我们直接上transfer()方法的实现这里再次提醒各位多线程环境下的源码分析和单线程的分析不同我们需要时刻关注当前代码块的加锁状态如果没有加锁一定要具有多线程可能会同时运行的意识这个意识在以后你自己处理多线程问题伴随着你才能保证你的思路在多线程环境下是正确的
E transfer(E e, boolean timed, long nanos) { //注意这里面没加锁肯定会多个线程之间竞争QNode s null;boolean isData (e ! null); //e为空表示消费者不为空表示生产者for (;;) {QNode t tail;QNode h head;if (t null || h null) // 头结点尾结点任意为空但是在构造的时候就已经不是空了continue; // 自旋if (h t || t.isData isData) { // 头结点等于尾结点表示队列中只有一个头结点肯定是空或者尾结点角色和当前节点一样这两种情况下都需要进行入队操作QNode tn t.next;if (t ! tail) // 如果这段时间内t被其他线程修改了如果是就进下一轮循环重新来continue;if (tn ! null) { // 继续校验是否为队尾如果tn不为null那肯定是其他线程改了队尾可以进下一轮循环重新来了advanceTail(t, tn); // CAS将新的队尾节点设置为tn成不成功都无所谓反正这一轮肯定没戏了continue;}if (timed nanos 0) // 超时返回nullreturn null;if (s null)s new QNode(e, isData); //构造当前结点准备加入等待队列if (!t.casNext(null, s)) // CAS添加当前节点为尾结点的下一个如果失败肯定其他线程又抢先做了直接进下一轮循环重新来continue;advanceTail(t, s); // 上面的操作基本OK了那么新的队尾元素就修改为sObject x awaitFulfill(s, e, timed, nanos); //开始等待s所对应的消费者或是生产者进行交接比如s现在是生产者那么它就需要等到一个消费者的到来才会继续这个方法会先进行自旋等待匹配如果自旋一定次数后还是没有匹配成功那么就挂起if (x s) { // 如果返回s本身说明等待状态下被取消clean(t, s);return null;}if (!s.isOffList()) { // 如果s操作完成之后没有离开队列那么这里将其手动丢弃advanceHead(t, s); // 将s设定为新的首节点(注意头节点仅作为头结点并非处于等待的线程节点)if (x ! null) // 删除s内的其他信息s.item s;s.waiter null;}return (x ! null) ? (E)x : e; //假如当前是消费者直接返回x即可x就是从生产者那里拿来的元素} else { // 这种情况下就是与队列中结点类型匹配的情况了注意队列要么为空要么只会存在一种类型的节点因为一旦出现不同类型的节点马上会被交接掉QNode m h.next; // 获取头结点的下一个接口准备进行交接工作if (t ! tail || m null || h ! head)continue; // 判断其他线程是否先修改如果修改过那么开下一轮Object x m.item;if (isData (x ! null) || // 判断节点类型如果是相同的操作那肯定也是有问题的x m || // 或是当前操作被取消!m.casItem(x, e)) { // 上面都不是那么最后再进行CAS替换m中的元素成功表示交接成功失败就老老实实重开吧advanceHead(h, m); // dequeue and retrycontinue;}advanceHead(h, m); // 成功交接新的头结点可以改为m了原有的头结点直接不要了LockSupport.unpark(m.waiter); // m中的等待交接的线程可以继续了已经交接完成return (x ! null) ? (E)x : e; // 同上该返回什么就返回什么}}
}所以总结为以下流程 对于非公平模式下的SynchronousQueue则是采用的栈结构来存储等待节点但是思路也是与这里的一致需要等待并进行匹配操作各位如果感兴趣可以继续了解一下非公平模式下的SynchronousQueue实现。
在JDK7的时候基于SynchronousQueue产生了一个更强大的TransferQueue它保留了SynchronousQueue的匹配交接机制并且与等待队列进行融合。
我们知道SynchronousQueue并没有使用锁而是采用CAS操作保证生产者与消费者的协调但是它没有容量而LinkedBlockingQueue虽然是有容量且无界的但是内部基本都是基于锁实现的性能并不是很好这时我们就可以将它们各自的优点单独拿出来揉在一起就成了性能更高的LinkedTransferQueue
public static void main(String[] args) throws InterruptedException {LinkedTransferQueueString queue new LinkedTransferQueue();queue.put(1); //插入时会先检查是否有其他线程等待获取如果是直接进行交接否则插入到存储队列中queue.put(2); //不会像SynchronousQueue那样必须等一个匹配的才可以queue.forEach(System.out::println); //直接打印所有的元素这在SynchronousQueue下只能是空因为单独的入队或出队操作都会被阻塞
}相比 SynchronousQueue 它多了一个可以存储的队列我们依然可以像阻塞队列那样获取队列中所有元素的值简单来说LinkedTransferQueue其实就是一个多了存储队列的SynchronousQueue。
接着我们来了解一些其他的队列
PriorityBlockingQueue - 是一个支持优先级的阻塞队列元素的获取顺序按优先级决定。DelayQueue - 它能够实现延迟获取元素同样支持优先级。
我们先来看优先级阻塞队列
public static void main(String[] args) throws InterruptedException {PriorityBlockingQueueInteger queue new PriorityBlockingQueue(10, Integer::compare); //可以指定初始容量可扩容和优先级比较规则这里我们使用升序queue.add(3);queue.add(1);queue.add(2);System.out.println(queue); //注意保存顺序并不会按照优先级排列所以可以看到结果并不是排序后的结果System.out.println(queue.poll()); //但是出队顺序一定是按照优先级进行的System.out.println(queue.poll());System.out.println(queue.poll());
}我们的重点是DelayQueue它能实现延时出队也就是说当一个元素插入后如果没有超过一定时间那么是无法让此元素出队的。
public class DelayQueueE extends Delayed extends AbstractQueueEimplements BlockingQueueE {可以看到此类只接受Delayed的实现类作为元素
public interface Delayed extends ComparableDelayed { //注意这里继承了Comparable它支持优先级//获取剩余等待时间正数表示还需要进行等待0或负数表示等待结束long getDelay(TimeUnit unit);
}这里我们手动实现一个
private static class Test implements Delayed {private final long time; //延迟时间这里以毫秒为单位private final int priority;private final long startTime;private final String data;private Test(long time, int priority, String data) {this.time TimeUnit.SECONDS.toMillis(time); //秒转换为毫秒this.priority priority;this.startTime System.currentTimeMillis(); //这里我们以毫秒为单位this.data data;}Overridepublic long getDelay(TimeUnit unit) {long leftTime time - (System.currentTimeMillis() - startTime); //计算剩余时间 设定时间 - 已度过时间( 当前时间 - 开始时间)return unit.convert(leftTime, TimeUnit.MILLISECONDS); //注意进行单位转换单位由队列指定默认是纳秒单位}Overridepublic int compareTo(Delayed o) {if(o instanceof Test)return priority - ((Test) o).priority; //优先级越小越优先return 0;}Overridepublic String toString() {return data;}
}接着我们在主方法中尝试使用
public static void main(String[] args) throws InterruptedException {DelayQueueTest queue new DelayQueue();queue.add(new Test(1, 2, 2号)); //1秒钟延时queue.add(new Test(3, 1, 1号)); //1秒钟延时优先级最高System.out.println(queue.take()); //注意出队顺序是依照优先级来的即使一个元素已经可以出队了依然需要等待优先级更高的元素到期System.out.println(queue.take());
}我们来研究一下DelayQueue是如何实现的首先来看add()方法
public boolean add(E e) {return offer(e);
}public boolean offer(E e) {final ReentrantLock lock this.lock;lock.lock();try {q.offer(e); //注意这里是向内部维护的一个优先级队列添加元素并不是DelayQueue本身存储元素if (q.peek() e) { //如果入队后队首就是当前元素那么直接进行一次唤醒操作因为有可能之前就有其他线程等着take了leader null;available.signal();}return true;} finally {lock.unlock();}
}public void put(E e) {offer(e);
}可以看到无论是哪种入队操作都会加锁进行属于常规操作。我们接着来看take()方法
public E take() throws InterruptedException {final ReentrantLock lock this.lock; //出队也要先加锁基操lock.lockInterruptibly();try {for (;;) { //无限循环常规操作E first q.peek(); //获取队首元素if (first null) //如果为空那肯定队列为空先等着吧等有元素进来available.await();else {long delay first.getDelay(NANOSECONDS); //获取延迟这里传入的时间单位是纳秒if (delay 0)return q.poll(); //如果获取到延迟时间已经小于0了那说明ok可以直接出队返回first null;if (leader ! null) //这里用leader来减少不必要的等待时间如果不是null那说明有线程在等待为null说明没有线程等待available.await(); //如果其他线程已经在等元素了那么当前线程直接进永久等待状态else {Thread thisThread Thread.currentThread();leader thisThread; //没有线程等待就将leader设定为当前线程try {available.awaitNanos(delay); //获取到的延迟大于0那么就需要等待延迟时间再开始下一次获取} finally {if (leader thisThread)leader null;}}}}} finally {if (leader null q.peek() ! null)available.signal(); //当前take结束之后唤醒一个其他永久等待状态下的线程lock.unlock(); //解锁完事}
}到此有关并发容器的讲解就到这里。
下一章我们会继续讲解线程池以及并发工具类。
并发编程进阶
欢迎来到JUC学习的最后一章王炸当然是放在最后了。
线程池
在我们的程序中多多少少都会用到多线程技术而我们以往都是使用Thread类来创建一个新的线程
public static void main(String[] args) {Thread t new Thread(() - System.out.println(Hello World!));t.start();
}利用多线程我们的程序可以更加合理地使用CPU多核心资源在同一时间完成更多的工作。但是如果我们的程序频繁地创建线程由于线程的创建和销毁也需要占用系统资源因此这样会降低我们整个程序的性能那么怎么做才能更高效地使用多线程呢
我们其实可以将已创建的线程复用利用池化技术就像数据库连接池一样我们也可以创建很多个线程然后反复地使用这些线程而不对它们进行销毁。
虽然听起来这个想法比较新颖但是实际上线程池早已利用到各个地方比如我们的Tomcat服务器要在同一时间接受和处理大量的请求那么就必须要在短时间内创建大量的线程结束后又进行销毁这显然会导致很大的开销因此这种情况下使用线程池显然是更好的解决方案。
由于线程池可以反复利用已有线程执行多线程操作所以它一般是有容量限制的当所有的线程都处于工作状态时那么新的多线程请求会被阻塞直到有一个线程空闲出来为止实际上这里就会用到我们之前讲解的阻塞队列。
所以我们可以暂时得到下面一个样子 当然JUC提供的线程池肯定没有这么简单接下来就让我们深入进行了解。
线程池的使用
我们可以直接创建一个新的线程池对象它已经提前帮助我们实现好了线程的调度机制我们先来看它的构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.acc System.getSecurityManager() null ?null :AccessController.getContext();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;
}参数稍微有一点多这里我们依次进行讲解
corePoolSize核心线程池大小我们每向线程池提交一个多线程任务时都会创建一个新的核心线程无论是否存在其他空闲线程直到到达核心线程池大小为止之后会尝试复用线程资源。当然也可以在一开始就全部初始化好调用 prestartAllCoreThreads()即可。maximumPoolSize最大线程池大小当目前线程池中所有的线程都处于运行状态并且等待队列已满那么就会直接尝试继续创建新的非核心线程运行但是不能超过最大线程池大小。keepAliveTime线程最大空闲时间当一个非核心线程空闲超过一定时间会自动销毁。unit线程最大空闲时间的时间单位workQueue线程等待队列当线程池中核心线程数已满时就会将任务暂时存到等待队列中直到有线程资源可用为止这里可以使用我们上一章学到的阻塞队列。threadFactory线程创建工厂我们可以干涉线程池中线程的创建过程进行自定义。handler拒绝策略当等待队列和线程池都没有空间了真的不能再来新的任务时来了个新的多线程任务那么只能拒绝了这时就会根据当前设定的拒绝策略进行处理。
最为重要的就是线程池大小的限定了这个也是很有学问的合理地分配大小会使得线程池的执行效率事半功倍
首先我们可以分析一下线程池执行任务的特性是CPU 密集型还是 IO 密集型 **CPU密集型**主要是执行计算任务响应时间很快CPU一直在运行这种任务CPU的利用率很高那么线程数应该是根据 CPU 核心数来决定CPU 核心数 最大同时执行线程数以 i5-9400F 处理器为例CPU 核心数为 6那么最多就能同时执行 6 个线程。**IO密集型**主要是进行 IO 操作因为执行 IO 操作的时间比较较长比如从硬盘读取数据之类的CPU就得等着IO操作很容易出现空闲状态导致 CPU 的利用率不高这种情况下可以适当增加线程池的大小让更多的线程可以一起进行IO操作一般可以配置为CPU核心数的2倍。
这里我们手动创建一个新的线程池看看效果
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(2, 4, //2个核心线程最大线程数为4个3, TimeUnit.SECONDS, //最大空闲时间为3秒钟new ArrayBlockingQueue(2)); //这里使用容量为2的ArrayBlockingQueue队列for (int i 0; i 6; i) { //开始6个任务int finalI i;executor.execute(() - {try {System.out.println(Thread.currentThread().getName() 开始执行 finalI);TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() 已结束finalI);} catch (InterruptedException e) {e.printStackTrace();}});}TimeUnit.SECONDS.sleep(1); //看看当前线程池中的线程数量System.out.println(线程池中线程数量executor.getPoolSize());TimeUnit.SECONDS.sleep(5); //等到超过空闲时间System.out.println(线程池中线程数量executor.getPoolSize());executor.shutdownNow(); //使用完线程池记得关闭不然程序不会结束它会取消所有等待中的任务以及试图中断正在执行的任务关闭后无法再提交任务一律拒绝//executor.shutdown(); 同样可以关闭但是会执行完等待队列中的任务再关闭
}这里我们创建了一个核心容量为2最大容量为4等待队列长度为2空闲时间为3秒的线程池现在我们向其中执行6个任务每个任务都会进行1秒钟休眠那么当线程池中2个核心线程都被占用时还有4个线程就只能进入到等待队列中了但是等待队列中只有2个容量这时紧接着的2个任务线程池将直接尝试创建线程由于不大于最大容量因此可以成功创建。最后所有线程完成之后在等待5秒后超过了线程池的最大空闲时间非核心线程被回收了所以线程池中只有2个线程存在。
那么要是等待队列设定为没有容量的SynchronousQueue呢这个时候会发生什么
pool-1-thread-1 开始执行0
pool-1-thread-4 开始执行3
pool-1-thread-3 开始执行2
pool-1-thread-2 开始执行1
Exception in thread main java.util.concurrent.RejectedExecutionException: Task com.test.Main$$Lambda$1/1283928880682a0b20 rejected from java.util.concurrent.ThreadPoolExecutor3d075dc0[Running, pool size 4, active threads 4, queued tasks 0, completed tasks 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at com.test.Main.main(Main.java:15)可以看到前4个任务都可以正常执行但是到第五个任务时直接抛出了异常这其实就是因为等待队列的容量为0相当于没有容量那么这个时候就只能拒绝任务了拒绝的操作会根据拒绝策略决定。
线程池的拒绝策略默认有以下几个
AbortPolicy(默认)像上面一样直接抛异常。CallerRunsPolicy直接让提交任务的线程运行这个任务比如在主线程向线程池提交了任务那么就直接由主线程执行。DiscardOldestPolicy丢弃队列中最近的一个任务替换为当前任务。DiscardPolicy什么也不用做。
这里我们进行一下测试
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(2, 4,3, TimeUnit.SECONDS,new SynchronousQueue(),new ThreadPoolExecutor.CallerRunsPolicy()); //使用另一个构造方法最后一个参数传入策略比如这里我们使用了CallerRunsPolicy策略CallerRunsPolicy策略是谁提交的谁自己执行所以
pool-1-thread-1 开始执行0
pool-1-thread-2 开始执行1
main 开始执行4
pool-1-thread-4 开始执行3
pool-1-thread-3 开始执行2
pool-1-thread-3 已结束2
pool-1-thread-2 已结束1
pool-1-thread-1 已结束0
main 已结束4
pool-1-thread-4 已结束3
pool-1-thread-1 开始执行5
pool-1-thread-1 已结束5
线程池中线程数量4
线程池中线程数量2可以看到当队列塞不下时直接在主线程运行任务运行完之后再继续向下执行。
我们吧策略修改为DiscardOldestPolicy试试看
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(2, 4,3, TimeUnit.SECONDS,new ArrayBlockingQueue(1), //这里设置为ArrayBlockingQueue长度为1new ThreadPoolExecutor.DiscardOldestPolicy()); 它会移除等待队列中的最近的一个任务所以可以看到有一个任务实际上是被抛弃了的
pool-1-thread-1 开始执行0
pool-1-thread-4 开始执行4
pool-1-thread-3 开始执行3
pool-1-thread-2 开始执行1
pool-1-thread-1 已结束0
pool-1-thread-4 已结束4
pool-1-thread-1 开始执行5
线程池中线程数量4
pool-1-thread-3 已结束3
pool-1-thread-2 已结束1
pool-1-thread-1 已结束5
线程池中线程数量2比较有意思的是如果选择没有容量的SynchronousQueue作为等待队列会爆栈
pool-1-thread-1 开始执行0
pool-1-thread-3 开始执行2
pool-1-thread-2 开始执行1
pool-1-thread-4 开始执行3
Exception in thread main java.lang.StackOverflowErrorat java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:912)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) ...
pool-1-thread-1 已结束0
pool-1-thread-2 已结束1
pool-1-thread-4 已结束3
pool-1-thread-3 已结束2这是为什么呢我们来看看这个拒绝策略的源码
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll(); //会先执行一次出队操作但是这对于SynchronousQueue来说毫无意义e.execute(r); //这里会再次调用execute方法}}
}可以看到它会先对等待队列进行出队操作但是由于SynchronousQueue压根没容量所有这个操作毫无意义然后就会递归执行execute方法而进入之后又发现没有容量不能插入于是又重复上面的操作这样就会无限的递归下去最后就爆栈了。
当然除了使用官方提供的4种策略之外我们还可以使用自定义的策略
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(2, 4,3, TimeUnit.SECONDS,new SynchronousQueue(),(r, executor1) - { //比如这里我们也来实现一个就在当前线程执行的策略System.out.println(哎呀线程池和等待队列都满了你自己耗子尾汁吧);r.run(); //直接运行});接着我们来看线程创建工厂我们可以自己决定如何创建新的线程
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(2, 4,3, TimeUnit.SECONDS,new SynchronousQueue(),new ThreadFactory() {int counter 0;Overridepublic Thread newThread(Runnable r) {return new Thread(r, 我的自定义线程-counter);}});for (int i 0; i 4; i) {executor.execute(() - System.out.println(Thread.currentThread().getName() 开始执行));}
}这里传入的Runnable对象就是我们提交的任务可以看到需要我们返回一个Thread对象其实就是线程池创建线程的过程而如何创建这个对象以及它的一些属性就都由我们来决定。
各位有没有想过这样一个情况如果我们的任务在运行过程中出现异常了那么是不是会导致线程池中的线程被销毁呢
public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor new ThreadPoolExecutor(1, 1, //最大容量和核心容量锁定为10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque());executor.execute(() - {System.out.println(Thread.currentThread().getName());throw new RuntimeException(我是异常);});TimeUnit.SECONDS.sleep(1);executor.execute(() - {System.out.println(Thread.currentThread().getName());});
}可以看到出现异常之后再次提交新的任务执行的线程是一个新的线程了。
除了我们自己创建线程池之外官方也提供了很多的线程池定义我们可以使用Executors工具类来快速创建线程池
public static void main(String[] args) throws InterruptedException {ExecutorService executor Executors.newFixedThreadPool(2); //直接创建一个固定容量的线程池
}可以看到它的内部实现为
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());
}这里直接将最大线程和核心线程数量设定为一样的并且等待时间为0因为压根不需要并且采用的是一个无界的LinkedBlockingQueue作为等待队列。
使用newSingleThreadExecutor来创建只有一个线程的线程池
public static void main(String[] args) throws InterruptedException {ExecutorService executor Executors.newSingleThreadExecutor();//创建一个只有一个线程的线程池
}原理如下
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));
}可以看到这里并不是直接创建的一个ThreadPoolExecutor对象而是套了一层FinalizableDelegatedExecutorService那么这个又是什么东西呢
static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}protected void finalize() { //在GC时会执行finalize方法此方法中会关闭掉线程池释放资源super.shutdown();}
}static class DelegatedExecutorService extends AbstractExecutorService {private final ExecutorService e; //被委派对象DelegatedExecutorService(ExecutorService executor) { e executor; } //实际上所以的操作都是让委派对象执行的有点像代理public void execute(Runnable command) { e.execute(command); }public void shutdown() { e.shutdown(); }public ListRunnable shutdownNow() { return e.shutdownNow(); }所以下面两种写法的区别在于
public static void main(String[] args) throws InterruptedException {ExecutorService executor1 Executors.newSingleThreadExecutor();ExecutorService executor2 Executors.newFixedThreadPool(1);
}前者实际上是被代理了我们没办法直接修改前者的相关属性显然使用前者创建只有一个线程的线程池更加专业和安全可以防止属性被修改一些。
最后我们来看newCachedThreadPool方法
public static void main(String[] args) throws InterruptedException {ExecutorService executor Executors.newCachedThreadPool();//它是一个会根据需要无限制创建新线程的线程池
}我们来看看它的实现
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable());
}可以看到核心线程数为0那么也就是说所有的线程都是非核心线程也就是说线程空闲时间超过1秒钟一律销毁。但是它的最大容量是Integer.MAX_VALUE也就是说它可以无限制地增长下去所以这玩意一定要慎用。
执行带返回值的任务
一个多线程任务不仅仅可以是void无返回值任务比如我们现在需要执行一个任务但是我们需要在任务执行之后得到一个结果这个时候怎么办呢
这里我们就可以使用到Future了它可以返回任务的计算结果我们可以通过它来获取任务的结果以及任务当前是否完成
public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executor Executors.newSingleThreadExecutor(); //直接用Executors创建方便就完事了FutureString future executor.submit(() - 我是字符串!); //使用submit提交任务会返回一个Future对象注意提交的对象可以是Runable也可以是Callable这里使用的是Callable能够自定义返回值System.out.println(future.get()); //如果任务未完成get会被阻塞任务完成返回Callable执行结果返回值executor.shutdown();
}当然结果也可以一开始就定义好然后等待Runnable执行完之后再返回
public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executor Executors.newSingleThreadExecutor();FutureString future executor.submit(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}}, 我是字符串);System.out.println(future.get());executor.shutdown();
}还可以通过传入FutureTask对象的方式
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService service Executors.newSingleThreadExecutor();FutureTaskString task new FutureTask(() - 我是字符串);service.submit(task);System.out.println(task.get());executor.shutdown();
}我们可以还通过Future对象获取当前任务的一些状态
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor Executors.newSingleThreadExecutor();FutureString future executor.submit(() - 都看到这里了不赏UP主一个一键三连吗);System.out.println(future.get());System.out.println(任务是否执行完成future.isDone());System.out.println(任务是否被取消future.isCancelled());executor.shutdown();
}我们来试试看在任务执行途中取消任务
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor Executors.newSingleThreadExecutor();FutureString future executor.submit(() - {TimeUnit.SECONDS.sleep(10);return 这次一定;});System.out.println(future.cancel(true));System.out.println(future.isCancelled());executor.shutdown();
}执行定时任务
既然线程池怎么强大那么线程池能不能执行定时任务呢我们之前如果需要执行一个定时任务那么肯定会用到Timer和TimerTask但是它只会创建一个线程处理我们的定时任务无法实现多线程调度并且它无法处理异常情况一旦抛出未捕获异常那么会直接终止显然我们需要一个更加强大的定时器。
JDK5之后我们可以使用ScheduledThreadPoolExecutor来提交定时任务它继承自ThreadPoolExecutor并且所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE并且都是采用的DelayedWorkQueue作为等待队列。
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);
}public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}我们来测试一下它的方法这个方法可以提交一个延时任务只有到达指定时间之后才会开始
public static void main(String[] args) throws ExecutionException, InterruptedException {//直接设定核心线程数为1ScheduledThreadPoolExecutor executor new ScheduledThreadPoolExecutor(1);//这里我们计划在3秒后执行executor.schedule(() - System.out.println(HelloWorld!), 3, TimeUnit.SECONDS);executor.shutdown();
}我们也可以像之前一样传入一个Callable对象用于接收返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledThreadPoolExecutor executor new ScheduledThreadPoolExecutor(2);//这里使用ScheduledFutureScheduledFutureString future executor.schedule(() - ????, 3, TimeUnit.SECONDS);System.out.println(任务剩余等待时间future.getDelay(TimeUnit.MILLISECONDS) / 1000.0 s);System.out.println(任务执行结果future.get());executor.shutdown();
}可以看到schedule方法返回了一个ScheduledFuture对象和Future一样它也支持返回值的获取、包括对任务的取消同时还支持获取剩余等待时间。
那么如果我们希望按照一定的频率不断执行任务呢
public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledThreadPoolExecutor executor new ScheduledThreadPoolExecutor(2);executor.scheduleAtFixedRate(() - System.out.println(Hello World!),3, 1, TimeUnit.SECONDS);//三秒钟延迟开始之后每隔一秒钟执行一次
}Executors也为我们预置了newScheduledThreadPool方法用于创建线程池
public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService service Executors.newScheduledThreadPool(1);service.schedule(() - System.out.println(Hello World!), 1, TimeUnit.SECONDS);
}线程池实现原理
前面我们了解了线程池的使用那么接着我们来看看它的详细实现过程结构稍微有点复杂坐稳发车了。
这里需要首先介绍一下ctl变量
//这个变量比较关键用到了原子AtomicInteger用于同时保存线程池运行状态和线程数量使用原子类是为了保证原子性
//它是通过拆分32个bit位来保存数据的前3位保存状态后29位保存工作线程数量那要是工作线程数量29位装不下不就GG
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS Integer.SIZE - 3; //29位线程数量位
private static final int CAPACITY (1 COUNT_BITS) - 1; //计算得出最大容量1左移29位最大容量为2的29次方-1// 所有的运行状态注意都是只占用前3位不会占用后29位
// 接收新任务并等待执行队列中的任务
private static final int RUNNING -1 COUNT_BITS; //111 | 0000... (后29数量位下同)
// 不接收新任务但是依然等待执行队列中的任务
private static final int SHUTDOWN 0 COUNT_BITS; //000 | 数量位
// 不接收新任务也不执行队列中的任务并且还要中断正在执行中的任务
private static final int STOP 1 COUNT_BITS; //001 | 数量位
// 所有的任务都已结束线程数量为0即将完全关闭
private static final int TIDYING 2 COUNT_BITS; //010 | 数量位
// 完全关闭
private static final int TERMINATED 3 COUNT_BITS; //011 | 数量位// 封装和解析ctl变量的一些方法
private static int runStateOf(int c) { return c ~CAPACITY; } //对CAPACITY取反就是后29位全部为0前三位全部为1接着与c进行与运算这样就可以只得到前三位的结果了所以这里是取运行状态
private static int workerCountOf(int c) { return c CAPACITY; }
//同上这里是为了得到后29位的结果所以这里是取线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 比如上面的RUNNING, 0进行与运算之后
// 111 | 0000000000000000000000000我们先从最简单的入手看看在调用execute方法之后线程池会做些什么
//这个就是我们指定的阻塞队列
private final BlockingQueueRunnable workQueue;//再次提醒这里没加锁该有什么意识不用我说了吧所以说ctl才会使用原子类。
public void execute(Runnable command) {if (command null)throw new NullPointerException(); //如果任务为null那执行个寂寞所以说直接空指针int c ctl.get(); //获取ctl的值一会要读取信息的if (workerCountOf(c) corePoolSize) { //判断工作线程数量是否小于核心线程数if (addWorker(command, true)) //如果是那不管三七二十一直接加新的线程执行然后返回即可return;c ctl.get(); //如果线程添加失败有可能其他线程也在对线程池进行操作那就更新一下c的值}if (isRunning(c) workQueue.offer(command)) { //继续判断如果当前线程池是运行状态那就尝试向阻塞队列中添加一个新的等待任务int recheck ctl.get(); //再次获取ctl的值if (! isRunning(recheck) remove(command)) //这里是再次确认当前线程池是否关闭如果添加等待任务后线程池关闭了那就把刚刚加进去任务的又拿出来reject(command); //然后直接拒绝当前任务的提交会根据我们的拒绝策略决定如何进行拒绝操作else if (workerCountOf(recheck) 0) //如果这个时候线程池依然在运行状态那么就检查一下当前工作线程数是否为0如果是那就直接添加新线程执行addWorker(null, false); //添加一个新的非核心线程但是注意没添加任务//其他情况就啥也不用做了}else if (!addWorker(command, false)) //这种情况要么就是线程池没有运行要么就是队列满了按照我们之前的规则核心线程数已满且队列已满那么会直接添加新的非核心线程但是如果已经添加到最大数量这里肯定是会失败的reject(command); //确实装不下了只能拒绝
}是不是感觉思路还挺清晰的我们接着来看addWorker是怎么创建和执行任务的又是一大堆代码
private boolean addWorker(Runnable firstTask, boolean core) {//这里给最外层循环打了个标签方便一会的跳转操作retry:for (;;) { //无限循环老套路了注意这里全程没加锁int c ctl.get(); //获取ctl值int rs runStateOf(c); //解析当前的运行状态// Check if queue empty only if necessary.if (rs SHUTDOWN //判断线程池是否不是处于运行状态! (rs SHUTDOWN //如果不是运行状态判断线程是SHUTDOWN状态并、任务不为null、等待队列不为空只要有其中一者不满足直接返回false添加失败firstTask null ! workQueue.isEmpty()))return false;for (;;) { //内层又一轮无限循环这个循环是为了将线程计数增加然后才可以真正地添加一个新的线程int wc workerCountOf(c); //解析当前的工作线程数量if (wc CAPACITY ||wc (core ? corePoolSize : maximumPoolSize)) //判断一下还装得下不如果装得下看看是核心线程还是非核心线程如果是核心线程不能大于核心线程数的限制如果是非核心线程不能大于最大线程数限制return false;if (compareAndIncrementWorkerCount(c)) //CAS自增线程计数如果增加成功任务完成直接跳出继续break retry; //注意这里要直接跳出最外层循环所以用到了标签类似于goto语句c ctl.get(); // 如果CAS失败更新一下c的值if (runStateOf(c) ! rs) //如果CAS失败的原因是因为线程池状态和一开始的不一样了那么就重新从外层循环再来一次continue retry; //注意这里要直接从最外层循环继续所以用到了标签类似于goto语句// 如果是其他原因导致的CAS失败那只可能是其他线程同时在自增所以重新再来一次内层循环}}//好了线程计数自增也完了接着就是添加新的工作线程了boolean workerStarted false; //工作线程是否已启动boolean workerAdded false; //工作线程是否已添加Worker w null; //暂时理解为工作线程别急我们之后会解读Worker类try {w new Worker(firstTask); //创建新的工作线程传入我们提交的任务final Thread t w.thread; //拿到工作线程中封装的Thread对象if (t ! null) { //如果线程不为null那就可以安排干活了final ReentrantLock mainLock this.mainLock; //又是ReentrantLock加锁环节这里开始就是只有一个线程能进入了mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs runStateOf(ctl.get()); //获取当前线程的运行状态if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) { //只有当前线程池是正在运行状态或是SHUTDOWN状态且firstTask为空那么就继续if (t.isAlive()) // 检查一下线程是否正在运行状态throw new IllegalThreadStateException(); //如果是那肯定是不能运行我们的任务的workers.add(w); //直接将新创建的Work丢进 workers 集合中int s workers.size(); //看看当前workers的大小if (s largestPoolSize) //这里是记录线程池运行以来历史上的最多线程数largestPoolSize s;workerAdded true; //工作线程已添加}} finally {mainLock.unlock(); //解锁}if (workerAdded) {t.start(); //启动线程workerStarted true; //工作线程已启动}}} finally {if (! workerStarted) //如果线程在上面的启动过程中失败了addWorkerFailed(w); //将w移出workers并将计数器-1最后如果线程池是终止状态会尝试加速终止线程池}return workerStarted; //返回是否成功
}接着我们来看Worker类是如何实现的它继承自AbstractQueuedSynchronizer时隔两章居然再次遇到AQS那也就是说它本身就是一把锁
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {//用来干活的线程final Thread thread;//要执行的第一个任务构造时就确定了的Runnable firstTask;//干活数量计数器也就是这个线程完成了多少个任务volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // 执行Task之前不让中断将AQS的state设定为-1this.firstTask firstTask;this.thread getThreadFactory().newThread(this); //通过预定义或是我们自定义的线程工厂创建线程}public void run() {runWorker(this); //真正开始干活包括当前活干完了又要等新的活来就从这里开始一会详细介绍}//0就是没加锁1就是已加锁protected boolean isHeldExclusively() {return getState() ! 0;}...
}最后我们来看看一个Worker到底是怎么在进行任务的
final void runWorker(Worker w) {Thread wt Thread.currentThread(); //获取当前线程Runnable task w.firstTask; //取出要执行的任务w.firstTask null; //然后把Worker中的任务设定为nullw.unlock(); // 因为一开始为-1这里是通过unlock操作将其修改回0只有state大于等于0才能响应中断boolean completedAbruptly true;try {//只要任务不为null或是任务为空但是可以从等待队列中取出任务不为空那么就开始执行这个任务注意这里是无限循环也就是说如果当前没有任务了那么会在getTask方法中卡住因为要从阻塞队列中等着取任务while (task ! null || (task getTask()) ! null) {w.lock(); //对当前Worker加锁这里其实并不是防其他线程而是在shutdown时保护此任务的运行//由于线程池在STOP状态及以上会禁止新线程加入并且中断正在进行的线程if ((runStateAtLeast(ctl.get(), STOP) || //只要线程池是STOP及以上的状态那肯定是不能开始新任务的(Thread.interrupted() //线程是否已经被打上中断标记并且线程一定是STOP及以上runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted()) //再次确保线程被没有打上中断标记wt.interrupt(); //打中断标记try {beforeExecute(wt, task); //开始之前的准备工作这里暂时没有实现Throwable thrown null;try {task.run(); //OK开始执行任务} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {afterExecute(task, thrown); //执行之后的工作也没实现}} finally {task null; //任务已完成不需要了w.completedTasks; //任务完成数w.unlock(); //解锁}}completedAbruptly false;} finally {//如果能走到这一步那说明上面的循环肯定是跳出了也就是说这个Worker可以丢弃了//所以这里会直接将 Worker 从 workers 里删除掉processWorkerExit(w, completedAbruptly);}
}那么它是怎么从阻塞队列里面获取任务的呢
private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (;;) { //无限循环获取int c ctl.get(); //获取ctl int rs runStateOf(c); //解析线程池运行状态// Check if queue empty only if necessary.if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) { //判断是不是没有必要再执行等待队列中的任务了也就是处于关闭线程池的状态了decrementWorkerCount(); //直接减少一个工作线程数量return null; //返回null这样上面的runWorker就直接结束了下同}int wc workerCountOf(c); //如果线程池运行正常那就获取当前的工作线程数量// Are workers subject to culling?boolean timed allowCoreThreadTimeOut || wc corePoolSize; //如果线程数大于核心线程数或是允许核心线程等待超时那么就标记为可超时的//超时或maximumPoolSize在运行期间被修改了并且线程数大于1或等待队列为空那也是不能获取到任务的if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c)) //如果CAS减少工作线程成功return null; //返回nullcontinue; //否则开下一轮循环}try {Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //如果可超时那么最多等到超时时间workQueue.take(); //如果不可超时那就一直等着拿任务if (r ! null) //如果成功拿到任务ok返回return r;timedOut true; //否则就是超时了下一轮循环将直接返回null} catch (InterruptedException retry) {timedOut false;}//开下一轮循环吧}
}虽然我们的源码解读越来越深但是只要各位的思路不断依然是可以继续往下看的。到此有关execute()方法的源码解读就先到这里。
接着我们来看当线程池关闭时会做什么事情
//普通的shutdown会继续将等待队列中的线程执行完成后再关闭线程池
public void shutdown() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {//判断是否有权限终止checkShutdownAccess();//CAS将线程池运行状态改为SHUTDOWN状态还算比较温柔详细过程看下面advanceRunState(SHUTDOWN);//让闲着的线程比如正在等新的任务中断但是并不会影响正在运行的线程详细过程请看下面interruptIdleWorkers();onShutdown(); //给ScheduledThreadPoolExecutor提供的钩子方法就是等ScheduledThreadPoolExecutor去实现的当前类没有实现} finally {mainLock.unlock();}tryTerminate(); //最后尝试终止线程池
}private void advanceRunState(int targetState) {for (;;) {int c ctl.get(); //获取ctlif (runStateAtLeast(c, targetState) || //是否大于等于指定的状态ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //CAS设置ctl的值break; //任意一个条件OK就可以结束了}
}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t w.thread; //拿到Worker中的线程if (!t.isInterrupted() w.tryLock()) { //先判断一下线程是不是没有被中断然后尝试加锁但是通过前面的runWorker()源代码我们得知开始之后是让Worker加了锁的所以如果线程还在执行任务那么这里肯定会falsetry {t.interrupt(); //如果走到这里那么说明线程肯定是一个闲着的线程直接给中断吧} catch (SecurityException ignore) {} finally {w.unlock(); //解锁}}if (onlyOne) //如果只针对一个Worker那么就结束循环break;}} finally {mainLock.unlock();}
}而shutdownNow()方法也差不多但是这里会更直接一些
//shutdownNow开始后不仅不允许新的任务到来也不会再执行等待队列的线程而且会终止正在执行的线程
public ListRunnable shutdownNow() {ListRunnable tasks;final ReentrantLock mainLock this.mainLock;mainLock.lock();try {checkShutdownAccess();//这里就是直接设定为STOP状态了不再像shutdown那么温柔advanceRunState(STOP);//直接中断所有工作线程详细过程看下面interruptWorkers();//取出仍处于阻塞队列中的线程tasks drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; //最后返回还没开始的任务
}private void interruptWorkers() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (Worker w : workers) //遍历所有Workerw.interruptIfStarted(); //无差别对待一律加中断标记} finally {mainLock.unlock();}
}最后的最后我们再来看看tryTerminate()是怎么完完全全终止掉一个线程池的
final void tryTerminate() {for (;;) { //无限循环int c ctl.get(); //上来先获取一下ctl值//只要是正在运行 或是 线程池基本上关闭了 或是 处于SHUTDOWN状态且工作队列不为空那么这时还不能关闭线程池返回if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) SHUTDOWN ! workQueue.isEmpty()))return;//走到这里要么处于SHUTDOWN状态且等待队列为空或是STOP状态if (workerCountOf(c) ! 0) { // 如果工作线程数不是0这里也会中断空闲状态下的线程interruptIdleWorkers(ONLY_ONE); //这里最多只中断一个空闲线程然后返回return;}//走到这里工作线程也为空了可以终止线程池了final ReentrantLock mainLock this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //先CAS将状态设定为TIDYING表示基本终止正在做最后的操作try {terminated(); //终止暂时没有实现} finally {ctl.set(ctlOf(TERMINATED, 0)); //最后将状态设定为TERMINATED线程池结束了它年轻的生命termination.signalAll(); //如果有线程调用了awaitTermination方法会等待当前线程池终止到这里差不多就可以唤醒了}return; //结束}//注意如果CAS失败会直接进下一轮循环重新判断} finally {mainLock.unlock();}// else retry on failed CAS}
}OK有关线程池的实现原理我们就暂时先介绍到这里关于更高级的定时任务线程池这里就不做讲解了。 并发工具类
计数器锁 CountDownLatch
多任务同步神器。它允许一个或多个线程等待其他线程完成工作比如现在我们有这样的一个需求
有20个计算任务我们需要先将这些任务的结果全部计算出来每个任务的执行时间未知当所有任务结束之后立即整合统计最终结果
要实现这个需求那么有一个很麻烦的地方我们不知道任务到底什么时候执行完毕那么可否将最终统计延迟一定时间进行呢但是最终统计无论延迟多久进行要么不能保证所有任务都完成要么可能所有任务都完成了而这里还在等。
所以说我们需要一个能够实现子任务同步的工具。
public static void main(String[] args) throws InterruptedException {CountDownLatch latch new CountDownLatch(20); //创建一个初始值为10的计数器锁for (int i 0; i 20; i) {int finalI i;new Thread(() - {try {Thread.sleep((long) (2000 * new Random().nextDouble()));System.out.println(子任务 finalI 执行完成);} catch (InterruptedException e) {e.printStackTrace();}latch.countDown(); //每执行一次计数器都会-1}).start();}//开始等待所有的线程完成当计数器为0时恢复运行latch.await(); //这个操作可以同时被多个线程执行一起等待这里只演示了一个System.out.println(所有子任务都完成任务完成);//注意这个计数器只能使用一次用完只能重新创一个没有重置的说法
}我们在调用await()方法之后实际上就是一个等待计数器衰减为0的过程而进行自减操作则由各个子线程来完成当子线程完成工作后那么就将计数器-1所有的子线程完成之后计数器为0结束等待。
那么它是如何实现的呢实现 原理非常简单
public class CountDownLatch {//同样是通过内部类实现AbstractQueuedSynchronizerprivate static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) { //这里直接使用AQS的state作为计数器可见state能被玩出各种花样也就是说一开始就加了count把共享锁当线程调用countdown时就解一层锁setState(count);}int getCount() {return getState();}//采用共享锁机制因为可以被不同的线程countdown所以实现的tryAcquireShared和tryReleaseShared//获取这把共享锁其实就是去等待state被其他线程减到0protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// 每次执行都会将state值-1直到为0for (;;) {int c getState();if (c 0)return false; //如果已经是0了那就falseint nextc c-1;if (compareAndSetState(c, nextc)) //CAS设置state值失败直接下一轮循环return nextc 0; //返回c-1之后是不是0如果是那就true否则false也就是说只有刚好减到0的时候才会返回true}}}private final Sync sync;public CountDownLatch(int count) {if (count 0) throw new IllegalArgumentException(count 0); //count那肯定不能小于0啊this.sync new Sync(count); //构造Sync对象将count作为state初始值}//通过acquireSharedInterruptibly方法获取共享锁但是如果state不为0那么会被持续阻塞详细原理下面讲public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//同上但是会超时public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}//countDown其实就是解锁一次public void countDown() {sync.releaseShared(1);}//获取当前的计数也就是AQS中state的值public long getCount() {return sync.getCount();}//这个就不说了public String toString() {return super.toString() [Count sync.getCount() ];}
}在深入讲解之前我们先大致了解一下CountDownLatch的基本实现思路
利用共享锁实现在一开始的时候就是已经上了count层锁的状态也就是state countawait()就是加共享锁但是必须state为0才能加锁成功否则按照AQS的机制会进入等待队列阻塞加锁成功后结束阻塞countDown()就是解1层锁也就是靠这个方法一点一点把state的值减到0
由于我们前面只对独占锁进行了讲解没有对共享锁进行讲解这里还是稍微提一下它
public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0) //上来就调用tryAcquireShared尝试以共享模式获取锁小于0则失败上面判断的是state0返回1否则-1也就是说如果计数器不为0那么这里会判断成功doAcquireShared(arg); //计数器不为0的时候按照它的机制那么会阻塞所以我们来看看doAcquireShared中是怎么进行阻塞的
}private void doAcquireShared(int arg) {final Node node addWaiter(Node.SHARED); //向等待队列中添加一个新的共享模式结点boolean failed true;try {boolean interrupted false;for (;;) { //无限循环final Node p node.predecessor(); //获取当前节点的前驱的结点if (p head) { //如果p就是头结点那么说明当前结点就是第一个等待节点int r tryAcquireShared(arg); //会再次尝试获取共享锁if (r 0) { //要是获取成功setHeadAndPropagate(node, r); //那么就将当前节点设定为新的头结点并且会继续唤醒后继节点p.next null; // help GCif (interrupted)selfInterrupt();failed false;return;}}if (shouldParkAfterFailedAcquire(p, node) //和独占模式下一样的操作这里不多说了parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node); //如果最后都还是没获取到那么就cancel}
}
//其实感觉大体上和独占模式的获取有点像但是它多了个传播机制会继续唤醒后续节点private void setHeadAndPropagate(Node node, int propagate) {Node h head; // 取出头结点并将当前节点设定为新的头结点setHead(node);//因为一个线程成功获取到共享锁之后有可能剩下的等待中的节点也有机会拿到共享锁if (propagate 0 || h null || h.waitStatus 0 ||(h head) null || h.waitStatus 0) { //如果propagate大于0表示共享锁还能继续获取或是h.waitStatus 0这是由于在其他线程释放共享锁时doReleaseShared会将状态设定为PROPAGATE表示可以传播唤醒后面会讲Node s node.next;if (s null || s.isShared())doReleaseShared(); //继续唤醒下一个等待节点}
}我们接着来看它的countdown过程
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { //直接尝试释放锁如果成功返回true在CountDownLatch中只有state减到0的那一次会返回truedoReleaseShared(); //这里也会调用doReleaseShared继续唤醒后面的结点return true;}return false; //其他情况false//不过这里countdown并没有用到这些返回值
}private void doReleaseShared() {for (;;) { //无限循环Node h head; //获取头结点if (h ! null h ! tail) { //如果头结点不为空且头结点不是尾结点那么说明等待队列中存在节点int ws h.waitStatus; //取一下头结点的等待状态if (ws Node.SIGNAL) { //如果是SIGNAL那么就CAS将头结点的状态设定为初始值if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; //失败就开下一轮循环重来unparkSuccessor(h); //和独占模式一样当锁被释放都会唤醒头结点的后继节点doAcquireShared循环继续如果成功那么根据setHeadAndPropagate又会继续调用当前方法不断地传播下去让后面的线程一个一个地获取到共享锁直到不能再继续获取为止}else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果等待状态是默认值0那么说明后继节点已经被唤醒直接将状态设定为PROPAGATE它代表在后续获取资源的时候够向后面传播continue; //失败就开下一轮循环重来}if (h head) // 如果头结点发生了变化不会break而是继续循环否则直接break退出break;}
}可能看完之后还是有点乱我们再来理一下
共享锁是线程共享的同一时刻能有多个线程拥有共享锁。如果一个线程刚获取了共享锁那么在其之后等待的线程也很有可能能够获取到锁所以得传播下去继续尝试唤醒后面的结点不像独占锁独占的压根不需要考虑这些。如果一个线程刚释放了锁不管是独占锁还是共享锁都需要唤醒后续等待结点的线程。
回到CountDownLatch再结合整个AQS共享锁的实现机制进行一次完整的推导看明白还是比较简单的。
循环屏障 CyclicBarrier
好比一场游戏我们必须等待房间内人数足够之后才能开始并且游戏开始之后玩家需要同时进入游戏以保证公平性。
假如现在游戏房间内一共5人但是游戏开始需要10人所以我们必须等待剩下5人到来之后才能开始游戏并且保证游戏开始时所有玩家都是同时进入那么怎么实现这个功能呢我们可以使用CyclicBarrier翻译过来就是循环屏障那么这个屏障正式为了解决这个问题而出现的。
public static void main(String[] args) {CyclicBarrier barrier new CyclicBarrier(10, //创建一个初始值为10的循环屏障() - System.out.println(飞机马上就要起飞了各位特种兵请准备)); //人等够之后执行的任务for (int i 0; i 10; i) {int finalI i;new Thread(() - {try {Thread.sleep((long) (2000 * new Random().nextDouble()));System.out.println(玩家 finalI 进入房间进行等待... (barrier.getNumberWaiting()/10));barrier.await(); //调用await方法进行等待直到等待的线程足够多为止//开始游戏所有玩家一起进入游戏System.out.println(玩家 finalI 进入游戏);} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();}
}可以看到循环屏障会不断阻挡线程直到被阻挡的线程足够多时才能一起冲破屏障并且在冲破屏障时我们也可以做一些其他的任务。这和人多力量大的道理是差不多的当人足够多时方能冲破阻碍到达美好的明天。当然屏障由于是可循环的所以它在被冲破后会重新开始计数继续阻挡后续的线程
public static void main(String[] args) {CyclicBarrier barrier new CyclicBarrier(5); //创建一个初始值为5的循环屏障for (int i 0; i 10; i) { //创建5个线程int finalI i;new Thread(() - {try {Thread.sleep((long) (2000 * new Random().nextDouble()));System.out.println(玩家 finalI 进入房间进行等待... (barrier.getNumberWaiting()/5));barrier.await(); //调用await方法进行等待直到等待线程到达5才会一起继续执行//人数到齐之后可以开始游戏了System.out.println(玩家 finalI 进入游戏);} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();}
}可以看到通过使用循环屏障我们可以对线程进行一波一波地放行每一波都放行5个线程当然除了自动重置之外我们也可以调用reset()方法来手动进行重置操作同样会重新计数
public static void main(String[] args) throws InterruptedException {CyclicBarrier barrier new CyclicBarrier(5); //创建一个初始值为10的计数器锁for (int i 0; i 3; i)new Thread(() - {try {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();Thread.sleep(500); //等一下上面的线程开始运行System.out.println(当前屏障前的等待线程数barrier.getNumberWaiting());barrier.reset();System.out.println(重置后屏障前的等待线程数barrier.getNumberWaiting());
}可以看到在调用reset()之后处于等待状态下的线程全部被中断并且抛出BrokenBarrierException异常循环屏障等待线程数归零。那么要是处于等待状态下的线程被中断了呢屏障的线程等待数量会不会自动减少
public static void main(String[] args) throws InterruptedException {CyclicBarrier barrier new CyclicBarrier(10);Runnable r () - {try {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}};Thread t new Thread(r);t.start();t.interrupt();new Thread(r).start();
}可以看到当await()状态下的线程被中断那么屏障会直接变成损坏状态一旦屏障损坏那么这一轮就无法再做任何等待操作了。也就是说本来大家计划一起合力冲破屏障结果有一个人摆烂中途退出了那么所有人的努力都前功尽弃这一轮的屏障也不可能再被冲破了所以CyclicBarrier告诉我们不要做那个害群之马要相信你的团队不然没有好果汁吃只能进行reset()重置操作进行重置才能恢复正常。
乍一看怎么感觉和之前讲的CountDownLatch有点像好了这里就得区分一下了千万别搞混
CountDownLatch 它只能使用一次是一个一次性的工具它是一个或多个线程用于等待其他线程完成的同步工具 CyclicBarrier 它可以反复使用允许自动或手动重置计数它是让一定数量的线程在同一时间开始运行的同步工具
我们接着来看循环屏障的实现细节
public class CyclicBarrier {//内部类存放broken标记表示屏障是否损坏损坏的屏障是无法正常工作的private static class Generation {boolean broken false;}/** 内部维护一个可重入锁 */private final ReentrantLock lock new ReentrantLock();/** 再维护一个Condition */private final Condition trip lock.newCondition();/** 这个就是屏障的最大阻挡容量就是构造方法传入的初始值 */private final int parties;/* 在屏障破裂时做的事情 */private final Runnable barrierCommand;/** 当前这一轮的Generation对象每一轮都有一个新的用于保存broken标记 */private Generation generation new Generation();//默认为最大阻挡容量每来一个线程-1和CountDownLatch挺像当屏障破裂或是被重置时都会将其重置为最大阻挡容量private int count;//构造方法public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}//开启下一轮屏障一般屏障被冲破之后就自动重置了进入到下一轮private void nextGeneration() {// 唤醒所有等待状态的线程trip.signalAll();// 重置count的值count parties;//创建新的Generation对象generation new Generation();}//破坏当前屏障变为损坏状态之后就不能再使用了除非重置private void breakBarrier() {generation.broken true;count parties;trip.signalAll();}//开始等待public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // 因为这里没有使用定时机制不可能发生异常如果发生怕是出了错误}}//可超时的等待public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}//这里就是真正的等待流程了让我们细细道来private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock this.lock;lock.lock(); //加锁注意因为多个线程都会调用await方法因此只有一个线程能进其他都被卡着了try {final Generation g generation; //获取当前这一轮屏障的Generation对象if (g.broken)throw new BrokenBarrierException(); //如果这一轮屏障已经损坏那就没办法使用了if (Thread.interrupted()) { //如果当前等待状态的线程被中断那么会直接破坏掉屏障并抛出中断异常破坏屏障的第1种情况breakBarrier();throw new InterruptedException();}int index --count; //如果上面都没有出现不正常那么就走正常流程首先count自减并赋值给indexindex表示当前是等待的第几个线程if (index 0) { // 如果自减之后就是0了那么说明来的线程已经足够可以冲破屏障了boolean ranAction false;try {final Runnable command barrierCommand;if (command ! null)command.run(); //执行冲破屏障后的任务如果这里抛异常了那么会进finallyranAction true;nextGeneration(); //一切正常开启下一轮屏障方法进入之后会唤醒所有等待的线程这样所有的线程都可以同时继续运行了然后返回0注意最下面finally中会解锁不然其他线程唤醒了也拿不到锁啊return 0;} finally {if (!ranAction) //如果是上面出现异常进来的那么也会直接破坏屏障破坏屏障的第2种情况breakBarrier();}}// 能走到这里那么说明当前等待的线程数还不够多不足以冲破屏障for (;;) { //无限循环一直等等到能冲破屏障或是出现异常为止try {if (!timed)trip.await(); //如果不是定时的那么就直接永久等待else if (nanos 0L)nanos trip.awaitNanos(nanos); //否则最多等一段时间} catch (InterruptedException ie) { //等的时候会判断是否被中断依然是破坏屏障的第1种情况if (g generation ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException(); //如果线程被唤醒之后发现屏障已经被破坏那么直接抛异常if (g ! generation) //成功冲破屏障开启下一轮那么直接返回当前是第几个等待的线程。return index;if (timed nanos 0L) { //线程等待超时也会破坏屏障破坏屏障的第3种情况然后抛异常breakBarrier();throw new TimeoutException();}}} finally {lock.unlock(); //最后别忘了解锁不然其他线程拿不到锁}}//不多说了public int getParties() {return parties;}//判断是否被破坏也是加锁访问因为有可能这时有其他线程正在执行dowaitpublic boolean isBroken() {final ReentrantLock lock this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}//重置操作也要加锁public void reset() {final ReentrantLock lock this.lock;lock.lock();try {breakBarrier(); // 先破坏这一轮的线程注意这个方法会先破坏再唤醒所有等待的线程那么所有等待的线程会直接抛BrokenBarrierException异常详情请看上方dowait倒数第13行nextGeneration(); // 开启下一轮} finally {lock.unlock();}}//获取等待线程数量也要加锁public int getNumberWaiting() {final ReentrantLock lock this.lock;lock.lock();try {return parties - count; //最大容量 - 当前剩余容量 正在等待线程数} finally {lock.unlock();}}
}看完了CyclicBarrier的源码之后是不是感觉比CountDownLatch更简单一些
信号量 Semaphore
还记得我们在《操作系统》中学习的信号量机制吗它在解决进程之间的同步问题中起着非常大的作用。 信号量(Semaphore)有时被称为信号灯是在多线程环境下使用的一种设施是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前线程必须获取一个信号量一旦该关键代码段完成了那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。 通过使用信号量我们可以决定某个资源同一时间能够被访问的最大线程数它相当于对某个资源的访问进行了流量控制。简单来说它就是一个可以被N个线程占用的排它锁因此也支持公平和非公平模式我们可以在最开始设定Semaphore的许可证数量每个线程都可以获得1个或n个许可证当许可证耗尽或不足以供其他线程获取时其他线程将被阻塞。
public static void main(String[] args) throws ExecutionException, InterruptedException {//每一个Semaphore都会在一开始获得指定的许可证数数量也就是许可证配额Semaphore semaphore new Semaphore(2); //许可证配额设定为2for (int i 0; i 3; i) {new Thread(() - {try {semaphore.acquire(); //申请一个许可证System.out.println(许可证申请成功);semaphore.release(); //归还一个许可证} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}public static void main(String[] args) throws ExecutionException, InterruptedException {//每一个Semaphore都会在一开始获得指定的许可证数数量也就是许可证配额Semaphore semaphore new Semaphore(3); //许可证配额设定为3for (int i 0; i 2; i)new Thread(() - {try {semaphore.acquire(2); //一次性申请两个许可证System.out.println(许可证申请成功);} catch (InterruptedException e) {e.printStackTrace();}}).start();}我们也可以通过Semaphore获取一些常规信息
public static void main(String[] args) throws InterruptedException {Semaphore semaphore new Semaphore(3); //只配置一个许可证5个线程进行争抢不内卷还想要许可证for (int i 0; i 5; i)new Thread(semaphore::acquireUninterruptibly).start(); //可以以不响应中断主要是能简写一行方便Thread.sleep(500);System.out.println(剩余许可证数量semaphore.availablePermits());System.out.println(是否存在线程等待许可证(semaphore.hasQueuedThreads() ? 是 : 否));System.out.println(等待许可证线程数量semaphore.getQueueLength());
}我们可以手动回收掉所有的许可证
public static void main(String[] args) throws InterruptedException {Semaphore semaphore new Semaphore(3);new Thread(semaphore::acquireUninterruptibly).start();Thread.sleep(500);System.out.println(收回剩余许可数量semaphore.drainPermits()); //直接回收掉剩余的许可证
}这里我们模拟一下比如现在有10个线程同时进行任务任务要求是执行某个方法但是这个方法最多同时只能由5个线程执行这里我们使用信号量就非常合适。
数据交换 Exchanger
线程之间的数据传递也可以这么简单。
使用Exchanger它能够实现线程之间的数据交换
public static void main(String[] args) throws InterruptedException {ExchangerString exchanger new Exchanger();new Thread(() - {try {System.out.println(收到主线程传递的交换数据exchanger.exchange(AAAA));} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println(收到子线程传递的交换数据exchanger.exchange(BBBB));
}在调用exchange方法后当前线程会等待其他线程调用同一个exchanger对象的exchange方法当另一个线程也调用之后方法会返回对方线程传入的参数。
可见功能还是比较简单的。
Fork/Join框架
在JDK7时出现了一个新的框架用于并行执行任务它的目的是为了把大型任务拆分为多个小任务最后汇总多个小任务的结果得到整大任务的结果并且这些小任务都是同时在进行大大提高运算效率。Fork就是拆分Join就是合并。
我们来演示一下实际的情况比如一个算式18x736x89x778x53可以拆分为四个小任务18x7、36x8、9x77、8x53最后我们只需要将这四个任务的结果加起来就是我们原本算式的结果了有点归并排序的味道。 它不仅仅只是拆分任务并使用多线程而且还可以利用工作窃取算法提高线程的利用率。 **工作窃取算法**是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务为了减少线程间的竞争把这些子任务分别放到不同的队列里并为每个队列创建一个单独的线程来执行队列里的任务线程和队列一一对应。但是有的线程会先把自己队列里的任务干完而其他线程对应的队列里还有任务待处理。干完活的线程与其等着不如帮其他线程干活于是它就去其他线程的队列里窃取一个任务来执行。 现在我们来看看如何使用它这里以计算1-1000的和为例我们可以将其拆分为8个小段的数相加比如1-125、126-250… 最后再汇总即可它也是依靠线程池来实现的
public class Main {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool pool new ForkJoinPool();System.out.println(pool.submit(new SubTask(1, 1000)).get());}//继承RecursiveTask这样才可以作为一个任务泛型就是计算结果类型private static class SubTask extends RecursiveTaskInteger {private final int start; //比如我们要计算一个范围内所有数的和那么就需要限定一下范围这里用了两个int存放private final int end;public SubTask(int start, int end) {this.start start;this.end end;}Overrideprotected Integer compute() {if(end - start 125) { //每个任务最多计算125个数的和如果大于继续拆分小于就可以开始算了SubTask subTask1 new SubTask(start, (end start) / 2);subTask1.fork(); //会继续划分子任务执行SubTask subTask2 new SubTask((end start) / 2 1, end);subTask2.fork(); //会继续划分子任务执行return subTask1.join() subTask2.join(); //越玩越有递归那味了} else {System.out.println(Thread.currentThread().getName() 开始计算 start-end 的值!);int res 0;for (int i start; i end; i) {res i;}return res; //返回的结果会作为join的结果}}}
}ForkJoinPool-1-worker-2 开始计算 1-125 的值!
ForkJoinPool-1-worker-2 开始计算 126-250 的值!
ForkJoinPool-1-worker-0 开始计算 376-500 的值!
ForkJoinPool-1-worker-6 开始计算 751-875 的值!
ForkJoinPool-1-worker-3 开始计算 626-750 的值!
ForkJoinPool-1-worker-5 开始计算 501-625 的值!
ForkJoinPool-1-worker-4 开始计算 251-375 的值!
ForkJoinPool-1-worker-7 开始计算 876-1000 的值!
500500可以看到结果非常正确但是整个计算任务实际上是拆分为了8个子任务同时完成的结合多线程原本的单线程任务在多线程的加持下速度成倍提升。
包括Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现
public static void parallelSort(byte[] a) {int n a.length, p, g;if (n MIN_ARRAY_SORT_GRAN ||(p ForkJoinPool.getCommonPoolParallelism()) 1)DualPivotQuicksort.sort(a, 0, n - 1);elsenew ArraysParallelSortHelpers.FJByte.Sorter(null, a, new byte[n], 0, n, 0,((g n / (p 2)) MIN_ARRAY_SORT_GRAN) ?MIN_ARRAY_SORT_GRAN : g).invoke();
}并行排序的性能在多核心CPU环境下肯定是优于普通排序的并且排序规模越大优势越显著。
至此并发编程篇完结。