建购物网站需要多少钱,iis编辑网站绑定,优秀设计作品欣赏,360建网站好不好?文章目录零 基本概念1 CAS、ABA 问题和原子变量2 this 引用逸出3 不变性 immutable4 同步、异步、阻塞、非阻塞5 JMM6 同步方案演示#xff1a;计数器 demo*一 进程与线程1 区别与联系2 Java内存区域3 线程组4 线程的上下文切换5 并发与并行6 线程的生命周期与状态二 线程间的…
文章目录零 基本概念1 CAS、ABA 问题和原子变量2 this 引用逸出3 不变性 immutable4 同步、异步、阻塞、非阻塞5 JMM6 同步方案演示计数器 demo*一 进程与线程1 区别与联系2 Java内存区域3 线程组4 线程的上下文切换5 并发与并行6 线程的生命周期与状态二 线程间的通信和同步1 线程同步锁synchronized / Lock2 线程同步wait-notify3 线程同步volatile 信号量4 线程通信管道三 线程死锁1 死锁的四个必要条件2 死锁解决预防死锁、避免死锁、检测与解除死锁3 避免死锁开放调用限制锁顺序*四 并发编程的相关方法1 sleep() 与 wait()2 run() 与 start()3 join()4 yield()5 线程创建的四种方法6 获取与设置优先级、守护线程五 synchronized1 简介2 使用方法3 注意事项六 volatile1 作用2 双重校验锁实现 单例模式线程安全3 与 synchronized 的关系4 volatile 的实现——内存屏障*5 使用 volatile 必须满足的条件*七 ReentrantLock1 可重入锁2 和 synchronized 异同八 并发容器1 ConcurrentHashMap扩容问题**扩容需要解决的问题扩容流程sizeCtltransferIndexForwardingNode2 写时复制CopyOnWriteArrayList / CopyOnWriteArraySet3 并发队列4 基于跳表ConcurrentSkipListMap / ConcurrentSkipListSet九 异步任务执行服务、线程池0 基本接口和类 提交任务的方法1 为什么使用线程池2 ThreadPoolExecutor 构造方法3 ThreadPoolExecutor 的状态 扩展4 ThreadPoolExecutor 任务处理流程5 线程池大小的设置6 ForkJoinPool7 按完成顺序获取任务返回结果8 线程池异常处理9 获取执行结果Future实例 Memorizer十 同步工具类1 CountDownLatch2 循环栅栏 CyclicBarrier3 信号量 Semaphore4 ThreadLocal十一 线程中断1 中断方法2 示例十二 性能与可伸缩性1 相关概念2 多线程引入的开销3 减少锁竞争零 基本概念
对象的线程安全 当 多个线程 访问一个对象时如果不用考虑这些线程在运行时环境下的调度和交替执行也不需要进行额外的同步或者在调用方进行任何其他的协调操作调用这个对象的行为都可以获得正确的结果那这个 对象 就是线程安全的 并发编程的三个特性原子性、有序性、可见性关于有序性 如果在本线程内观察所有操作都是有序的线程内表现为串行如果从一个线程观察另一个线程所有的操作都是无序的指令重排序、工作内存和主内存同步延迟 无状态的对象一定是线程安全的计算过程中的临时状态传入的参数仅存在于栈上的局部变量中并且只能由正在执行的线程访问多线程常见的两个问题 竞态条件计算的正确性取决于多个线程交替执行的时序即结果是否正确具有偶然性内存可见性一个线程对一个共享变量的修改另一个线程不能马上看到甚至永远不能看到 因为除了内存数据还会被缓存在寄存器和各级缓存中 访问变量时不一定从内存中取修改变量时可能先写到缓存稍后才同步到内存中
1 CAS、ABA 问题和原子变量 CAS CompareAndSwap 是一种非阻塞同步方式采用基于冲突检测的乐观并发策略包含三个操作数需要读写的内存位置V、预期值A、拟写入的新值B 当且仅当VA时CAS才会通过原子方式用B更新V的值否则不会执行任何操作无论是否操作成功都返回V原有的值多个线程尝试使用CAS同时更新一个变量时只有一个线程可以成功变更其它都会失败但是失败的线程不会被挂起而是被告知失败可以再次尝试通常在失败时不执行任何操作 CAS 的整个过程是原子的依赖于硬件指令集CAS 是 Java 并发包的基础基于 CAS 可以实现高效、乐观、非阻塞的数据结构和算法它也是并发包的锁、同步工具和各种容器的基础 java.util.concurrent.atomic 包含了一些原子变量类也是基于 CAS 实现的使用示例
// 线程不安全
public class ThreadSafeCounter {private final long count 0L;public void plusOne() {// 自增操作包含 读取-修改-写入 三个步骤且非原子操作count;}
}// 线程安全
public class ThreadSafeCounter {private final AtomicLong count new AtomicLong(0);public void plusOne() {// 原子操作count.incrementAndGet(); }
}CAS 常见的 ABA 问题 一种简单的解决方法是为值加上版本号两次看到相同的值版本号是不同的AtomicStampedReference 维护一个引用和对应的版本号且修改操作是原子的大部分情况下 ABA 问题不会影响并发的正确性如果需要解决该问题采用互斥同步可能会更高效 private void test() {AtomicStampedReferenceInteger atomicStampedReference new AtomicStampedReference(1, 111);// 所有的修改操作都是原子的atomicStampedReference.attemptStamp(2, 222);atomicStampedReference.attemptStamp(1, 333); // 值为1但版本号不同}2 this 引用逸出
this 引用逸出的后果是如果发生逸出时外部类未初始化完成会导致不可预料的结果this 引用逸出产生的条件在构造函数中 创建 并 发布 内部类即使发布的语句是构造函数的最后一行内部类可以访问外部类的对象的域是因为内部类构造的时候会把外部类的对象 this 隐式的作为一个参数传递给内部类
// 逸出示例
public class ThisEscape {public final int id;public final String name;public ThisEscape(EventSourceEventListener source) {id 1;name xjpking;// 在此创建了内部类内部类具有外部类的this引用但外部类的构造方法并未执行完成source.registerListener(new EventListener() {public void onEvent(Object obj) {System.out.println(id: ThisEscape.this.id);System.out.println(name: ThisEscape.this.name);}});}
}// 正确示例保证创建内部类时外部类已经初始化完成
// 破坏了this逸出的第二个条件没有在构造方法中发布内部类
public class ThisSafe {public final int id;public final String name;private ThisSafe(EventSourceEventListener source) {id 1;listener new EventListener(){public void onEvent(Object obj) {System.out.println(id: ThisSafe.this.id);System.out.println(name: ThisSafe.this.name);}};name xjpisking;}// 只有构造方法返回之后外部类初始化完成public static ThisSafe getInstance(EventSourceEventListener source) {ThisSafe safe new ThisSafe();source.registerListener(safe.listener);return safe;}
}3 不变性 immutable
不可变对象满足以下条件 对象创建后状态不能修改所有属性都由 final 修饰对象是正确创建的不发生 this 逸出 不可变对象一定是线程安全的可以将多个与一致性相关的属性绑定在一个不可变对象中让它们看起来是原子的避免部分被修改从而保证这些属性的一致性
4 同步、异步、阻塞、非阻塞
同步、异步指请求发送方的行为 同步发送方等待接收方返回处理后的结果异步发送方不等待接收方返回处理后的结果 阻塞、非阻塞指请求接收方的行为 阻塞接收方直到处理请求后才返回结果非阻塞接收方即时返回结果如果未处理完请求也返回
组合行为同步阻塞发送方发送请求之后一直等待响应接收方处理请求如果不能马上等到返回结果就一直等到返回结果后才响应发送方同步非阻塞发送方发送请求之后一直等待响应接收方立即返回结果异步阻塞发送方向接收方请求后不等待响应可以继续其他工作接收方处理请求如果不能马上等到返回结果就一直等到返回结果后才响应发送方异步非阻塞发送方向接收方请求后不等待响应可以继续其他工作接收方立即返回结果
5 JMM 对应关系 从 JVM 运行时内存来看工作内存对应栈主内存对应堆中的对象实例从硬件角度来看工作内存对应寄存器或高速缓存主内存对应物理硬件的内存 工作内存保存了变量的主内存副本线程对变量的所有操作都必须在工作内存中执行不能直接读写主内存的数据 volatile 变量同样遵循只是看起来如同直接在主内存中读写访问
6 同步方案演示计数器 demo*
同时面临竞态条件和内存可见性问题演示四种同步方案 volatile 关键字synchronized 关键字原子类型变量显式锁 ReentrantLock
class Counter {public int nonSyncCount 0;public volatile int volatileCount 0;public int synchronizedCount 0;public AtomicInteger atomicCount new AtomicInteger(0);public int lockCount 0;private final ReentrantLock lock new ReentrantLock();public void nonSyncIncrease() {nonSyncCount;}public void volatileIncrease() {volatileCount;}public synchronized void synchronizedIncrease() {synchronizedCount;}public void atomicIncrease() {atomicCount.incrementAndGet();}public void lockIncrease() {lock.lock();lockCount;lock.unlock();}
}public static void main(String[] args) throws InterruptedException {Counter counter new Counter();// 创建10个线程每个线程调用5个计数值的自增方法for (int i 0; i 10; i) {new Thread(() - {for (int j 0; j 10000; j) {counter.nonSyncIncrease();counter.volatileIncrease();counter.synchronizedIncrease();counter.atomicIncrease();counter.lockIncrease();}}).start();}while (Thread.activeCount() 1) {Thread.yield();}System.out.println(counter.nonSyncCount); // 未同步结果随机System.out.println(counter.volatileCount); // 仅使用volatile修饰变量结果随机因为执行结果依赖于旧值System.out.println(counter.synchronizedCount); // 使用synchronized同步100000System.out.println(counter.atomicCount); // 使用原子变量100000System.out.println(counter.lockCount); // 使用显式锁100000}一 进程与线程
1 区别与联系
进程是系统分配资源的基本单位线程是 CPU 调度的基本单位进程和线程本质的区别是是否单独占有内存地址空间及其它系统资源比如I/O线程是轻量级进程进程在其执行的过程中可以产生多个线程 与进程不同的是属于同一进程的多个线程共享进程的堆和方法区资源 (JDK1.8 之后的元空间)但每个线程有自己的程序计数器、虚拟机栈和本地方法栈所以系统在产生一个线程或是在各个线程之间作切换工作时负担要比进程小得多 使用多线程而非多进程实现并发的优势 进程间的通信比较复杂而线程间的通信比较简单进程是重量级的而线程是轻量级的故多线程方式的系统开销更小
2 Java内存区域 线程私有程序计数器的目的是线程切换后能恢复到正确的执行位置线程私有虚拟机栈和本地方法栈的目的是保证线程中的 局部变量存放在栈帧中的局部变量表 不被别的线程访问到堆和方法区是所有 线程共享的资源其中堆是进程中最大的一块内存主要用于存放新创建的对象 (几乎所有对象都在这里分配内存)方法区主要用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据
3 线程组
线程组是一个树状的结构每个线程组下面可以有多个线程或者 线程组每个 Thread 必然存在于一个 ThreadGroup 中Thread 不能独立于 ThreadGroup 存在如果在 new Thread 时没有显式指定那么默认将父线程当前执行new Thread的线程线程组设置为自己的线程组ThreadGroup 是一个标准的向下引用的树状结构这样设计的原因是防止上级线程被下级线程引用而无法有效地被GC回收线程组可以起到统一控制线程的优先级和检查线程的权限的作用
4 线程的上下文切换
线程在执行过程中会有自己的运行条件和状态也称上下文比如上文所说到过的程序计数器栈信息等线程切换时需要保存当前线程的上下文留待线程下次占用 CPU 的时候恢复现场并加载下一个将要占用 CPU 的线程上下文上下文切换会使部分缓存失效 举例说明线程A切换到线程B 1.先挂起线程A将其在CPU中的状态保存在内存中 2.在内存中检索下一个线程B的上下文并将其在 CPU 的寄存器中恢复开始执行B线程 3.当B执行完根据程序计数器中指向的位置恢复线程A 5 并发与并行
并发 同一时间段多个任务都在执行 (单位时间内不一定同时执行可以来回切换)并行 单位时间内多个任务同时执行
6 线程的生命周期与状态 线程创建之后它将处于 NEW新建 状态调用 start() 方法后开始运行线程这时候处于 READY可运行 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING运行 状态。以上两种状态都属于 RUNNABLE当线程执行 wait() 或 join() 方法之后线程进入 WAITING等待 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态而 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制比如通过 sleep(long millis)、join(long millis)方法或 wait(long millis)方法可以将 Java 线程置于 TIMED_WAITING 状态。当超时时间到达后 Java 线程将会返回到 RUNNABLE 状态当线程调用同步方法时在没有获取到锁的情况下线程将会进入到 BLOCKED阻塞 状态线程在执行 Runnable 的 run() 方法之后将会进入到 TERMINATED终止 状态 反复调用同一个线程的start()方法是否可行假如一个线程执行完毕此时处于TERMINATED状态再次调用这个线程的 start() 方法是否可行 两个问题的答案都是不可行因为 threadStatus 的值会改变调用 start() 的前提是 threadStatus0 此时再次调用 start() 方法会抛 IllegalThreadStateException异常 二 线程间的通信和同步
另参考 十一、同步工具类
1 线程同步锁synchronized / Lock
线程同步的根本目的是让线程按照一定的顺序执行多个线程同时访问的可变数据才需要通过锁保护每个共享、可变的变量都只由一个锁保护对于包含多个变量的不变性条件每个变量都由同一个锁保护
2 线程同步wait-notify
基于 Object 类的 wait() 方法和 notify()随机叫醒一个正在等待的线程 notifyAll()叫醒所有正在等待的线程 方法实现每个对象除了拥有用于锁的等待队列还有一个条件队列用于线程间的协作两个队列存放的都是线程对象wait() 和 notify() 只能在对应的 synchronized 代码块内被调用否则会抛出异常 对应指的是只能调用 synchronized 保护的对象的 wait() 和 notify()下面的例子中synchronized 保护的是子线程对象 this所以调用的也是 this 的 wait() 和 notify() 方法 调用 wait() 的具体过程 将当前线程放入条件等待队列释放对象锁阻塞等待线程状态变为 WAITING/TIMED_WAITING到达等待时间后或者被其它线程调用 notify()/notifyAll()从条件等待队列中移除尝试竞争对象锁 如果获得锁线程状态变为 RUNNABLE并从 wait() 调用中返回否则加入对象锁的等待队列线程状态变为 BLOCKED只有获得锁后才从wait() 调用中返回 从 wait() 返回后线程重新获得锁但不代表等待的条件一定成立需要重新检查条件 synchronized(obj) {while(条件不成立) {obj.wait();}满足条件后的操作
}调用 notify() 会唤醒条件队列的线程并将其移除但不会立刻释放锁只有所在的同步代码块执行完后才会释放锁被唤醒的线程获得锁后才会从 wait() 返回wait-notify DEMO public static void main(String[] args) throws InterruptedException {WaitThread waitThread new WaitThread();waitThread.start(); // 子线程启动Thread.sleep(2000L);waitThread.setFlagTrue(); // 主线程改变标志位}class WaitThread extends Thread {private volatile boolean flag false;Overridepublic void run() {System.out.println(sub thread running);try {synchronized (this) {// 子线程执行循环while (!flag) {wait(); // 子线程加入子线程对象的条件队列释放锁等待唤醒}// 直到主线程将flag修改为true并调用notify()唤醒子线程跳出循环...System.out.println(loop end);}} catch (InterruptedException ignored) {}}public synchronized void setFlagTrue() {// 主线程修改flag并唤醒在子线程对象的条件队列上的子线程flag true;System.out.println(main thread call notify());notify();}
}输出结果
sub thread running
main thread call notify()
loop end3 线程同步volatile 信号量
参见 volatile 部分
4 线程通信管道
JDK提供了 PipedWriter、 PipedReader、 PipedOutputStream、 PipedInputStream前面两个是基于字符的处理单元为2字节后面两个是基于字节流的处理单元为1字节 三 线程死锁
1 死锁的四个必要条件
互斥条件该资源任意一个时刻只由一个线程占用请求与保持条件一个进程因请求资源而阻塞时对已获得的资源保持不放不剥夺条件线程已获得的资源在未使用完之前不能被其他线程强行剥夺只有自己使用完毕后才释放资源循环等待条件若干进程之间形成一种头尾相接的循环等待资源关系
2 死锁解决预防死锁、避免死锁、检测与解除死锁
预防死锁 破坏请求与保持条件 一次性申请所有的资源或者使用超时机制主动释放锁破坏不剥夺条件 占用部分资源的线程进一步申请其他资源时如果申请不到可以主动释放它占有的资源破坏循环等待条件 获取一系列锁时按某一顺序申请资源反序释放 避免死锁——银行家算法检测与解除死锁
操作系统死锁
3 避免死锁
开放调用
利用 开放调用不将方法设置为 synchronized 避免同时持有多个锁但这种做法也削弱了原子性 这种方式适用于 业务无需原子性 的场景如果在持有锁的情况下调用某个外部方法就需要警惕可能出现死锁 示例车队和出租车 // 不使用开放调用class Taxi {private Position location;private Position destination;private Dispatcher dispatcher;public synchronized void setLocation(Position location) {this.location location;if (location.equals(destination)) {dispatcher.notifyAvailable(this); // 持有本车的锁请求车队的锁}}public synchronized Position getLocation() {return location;}}class Dispatcher {private SetTaxi taxis;private SetTaxi availableTaxis;public synchronized void notifyAvailable(Taxi taxi) {availableTaxis.add(taxi);}public synchronized void showAllLocations() {for (Taxi t : taxis) {Position location t.getLocation(); // 持有车队的锁获取车的锁// ...}}}// 使用开放调用class Taxi {private Position location;private Position destination;private Dispatcher dispatcher;public void setLocation(Position location) {boolean reach false;// 仅持有车的锁synchronized (this) {this.location location;if (location.equals(destination)) {reach true;}}if (reach) {// 仅持有车队的锁dispatcher.notifyAvailable(this);}}public synchronized Position getLocation() {return location;}}class Dispatcher {private SetTaxi taxis;private SetTaxi availableTaxis;public synchronized void notifyAvailable(Taxi taxi) {availableTaxis.add(taxi);}public void showAllLocations() {SetTaxi copy;// 仅持有车队的锁synchronized (this) {copy new HashSet(this.taxis);}for (Taxi t : copy) {// 仅持有车的锁Position location t.getLocation();// ...}}}限制锁顺序*
如果需要同时持有多个锁确保线程在获取多个锁的时候采用一致的顺序示例转账总是先获得ID更小的账户的锁
class LockByOrderDemo {/*** 加时锁只有两个转账账户无法比较时使用* 如果ID不重复则不会使用*/private static final Object lock new Object();public void transferMoney(Account from, Account to, double amount) {// 优先加锁id小的账户if (from.getId() to.getId()) {synchronized (from) {synchronized (to) {if (from.getBalance() 0) {from.decrease(amount);to.increase(amount);}}}} else if (from.getId() to.getId()) {synchronized (to) {synchronized (from) {if (from.getBalance() 0) {from.decrease(amount);to.increase(amount);}}}} else {// 如果重复则需要加时锁保证每次只有一个线程以未知的顺序获取两个账户的锁synchronized (lock) {synchronized (from) {synchronized (to) {if (from.getBalance() 0) {from.decrease(amount);to.increase(amount);}}}}}}
}四 并发编程的相关方法
1 sleep() 与 wait()
sleep() 方法没有释放锁而 wait() 方法释放了锁从 wait() 返回后线程又重新获得锁都可以暂停线程的执行wait() 通常被用于线程间交互/通信sleep() 通常被用于暂停执行wait() 无参方法被调用后线程不会自动苏醒需要别的线程调用同一个对象上的 notify() 或者 notifyAll() 方法且线程竞争到锁之后才能从方法中返回sleep() 方法执行完成后线程会自动苏醒wait() 可以指定时间也可以不指定而 sleep() 必须指定时间wait() 释放CPU资源同时释放锁sleep() 释放CPU资源但是不释放锁所以易死锁 为什么 sleep 函数的精度很低 sleep函数并不能起到定时的作用主要作用是延时在一些多线程中可能会看到sleep(0)其主要目的是让出时间片当系统越繁忙的时候它精度也就越低因为它的精度取决于线程自身优先级、其他线程的优先级以及线程的数量等因素 2 run() 与 start()
调用 start() 方法会启动一个线程并使线程进入了就绪状态当分配到时间片后就可以开始运行了。 start() 会执行线程的相应准备工作然后自动执行 run() 方法的内容实现多线程工作直接执行 run() 方法会把 run() 方法当成一个 main 线程下的普通方法去执行并不会在某个线程中执行它所以不是多线程工作
3 join()
在 a 线程中调用 b 线程的 join() 方法线程 a 进入阻塞状态直到线程 b 完全执行完线程 a 从阻塞状态中恢复如果主线程想等待子线程执行完毕后获得子线程中的处理完的某个数据就使用 join()
public static void main(String[] args) {Thread t new MyThread();thread.start();thread.join();// 主线程的其它功能...会在子线程执行完后开始执行
}4 yield()
静态方法通知操作系统调度器当前线程可以让出 CPU 时间片仅作为建议也可能被完全忽略
5 线程创建的四种方法
Runnable 接口不会返回结果或抛出检查异常Callable 接口可以如果任务不需要返回结果或抛出异常推荐使用 Runnable这样代码看起来会更加简洁
通过继承 Thread 类并重写 run() 方法使用时直接创建该类的对象
public class Demo {public static class MyThread extends Thread {Overridepublic void run() {System.out.println(MyThread);}}public static void main(String[] args) {Thread myThread new MyThread();myThread.start();}
}实现 Runnable 接口并重写 run() 方法使用时将该类的对象作为参数传递到 Thread 类的构造方法中 这种方法不受单继承的限制更适合处理多线程共享数据的情况
public class Demo {public static class MyThread implements Runnable {Overridepublic void run() {System.out.println(MyThread);}}public static void main(String[] args) {new Thread(new MyThread()).start();}
}实现 Callable 接口并重写 call() 方法 call() 可以有返回值可以抛出异常被外面的操作捕获获取异常的信息支持泛型泛型的类型即返回值的类型
//1.创建一个实现Callable的实现类
class NumThread implements Callable{//2.实现call方法将此线程需要执行的操作声明在call()中Overridepublic Object call() throws Exception {int sum 0;for (int i 1; i 100; i) {if(i % 2 0){System.out.println(i);sum i;}}return sum;}
}public class ThreadNew {public static void main(String[] args) {//3.创建Callable接口实现类的对象NumThread numThread new NumThread();//4.将此Callable接口实现类的对象作为传递到FutureTask构造器中创建FutureTask的对象FutureTask futureTask new FutureTask(numThread);//5.将FutureTask的对象作为参数传递到Thread类的构造器中创建Thread对象并调用start()new Thread(futureTask).start();try {//6.获取Callable中call方法的返回值//get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值。Object sum futureTask.get();System.out.println(总和为 sum);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}通过线程池工程中的唯一合法方法 提高响应速度减少创建新线程的时间降低资源消耗重复利用线程池中线程不需要每次都创建便于线程管理
public class ThreadPool {public static void main(String[] args) {//1. 提供指定线程数量的线程池ExecutorService service Executors.newFixedThreadPool(10);ThreadPoolExecutor service1 (ThreadPoolExecutor) service;//2.执行指定的线程的操作。需要提供实现Runnable接口或Callable接口实现类的对象service.execute(new NumberThread());//适合适用于Runnable// service.submit(Callable callable);//适合使用于Callable//3.关闭连接池service.shutdown();}}
6 获取与设置优先级、守护线程
Java 只是给操作系统一个优先级的 参考值线程最终在操作系统的调用顺序由操作系统的线程调度算法决定的 优先级获取thread_instance.getPriority()优先级设置setPriority(int LEVEL)默认5最高10最低1优先级越高先执行的 概率 更大线程组也具有优先级如果某个线程优先级大于线程所在线程组的最大优先级那么该线程的优先级被线程组的最大优先级取代守护线程默认的优先级比较低 避免使用线程优先级否则会增加平台依赖性所有的非守护线程都结束了守护线程也会自动结束直接抛弃不会执行 finally 块新创建的线程会继承创建者的守护状态因此默认主线程创建的所有线程都是非守护线程
五 synchronized
详细内容
1 简介
Java 多线程的锁都是基于对象的Java中的每一个对象都可以作为一个锁阻塞、悲观CAS 非阻塞、乐观解决多个线程之间访问资源的同步性 保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行线程间互斥同时保证内存可见性同一个锁同步的不同线程看到的都是共享变量的最新值 Happens-Before 上图展示了两个线程使用同一个锁时它们之间的 Happens-Before 关系JMM为程序中的所有操作定义的偏序关系要保证执行操作B的线程看到操作A的结果无论A和B是否由同一个线程执行A和B之间必须满足 Happens-Before 关系如果不满足则JVM可以对操作A和B进行任意重排序 在 Java 早期版本中synchronized 属于 重量级锁效率低下 因为操作系统实现线程之间的切换时需要从用户态转换到内核态通过 trap 指令
2 使用方法
修饰实例方法锁定实例作用于当前对象实例加锁进入同步代码前要获得当前对象实例的锁
synchronized void method() {// TODO
}修饰静态方法锁定类给当前类加锁会作用于类的所有对象实例 进入同步代码前要获得当前 class 的锁。如果一个线程 A 调用一个实例对象的非静态 synchronized 方法而线程 B 需要调用这个实例对象所属类的静态 synchronized 方法是允许的不会发生互斥现象因为访问静态 synchronized 方法占用的锁是当前类的锁而访问非静态 synchronized 方法占用的锁是当前实例对象锁。
synchronized static void method() {// TODO
}修饰代码块指定锁定类型指定加锁对象对给定对象/类加锁 synchronized(this|object) 表示进入同步代码库前要获得给定对象的锁synchronized(类.class) 表示进入同步代码前要获得 当前 class 的锁
synchronized(this) {// TODO
}3 注意事项
不要使用 synchronized(String a)因为字符串常量池具有缓存功能构造方法不能使用 synchronized 关键字修饰。因为构造方法本身就属于线程安全的 六 volatile
1 作用
禁止 JVM 的指令重排 指令重排可以保证串行语义一致但是没有义务保证多线程间的语义也一致 指示 JVM变量是共享且不稳定的每次使用它都到主存中进行读取保证变量的可见性并在使用完毕后写回内存 在当前的 Java 内存模型下线程可以把变量保存本地内存比如机器的寄存器CPU cache中而不是直接在主存中进行读写可能造成一个线程在主存中修改了一个变量的值而另外一个线程还继续使用它在寄存器中的变量值的拷贝造成数据的不一致
2 双重校验锁实现 单例模式线程安全
public class Singleton {private volatile static Singleton uniqueInstance; // 对象实例需要用volatile修饰private Singleton() { // 构造方法设置为private}public static Singleton getUniqueInstance() { // 创建实例//先判断对象是否已经实例过没有实例化过才进入加锁代码if (uniqueInstance null) {//类对象加锁保证只能创建一个实例synchronized (Singleton.class) {if (uniqueInstance null) {uniqueInstance new Singleton();// 执行过程// 1.为 uniqueInstance 分配内存空间// 2.初始化 uniqueInstance// 3.将 uniqueInstance 指向分配的内存地址}}}return uniqueInstance;}
}必须使用 volatile 关键字的原因 由于 JVM 具有指令重排的特性uniqueInstance new Singleton() 执行顺序有可能变成注释中的 1-3-2。指令重排在单线程环境下不会出现问题但是在多线程环境下会导致一个线程获得还没有初始化的实例仅仅是刚分配了内存空间线程 T1 执行了 1 和 3此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空因此返回 uniqueInstance但此时 uniqueInstance 还未被初始化使用 volatile 禁止 JVM 的指令重排保证在多线程环境下也能正常运行
3 与 synchronized 的关系
synchronized 关键字和 volatile 关键字是互补而非对立的关系volatile 关键字是线程同步的 轻量级 实现性能比 synchronized 关键字好volatile 关键字只能用于变量仅仅保证对单个volatile变量的读/写具有原子性而 synchronized 关键字可以修饰方法以及代码块volatile 保证变量在多个线程之间的 可见性 而 synchronized 可以保证 变量的可见性 和 操作的原子性
4 volatile 的实现——内存屏障*
屏障类型作用LoadLoad对于语句 Load1; LoadLoad; Load2在 Load2 及后续读取操作要读取的数据被访问前保证 Load1 要读取的数据被读取完毕StoreStore对于语句 Store1; StoreStore; Store2在 Store2 及后续写入操作执行前保证 Store1 的写入操作对其它处理器可见LoadStore对于语句 Load1; LoadStore; Store2在 Store2 及后续写入操作被刷出前保证 Load1 要读取的数据被读取完毕StoreLoad对于语句 Store1; StoreLoad; Load2在 Load2 及后续所有读取操作执行前保证 Store1 的写入对所有处理器可见
JVM 的实现会在 volatile 读写前后均加上内存屏障在一定程度上保证有序性内存屏障的保守策略 在每个 写入操作前 插入 StoreStore 屏障写入操作后 插入 StoreLoad 屏障 在每个 读操作后 插入 LoadLoad 屏障和 LoadStore 屏障 5 使用 volatile 必须满足的条件*
变量修改后的值不依赖原值或能够确保同一时间只有单一线程修改变量值变量不需要和其它状态变量共同构成不变性约束
违反上述条件时证明操作变量同时需要满足原子性因此仅 volatile 不满足线程安全的要求需要加锁
class VolatileThreadUnsafeDemo {private volatile int count 0;private void increase() {count; // 新值依赖原值违背了条件1}public void test() {int threadCount 20;for (int i 0; i threadCount; i) {new Thread(() - {for (int j 0; j 10000; j) {increase();}}).start();}while (Thread.activeCount() 1) {Thread.yield();}System.out.println(count); // 结果是随机的}
}七 ReentrantLock
该部分的参考 详细内容
1 可重入锁
可重入锁指的是一个线程能够对一个临界资源重复加锁AQS 有一个变量 state 用于记录同步状态 初始情况下state 0表示 ReentrantLock 目前处于解锁状态如果有线程调用 lock() 方法进行加锁state 1如果该线程再次调用 lock() 方法加锁就执行 state线程每调用一次 unlock() 方法释放锁会让 state--通过查询 state 的数值即可知道 ReentrantLock 被重入的次数了 Demo 现在有方法 m1 和 m2两个方法均使用了同一把锁对方法进行同步控制同时方法 m1 会调用 m2线程 t 进入方法 m1 成功获得了锁此时线程 t 要在没有释放锁的情况下调用 m2 方法由于 m1 和 m2 使用的是同一把可重入锁所以线程 t 可以进入方法 m2并再次获得锁而不会被阻塞住假如 lock 是不可重入锁那么上面的示例代码必然会引起死锁情况的发生
void m1() {lock.lock();try {// 调用 m2因为可重入所以并不会被阻塞m2();} finally {lock.unlock()}
}void m2() {lock.lock();try {// do something} finally {lock.unlock()}
}2 和 synchronized 异同
都是基于互斥同步的方法通过互斥达到同步synchronized 使用的是对象或类进行加锁而 ReentrantLock 内部是通过 AQSAbstractQueuedSynchronizer 中的同步队列进行加锁公平与非公平指的是线程获取锁的方式 公平模式下线程在同步队列中通过 FIFO 的方式获取锁每个线程最终都能获取锁缺点是效率低在非公平模式下线程会通过“插队”的方式去抢占锁抢不到的则进入同步队列进行排队缺点是可能出现线程饥饿
class Window implements Runnable{private int ticket 100;//1.实例化ReentrantLockprivate ReentrantLock lock new ReentrantLock();Overridepublic void run() {while(true){try{//2.调用锁定方法lock()lock.lock();if(ticket 0){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() 售票票号为 ticket);ticket--;}else{break;}}finally {//3.调用解锁方法unlock()lock.unlock();}}}
}八 并发容器
1 ConcurrentHashMap
数据结构
类型数据结构使用的锁ConcurrentHashMap JDK1.7Segment 数组 HashEntry 数组 链表Segment本质是 ReentrantLock每次锁若干 HashEntryConcurrentHashMap JDK1.8Node 数组 链表/红黑树synchronized每次锁一个 NodeHashtable数组链表synchronized每次锁全表
在 JDK1.7 的时候ConcurrentHashMap 采用分段锁机制对整个桶数组进行了分割分段Segment每个 Segment 都是一个可重入锁每一个 Segment 只锁容器其中一部分数据多线程访问容器里不同数据段的数据不会存在锁竞争提高并发访问率
static class SegmentK,V extends ReentrantLock implements Serializable {...}JDK1.8 的时候已经摒弃了 Segment 的概念synchronized 只锁定当前链表或红黑二叉树的首节点并发控制使用 synchronized 和 CAS 来操作虽然在 JDK1.8 中还能看到 Segment 的数据结构但是已经简化了属性只是为了兼容旧版本 Hashtable(同一把锁) 使用 synchronized 来保证线程安全效率非常低下 当一个线程访问同步方法时其他线程也访问同步方法可能会进入阻塞或轮询状态如使用 put 添加元素另一个线程不能使用 put 添加元素也不能使用 get竞争会越来越激烈效率越低
扩容问题**
扩容需要解决的问题
创建新数组的时候只能由一个线程创建拷贝数据的时候已经拷贝过的数据不能重复拷贝拷贝数据的时候一个桶只能由一个线程负责多个线程如何协作
扩容流程
计算每个CPU核一轮处理桶的个数最小是16修改 transferIndex 标志位每个线程领取完任务就减去多少领取完任务之后就开始处理如果桶为空就设置为 ForwardingNode如果不为空就加锁拷贝拷贝完成之后也设置为 ForwardingNode 节点如果某个线程分配的桶处理完了之后再去申请发现 transferIndex 0这个时候就说明所有的桶都领取完了但是别的线程领取任务之后有没有处理完并不知道该线程会将 sizeCtl 的值减1最后一个线程处理完发现 sizeCtl - 2 rs RESIZE_STAMP_SHIFT 此时的 sizeCtl 高16位是本次扩容的标识低16位值为2即只有当前线程正在参与扩容才会将旧数组用新数组覆盖并且会重新设置 sizeCtl 的值为0.75n作为新数组的扩容阈值
sizeCtl
private transient volatile int sizeCtl;用于标识 ConcurrentHashMap 的状态sizeCtl 的高16位是标志位每一轮扩容生成的一个唯一的标志低16位等于 参与扩容的线程数1如果最后一个帮助扩容的线程在结束时发现 sizeCtl - 2 rsRESIZE_STAMP_SHIFT(值为16) 说明所有参与扩容的线程都执行完即扩容完毕rs 是本次扩容的标志
sizeCtl含义-1正在初始化0.75n正常状态代表扩容阈值其它负值有其他线程正在扩容
为什么在扩容时为负值因为 sizeCtl 最初是 rs 带符号左移16位的结果它的符号位为1所以数值为负
transferIndex
扩容时用于标志下一个线程要处理的桶的开始位置每个线程每次领取指定数目的桶进行数据的拷贝在刚开始扩容时值为旧数组的长度比如初始大小是 transferIndex table.length 64每个线程领取的桶个数是16第一个线程领取完任务后 transferIndex 48也就是说第二个线程这时进来是从第48个桶开始处理再减去16依次类推这就是多线程协作处理的原理
ForwardingNode
在旧数组中起标记作用表示其他线程正在扩容并且此节点已经扩容完毕关联了新数组 nextTable扩容期间可以通过 find 方法访问已经迁移到了 nextTable 中的数据 2 写时复制CopyOnWriteArrayList / CopyOnWriteArraySet
线程安全以优化读为目标牺牲了写的性能适用于读多写少的场景CopyOnWriteArrayList基于 ReentrantLock 实现CopyOnWriteArraySet 基于 CopyOnWriteArrayList 实现读读、读写可以并行但多个线程不能同时写每个写操作需要先获得锁写时复制对一块内存进行修改时不在原有内存块中进行写操作而是将内存拷贝一份在新的内存中进行写操作写完之后将指向原来内存指针指向新的内存整个过程是原子的原来的内存就可以被回收掉了
3 并发队列
无锁非阻塞队列ConcurrentLinkedQueue、ConcurrentLinkedDeque 通过 CAS 操作实现非阻塞基于链表实现 普通阻塞队列ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque 都实现了 BlockingQueue 接口适合用于作为数据共享的通道都基于 ReentrantLock 和 Condition 实现广泛使用在 “生产者-消费者” 问题中其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法当队列容器已满生产者线程会被阻塞直到队列未满当队列容器为空时消费者线程会被阻塞直至队列非空时为止 优先级阻塞队列PriorityBlockingQueue延时阻塞队列DelayQueue其它阻塞队列SynchronousQueue 没有存储元素的空间如果没有其它线程在等待从队列中接受元素put 操作就会等待take 操作同样需要等待其它线程向队列中放元素如果没有也会等待
4 基于跳表ConcurrentSkipListMap / ConcurrentSkipListSet
关于跳表 跳表基于链表在链表的基础上增加了多层索引的结构高层的索引节点一定同时是底层的索引节点每个索引节点有两个指针一个向右指向同层的索引节点另一个指向下层的索引节点或基本链表节点每层的节点有 1/2 的概率成为更高一层的节点基于此可以实现类似二分查找的操作跳表内所有的元素都是排序的对跳表进行遍历会得到有序的结果适用于数据需要有序的环境 ConcurrentSkipListMap / ConcurrentSkipListSet 数据有序没有使用锁所有操作都是非阻塞的包括写 九 异步任务执行服务、线程池
参考链接
0 基本接口和类 提交任务的方法
接口功能Runnable, Callable要执行的异步任务Executor, ExecutorService执行服务Future异步执行的结果包含返回值或异常Executor 是一个顶层接口在它里面只声明了一个方法 execute(Runnable)返回值为 void参数为 Runnable 类型ExecutorService 接口继承了 Executor 接口并声明了一些方法submit、invokeAll、invokeAny以及shutDown等抽象类 AbstractExecutorService 实现了 ExecutorService 接口基本实现了 ExecutorService 中声明的所有方法 execute(task) 异步执行无返回值submit(task) 异步执行返回 Future 对象可通过 task.get() 同步到主线程invoke(task) 一直阻塞到任务执行完成返回
1 为什么使用线程池
创建/销毁线程需要消耗系统资源线程池可以 复用已创建的线程控制并发的数量并发数量过多可能会导致资源消耗过多从而造成服务器崩溃统一管理线程通过线程工厂可以指定创建线程的行为如命名只有提交给线程池的任务是 同类型 且 相互独立 时线程池的性能才能达到最佳 如果将运行时间较长和较短的任务提交给一个线程池可能会产生饥饿如果一些任务依赖于其它任务的执行结果可能会产生死锁
2 ThreadPoolExecutor 构造方法
参数含义说明是否必须int corePoolSize线程池中核心线程数最大值一旦创建核心线程默认情况下会一直存在于线程池中核心线程不会预先创建有任务时才会创建非核心线程如果长时间的闲置就会被销毁是int maximumPoolSize线程池中线程总数最大值核心线程数量 非核心线程数量是long keepAliveTime非核心线程闲置超时时长非核心线程如果处于闲置状态超过该值就会被销毁。如果设置 allowCoreThreadTimeOut(true)则会也作用于核心线程是TimeUnit unitkeepAliveTime 的单位枚举类型是BlockingQueue workQueue阻塞队列维护着等待执行的 Runnable 任务对象下面补充说明是ThreadFactory threadFactory线程创建工厂用于批量创建线程统一在创建线程时设置一些参数如是否守护线程、线程的优先级等。如果不指定会新建一个默认的线程工厂否RejectedExecutionHandler handler拒绝处理策略线程数量大于最大线程数就会采用拒绝处理策略下面补充说明否常用的阻塞队列 LinkedBlockingQueue底层数据结构是链表默认大小是 Integer.MAX_VALUE也可以指定大小ArrayBlockingQueue底层数据结构是数组需要指定队列的大小SynchronousQueue同步队列容量为0要将一个任务放到该队列中必须有一个线程等待接收这个任务否则执行拒绝策略DelayQueue延迟队列该队列中的元素只有当其指定的延迟时间到了才能够从队列中获取到该元素 只有任务相互独立时设置最大线程数、最大队列长度才是合理的否则可能会出现死锁如果任务之间存在依赖应该使用无界的线程池 有关拒绝处理策略 ThreadPoolExecutor.AbortPolicy默认策略丢弃任务并抛出 RejectedExecutionException 异常ThreadPoolExecutor.DiscardPolicy丢弃新来的任务但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy丢弃队列头部也就是最旧的如果是优先队列则丢弃优先级最高的任务然后重新尝试执行程序如果再次失败重复此过程ThreadPoolExecutor.CallerRunsPolicy由主线程处理该任务此时主线程暂停向队列中提交新任务 ThreadPoolExecutor.CallerRunsPolicy 在执行时主线程处于忙碌状态新到达的请求会保存在 TCP 队列中如果持续下去可能造成 TCP 队列丢弃请求这种现象最终会蔓延到客户端导致性能降低
3 ThreadPoolExecutor 的状态 扩展
状态说明RUNNING线程池创建后处于 RUNNING 状态SHUTDOWN调用 shutdown() 方法后处于 SHUTDOWN 状态线程池不能接受新的任务正在执行的线程不中断完成阻塞队列的任务存疑STOP调用 shutdownNow() 方法后处于 STOP 状态线程池不能接受新的任务中断所有线程阻塞队列中没有被执行的任务全部丢弃。此时工作线程全部停止阻塞队列为空TIDYING当所有的任务已终止线程池会变为 TIDYING 状态接着会执行 terminated() 函数TERMINATED线程池处在 TIDYING 状态时并且执行完 terminated() 方法之后 线程池被设置为 TERMINATED 状态
class ThreadPoolExecutorExtension extends ThreadPoolExecutor {public ThreadPoolExecutorExtension(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueueRunnable workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}Overrideprotected void beforeExecute(Thread t, Runnable r) {// 任务执行前...}Overrideprotected void afterExecute(Runnable r, Throwable t) {// 任务执行后...}Overrideprotected void terminated() {// 线程池关闭...}
}4 ThreadPoolExecutor 任务处理流程 线程总数量 corePoolSize无论当前已经创建的线程是否空闲都会新建一个核心线程执行任务在核心线程数量 corePoolSize 时让核心线程数量快速达到 corePoolSize线程总数量 corePoolSize 时新来的线程任务会进入任务队列中等待然后空闲的核心线程会依次去缓存队列中取任务来执行线程复用。在加入队列前后都进行线程池状态是否是 RUNNING 的检查当缓存队列满了说明这个时候任务非常多创建非核心线程去执行这个任务缓存队列满了 且总线程数达到了 maximumPoolSize则会采取拒绝策略进行处理 为什么在步骤2中要二次检查线程池的状态 在多线程的环境下线程池的状态是时刻发生变化的有可能刚获取线程池状态后线程池状态就改变了。判断是否将 command 加入workqueue 是线程池之前的状态倘若没有二次检查万一线程池处于非 RUNNING 状态在多线程环境下很有可能发生那么 command 永远不会执行类似于单例模式的双重校验 5 线程池大小的设置
假设机器有N个CPU 对于计算密集型任务计算需要占用CPU应该设置线程数为 N1。多设置一个线程的目的是计算密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停有一个“额外”的线程可以确保在这种情况下CPU周期不会中断工作对于I/O密集型任务I/O不占用CPU应该设置线程数为 2N对于同时有计算工作和I/O工作的任务考虑使用两个线程池分别处理两种任务
6 ForkJoinPool
参考链接
采用分治方法处理任务 每个线程对应一个双端队列 Deque 用于存放任务并采用工作窃取机制 构造函数参数
参数含义parallelism线程池最大线程数量和该参数值有关系但不是绝对的关联有依据但并不全由它决定factory线程创建工厂handler线程因未知异常而终止的回调处理执行的任务中出现异常并从任务中被抛出时会被 handler 捕获asyncMode当设置为 ture 的时候队列采用先进先出方式反之则是采用后进先出的方式默认false参考上图
提交任务的两种方式RecursiveAction类比Runnable、RecursiveTaskV类比CallableV
class ForkJoinPoolDemo {private static final ForkJoinPool forkJoinPool new ForkJoinPool();public static void addTask() {// 无返回值的任务 类比RunnableRecursiveAction recursiveAction new RecursiveAction() {SneakyThrowsOverrideprotected void compute() {Thread.sleep(5000);System.out.println(recursiveAction Thread.currentThread());}};// 有返回值的任务 类比CallableRecursiveTaskInteger recursiveTask new RecursiveTaskInteger() {SneakyThrowsOverrideprotected Integer compute() {Thread.sleep(5000);System.out.println(recursiveTask Thread.currentThread());return 1;}};// execute: 异步无返回值forkJoinPool.execute(recursiveTask);forkJoinPool.execute(recursiveAction);// submit: 异步返回Future需要调用get同步结果ForkJoinTaskInteger taskSubmit forkJoinPool.submit(recursiveTask);ForkJoinTaskVoid actionSubmit forkJoinPool.submit(recursiveAction);try {Integer res taskSubmit.get();actionSubmit.get(); // Void} catch (InterruptedException | ExecutionException e) {// ...}// invoke: 阻塞并直接返回结果Integer taskInvoke forkJoinPool.invoke(recursiveTask);forkJoinPool.invoke(recursiveAction);}
}7 按完成顺序获取任务返回结果
如果向 Executor 提交一组任务并在计算完成后获得结果例如每下载一张图片就执行一次渲染而非下载完所有图片再渲染 使用 ThreadPoolExecutor.submit(task) 并对返回值轮询调用 future.get() 效率低使用 ThreadPoolExecutor.invoke(task) 会阻塞直到返回结果同样效率低使用 CompletionService.take() 可以在部分任务完成后按完成的先后顺序获取任务 future ExecutorCompletionService 是 CompletionService 的实现将计算任务委托给线程池执行
class CompletionServiceDemo {/*** 线程池*/private final Executor executor Executors.newFixedThreadPool(4);public void test() {CompletionServiceInteger completionService new ExecutorCompletionService(executor);FutureInteger future5000 completionService.submit(new CustomTask(5000));FutureInteger future2000 completionService.submit(new CustomTask(2000));FutureInteger future8000 completionService.submit(new CustomTask(8000)); for (int i 0; i 3; i) {FutureInteger future null;try {future completionService.take(); // 阻塞直到有任务完成按照任务完成的先后顺序获得结果System.out.println(future.get()); // 输出结果 2000, 5000, 8000} catch (InterruptedException | ExecutionException e) {// ...}}}static class CustomTask implements CallableInteger {private final int i;public CustomTask(int i) {this.i i;}Overridepublic Integer call() throws Exception {Thread.sleep(i);return i;}}
}8 线程池异常处理
通过自定义线程工厂的方式实现 在线程的 run() 方法中主动捕获异常设置未捕获异常的处理器 Thread.UncaughtExceptionHandler被动处理 Thread.UncaughtExceptionHandler 仅在使用 execute(task) 向线程池提交任务时才生效 submit(task) 会在调用其返回值的 get() 方法时将异常包装为 ExecutionException 后重新抛出
/**
* 自定义线程工厂
**/
class MyThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String namePrefix) {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix namePrefix -thread-;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(group, r, namePrefix threadNumber.getAndIncrement(), 0) {Overridepublic void run() {// 1.捕获未受检异常例如RuntimeException主动// 不被捕获的未受检异常将使线程终结Throwable throwable null;try {super.run();} catch (Throwable t) {throwable t;} finally {// 处理策略可以新建线程代替当前线程也可以在活跃线程数足够多的时候不新建线程}}};// 2.处理未捕捉的异常被动// 仅在通过execute方法提交任务时才会用该方式处理异常// 使用submit提交的任务会在其返回值调用future.get()时将异常包装为ExecutionException后重新抛出t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println(do something to handle uncaughtException);}});return t;}
}9 获取执行结果Future 一般用于执行时间较长的任务Future 可以把计算结果从执行计算的线程传递到获取计算结果的线程也可以在任务执行中取消调用 future.get() 时行为取决于任务的状态 已完成立即返回结果或抛出异常 Callable 对象内部的异常会被包装成 ExecutionException 并抛出可以通过 e.getCause() 获得包装前的初始异常 未完成阻塞直到任务完成或抛出异常抛出的异常包含超时被取消抛出 CancellationException 对执行完成的 Future 调用 cancel() 不会产生影响RunnableFuture 同时拓展了 Runnable作为执行的任务和 Future作为任务执行的结果
public class Solution {public static void main(String[] args) {// 创建Callable对象CallableInteger callable () - {if (System.currentTimeMillis() % 2 0) {throw new MyException(); // 自定义异常} else {Thread.sleep(2000);return 1;}};// 创建FutureTask对象FutureTaskInteger futureTask new FutureTask(callable);// 将FutureTask对象交付线程并执行Thread thread new Thread(futureTask);thread.start();try {futureTask.get(10, TimeUnit.SECONDS);// 保持阻塞直到 1.执行完成 2.抛出异常包含中断、超时、Callable内部checked异常、RuntimeException、Error} catch (TimeoutException e) {// 超时...} catch (InterruptedException e) {// 中断...} catch (ExecutionException e) {// Callable内部异常被包装后重新抛出if (e.getCause() instanceof MyException) {System.out.println(Callable Inner Exception, thrown by futureTask.get());}} finally {// 1.如果任务已经结束执行cancel不会产生任何影响// 2.如果任务正在运行将会被中断futureTask.cancel(true);}}
}实例 Memorizer
如果输入参数不在缓存中则执行计算否则查询缓存并返回结果
class ComputeCacheK, V {/*** 因为计算过程较长Value使用Future*/private final ConcurrentHashMapK, FutureV cache new ConcurrentHashMap();private V compute(K args) {while (true) {FutureV result cache.get(args);if (result null) {// 缓存中无该值执行计算CallableV callable new CallableV() {Overridepublic V call() throws Exception {// take a long time// Thread.sleep(100000);return null;}};FutureTaskV futureTask new FutureTask(callable);result cache.putIfAbsent(args, futureTask); // 保证查询-放入操作是原子的避免了put可能被多个线程同时执行if (result null) { // 加入成功开始执行计算result futureTask;futureTask.run();}}try {// 阻塞直到拿到计算结果return result.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
}十 同步工具类
均基于 AQS 实现
1 CountDownLatch
参与的线程有不同的角色有的负责倒计时有的负责等待计数器变为0且角色都可以有多个不能重复使用await() 解除阻塞的情况 计数值为0中断超时
/** 使用示例等待线程池的任务全部执行完成 **/
public final void run(ListRuleDO rules) {CountDownLatch countDownLatch new CountDownLatch(taskList.size());for (ScanTask task : taskList) {// 向线程池加入任务threadPoolExecutor.execute(() - {doScan(task, rules);// 计数值-1countDownLatch.countDown();});}try {// 在此阻塞直到计数值为0或发生中断或超时countDownLatch.await();} catch (InterruptedException e) {// ...}
}2 循环栅栏 CyclicBarrier
和 CountDownLatch 不同的是参与的线程角色相同且可以重复使用构造方法参数 线程数必选Runnable 类型所有线程到达后执行的动作由最后一个到达的线程执行可选 static void CyclicBarrierTest() {int subThreadNum 5;// 所有子线程到达后执行的行为由最后一个到达的子线程完成Runnable commonAction new Runnable() {Overridepublic void run() {System.out.println(all sub threads terminated, print by thread Thread.currentThread().getName());}};// 构造CyclicBarrierCyclicBarrier cyclicBarrier new CyclicBarrier(subThreadNum, commonAction);for (int i 0; i subThreadNum; i) {new Thread(() - {System.out.println(round1: sub thread Thread.currentThread().getName() terminated);try {cyclicBarrier.await(); // 线程宣布到达} catch (InterruptedException | BrokenBarrierException e) {// ...}System.out.println(round2: sub thread Thread.currentThread().getName() terminated);try {cyclicBarrier.await(); // 线程宣布到达} catch (InterruptedException | BrokenBarrierException e) {// ...}}).start();}}执行结果
round1: sub thread Thread-1 terminated
round1: sub thread Thread-0 terminated
round1: sub thread Thread-3 terminated
round1: sub thread Thread-2 terminated
round1: sub thread Thread-4 terminated
all sub threads terminated, print by thread Thread-4
round2: sub thread Thread-4 terminated
round2: sub thread Thread-3 terminated
round2: sub thread Thread-0 terminated
round2: sub thread Thread-1 terminated
round2: sub thread Thread-2 terminated
all sub threads terminated, print by thread Thread-23 信号量 Semaphore
用于控制同时访问某个资源的操作数量例如实现资源池、对容器添加容量限制用于有增有减的场景CountDownLatch 用于仅减的场景设置 permits1 时和一般的锁仍有区别 不可重入一般锁只能由持有锁的线程释放Semaphore 可以由任意线程释放 常用方法
acquire()
获取一个令牌在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态
acquire(int permits)
获取一个令牌在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态acquireUninterruptibly()
获取一个令牌在获取到令牌之前线程一直处于阻塞状态忽略中断tryAcquire()
尝试获得令牌返回获取令牌成功或失败不阻塞线程
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌在超时时间内循环尝试获取直到尝试获取成功或超时返回不阻塞线程
release()
释放一个令牌唤醒一个获取令牌不成功的阻塞线程
hasQueuedThreads()
等待队列里是否还存在等待线程
getQueueLength()
获取等待队列里阻塞的线程数
drainPermits()
清空令牌把可用令牌数置为0返回清空令牌的数量
availablePermits()
返回可用的令牌数量。使用示例使用信号量控制任务向线程池的提交速率
class LimitTaskCommitWithSemaphoreDemo {private ExecutorService executorService; // 线程池拒绝策略 AbortPolicyprivate final Semaphore semaphore new Semaphore(100); // 信号量上界 最大线程数 队列最大长度public void addTask(Runnable task) throws InterruptedException {semaphore.acquire(); // 信号量-1try {executorService.execute(() - {task.run();semaphore.release(); // 执行完毕信号量1});} catch (RejectedExecutionException e) {semaphore.release(); // 添加失败信号量1}}
}4 ThreadLocal
参考链接
ThreadLocal 类主要解决的就是让每个线程绑定自己的值这个值不能被其它线程访问到ThreadLocal 对象一般声明为 static 以便引用典型用途是提供上下文信息比如在一个 Web 服务器中一个线程执行用户请求会访问一些共同的信息请求信息、用户身份、数据库连接等这些是线程执行过程中的全局信息如果作为参数在不同代码间传递会很繁琐
public class RequestContext {public static class Request {...}private static ThreadLocalString userId new ThreadLocal();private static ThreadLocalRequest req new ThreadLocal();// gettersetter...
}ThreadLocalMap 每个 Thread 中都具备一个容器 ThreadLocalMap而 ThreadLocalMap 可以存储以 ThreadLocal 为 key ThreadLocal 的弱引用Object 对象为 value 的键值对ThrealLocal 类可以通过 Thread.currentThread() 获取到当前线程对象后直接通过 getMap(Thread t) 可以访问到该线程的 ThreadLocalMap 对象ThreadLocalMap 不使用拉链法解决哈希冲突而是向后探测如果先遇到了空位置则直接插入如果先遇到了 key 过期的数据则进行垃圾回收并替换 十一 线程中断
1 中断方法
interrupt() 仅是一种协作机制不会强制终止线程总结中断状态 true - false 的情况 调用静态方法 Thread.interrupted()抛出异常 InterruptedException例如对 WAITING/TIMED_WAITING 状态的线程调用 interrupt()线程执行完成
public class Thread {/*** 由*本线程对象*之外的线程调用作用为* 1.线程处于状态WAITING/TIMED_WAITING则抛出InterruptedException* 2.线程处于状态RUNNABLE/BLOCKED将中断标志位设置为true仅告知需要中断而不采取其它措施* 3.如果本线程对象处于状态NEW/TERMINATE则无动作不修改标志位**/public void interrupt() {...}/*** 查看*本线程对象*的中断标志位* 如果线程执行完成则返回false即使之前对它调用了 interrupt()**/public boolean isInterrupted() {...}/*** 静态方法查看*调用该方法的线程*的中断标志位并清除状态* 调用形式仅可以为 Thread.interrupted()同时会将中断状态更新为false**/public static boolean interrupted() {...}
}2 示例
Demo1仅有一个主线程
class ThreadInterruptDemo {protected void test() throws Exception {Thread.currentThread().interrupt(); // 将主线程的中断标志设为trueSystem.out.println(MainStatus after thread.interrupt(): Thread.currentThread().isInterrupted());boolean mainThreadInterruptStatus Thread.interrupted(); // 查看当前线程主线程的中断状态并将其重置为falseSystem.out.println(MainStatus return by Thread.interrupted(): mainThreadInterruptStatus);System.out.println(MainStatus after Thread.interrupted(): Thread.currentThread().isInterrupted());}
}返回结果
MainStatus after thread.interrupt(): true
MainStatus return by Thread.interrupted(): true
MainStatus after Thread.interrupted(): falseDemo2
class ThreadInterruptDemo {protected void test() throws Exception {Thread subThread new Thread(() - {System.out.println(Start: Thread.currentThread().isInterrupted());try {System.out.println(Before sleep: Thread.currentThread().isInterrupted());Thread.sleep(5000);} catch (InterruptedException e) {System.out.println(InterruptedException: Thread.currentThread().isInterrupted());// 捕获到InterruptedException后会将中断状态重置为false如果需要维持还需要重设为trueThread.currentThread().interrupt();}System.out.println(End: Thread.currentThread().isInterrupted());});subThread.start();subThread.interrupt(); // 如果子线程处于阻塞状态则抛出InterruptedException否则仅将子线程的中断状态设置为trueThread.sleep(10000); // 等待子线程执行完成System.out.println(SubThread end status: subThread.isInterrupted());}
}返回结果1
Start: false
Before sleep: false
---此时调用subThread.interrupt()---
InterruptedException: false // 发生了InterruptedException中断状态被重置为false
End: true // 在catch块里又将中断状态设置为true
---子线程执行完成---
SubThread end status: false返回结果2
---此时调用subThread.interrupt()---
Start: true
Before sleep: true
InterruptedException: false // 证明了线程先中断状态true再sleep也会抛出异常!!!
End: true
---子线程执行完成---
SubThread end status: false十二 性能与可伸缩性
1 相关概念
可伸缩性 定义 当增加计算资源CPU、内存、存储容量、I/O带宽等时程序的吞吐量或处理能力相应地增加首先保证程序正确然后再提高运行速度避免不成熟的优化所有并发程序中都会包含串行部分至少同步代码块是串行的Amdahl定律 程序的可伸缩性取决于必须串行执行的代码比例
2 多线程引入的开销
上下文切换 调度器划分合适的时间片大小将线程切换的开销分摊给执行时间提高整体吞吐量上下文切换会使缓存失效 内存同步 来源于 synchronized 和 volatile 保证的内存可见性同步会增加共享内存总线上的通信量而总线的带宽是有限的所有处理器都共享总线 阻塞 包含阻塞I/O、等待获取锁、在条件变量上等待等待获得锁的线程会被阻塞被阻塞的线程可以自旋等待或被操作系统挂起直到解除阻塞具体采用的方式和上下文切换的代价、阻塞时间有关被挂起的线程会引入两次切换被阻塞的线程在自己的时间片用完之前被换出在解除阻塞时由被换回
3 减少锁竞争
Java程序中的串行操作主要来源于独占方式的资源锁对于一个锁可以通过以下方式减少锁竞争 减少锁的持有时间减少锁的请求频率降低锁粒度、避免热点域用非独占锁或非阻塞锁代替独占锁