商城做网站,百度竞价推广点击软件,一个人做商城网站,深圳网站制作的公司排名简介
juc#xff0c;java.util.concurrent包的简称#xff0c;java1.5时引入。juc中提供了一系列的工具#xff0c;可以更好地支持高并发任务
juc中提供的工具
可重入锁 ReentrantLock
可重入锁#xff1a;ReentrantLock#xff0c;可重入是指当一个线程获取到锁之后java.util.concurrent包的简称java1.5时引入。juc中提供了一系列的工具可以更好地支持高并发任务
juc中提供的工具
可重入锁 ReentrantLock
可重入锁ReentrantLock可重入是指当一个线程获取到锁之后可以再次获取到当前锁。可重入锁一定程度上防止了死锁。
ReentrantLock提供的功能
可重入在获取到锁之后还可以再次获取这把锁可打断获取锁时的阻塞状态可以被interrupt方法打断可超时可以指定阻塞时长多条件变量synchronized只支持一个条件变量这里条件变量是指调用wait方法、notify方法的锁对象ReentrantLock可以实现在多个条件变量上等待和唤醒可以指定内部使用公平锁还是非公平锁默认使用非公平锁
ReentrantLock和synchronized都支持可重入但是synchronized没有ReentrantLock提供的其它功能
使用案例
案例1基本使用
10个线程同时对同一个int变量执行1000次加加确认结果是否正确。
private static final ReentrantLock LOCK new ReentrantLock();
private static int count 0;public static void main(String[] args) {ListThread list new ArrayList();for (int i 0; i 10; i) {Thread thread new Thread(() - {// 加锁LOCK.lock();try {for(int j 0; j 1000; j) {// 访问共享资源count;}} finally {// 释放锁LOCK.unlock();}});list.add(thread);thread.start();}for (Thread thread : list) {try {thread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// 验证结果结果正确System.out.println(count);
}案例2属性方法
获取ReentrantLock的状态
private static final ReentrantLock LOCK new ReentrantLock();public static void main(String[] args) throws InterruptedException {// 等待锁的线程Thread thread2 new Thread(() - {LOCK.lock();try {for (int i 0; i 100000; i) {}} finally {LOCK.unlock();}}, t2);// 拥有锁的线程new Thread(() - {try {LOCK.lock();// 拥有锁的情况下锁的状态System.out.println(---拥有锁---);System.out.println(锁是否被某个线程持有 LOCK.isLocked()); // trueSystem.out.println(重入次数 LOCK.getHoldCount()); // 1System.out.println(锁是否被当前线程持有 LOCK.isHeldByCurrentThread()); // trueSystem.out.println(阻塞队列中是否有等待锁的线程 LOCK.hasQueuedThreads()); // trueSystem.out.println(线程2是否在阻塞队列中 LOCK.hasQueuedThread(thread2)); // trueSystem.out.println(阻塞队列的长度 LOCK.getQueueLength()); // 1System.out.println(锁是不是公平锁 LOCK.isFair()); // false// java.util.concurrent.locks.ReentrantLock6a53a7e9[Locked by thread t1]System.out.println(锁.toString方法 LOCK.toString()); Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {LOCK.unlock();}}, t1).start();thread2.start();Thread.sleep(1000L);// 没有上锁的情况下ReentrantLock的状态System.out.println(---没有锁---);System.out.println(锁是否被某个线程持有 LOCK.isLocked()); // falseSystem.out.println(重入次数 LOCK.getHoldCount()); // 0System.out.println(锁是否被当前线程持有 LOCK.isHeldByCurrentThread()); // falseSystem.out.println(阻塞队列中是否有等待锁的线程 LOCK.hasQueuedThreads()); // falseSystem.out.println(线程2是否在阻塞队列中 LOCK.hasQueuedThread(thread2)); // falseSystem.out.println(阻塞队列的长度 LOCK.getQueueLength()); // 0System.out.println(锁是不是公平锁 LOCK.isFair()); // false// java.util.concurrent.locks.ReentrantLock6a53a7e9[Unlocked]System.out.println(锁.toString方法 LOCK.toString());
}案例3可超时
可以指定超时时间超过指定时长没有获取锁算失败
private static final ReentrantLock LOCK new ReentrantLock();// 测试tryLock方法
public static void main(String[] args) {// 先启动一个线程占用锁new Thread(() - {LOCK.lock();try {Utils.println(当前线程获取锁);Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {Utils.println(当前线程释放锁);LOCK.unlock();}}).start();boolean tryLock false;try{tryLock LOCK.tryLock(4, TimeUnit.SECONDS);if (tryLock) {Utils.println(当前线程获取锁成功);} else {Utils.println(当前线程获取锁失败);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (tryLock) {Utils.println(当前线程释放锁);LOCK.unlock();}}
}案例4可打断
使用lockInterruptibly方法获取锁线程的等待状态可以被interrupt方法打断普通的lock方法则不会。
线程在等待状态下如果外部调用了线程对象的interrupt方法线程会结束等待状态如果是可打断地获取锁此时会抛出InterruptedException结束获取锁的操作然后需要用户处理这个异常。
private static final ReentrantLock LOCK new ReentrantLock();// 测试lockInterruptibly方法
public static void main(String[] args) throws InterruptedException {new Thread(() - {// 主线程先占住锁资源Utils.println(当前线程获取锁);LOCK.lock();try {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}} finally {if (LOCK.isHeldByCurrentThread()) {Utils.println(当前线程释放锁);LOCK.unlock();}}}, 线程0).start();Thread.sleep(500);// 可打断地抢占锁资源Thread thread new Thread(() - {try {LOCK.lockInterruptibly();Utils.println(当前线程获取锁);} catch (InterruptedException e) {e.printStackTrace();Utils.println(当前线程被打断);} finally {if (LOCK.isHeldByCurrentThread()) {Utils.println(当前线程释放锁);LOCK.unlock();}}}, 线程1);thread.start();Thread.sleep(500);// 打断线程1在lockInterruptibly方法获取锁时调用线程的interrupt方法// lockInterruptibly方法会抛出InterruptedExceptionthread.interrupt();
}条件对象 Condition
Condition联合锁对象一起使用表示条件对象提供了类似于Object类中的wait、notify等方法的功能。
当调用Condition实例中的await、signal方法时如果当前线程没有持有锁资源则抛出非法监视器状态异常当线程调用Condition中的await方法时线程放弃锁资源进入等待列表如果在等待过程中被打断抛出中断异常
通过Condition可以支持在一个锁对象上操作多个条件变量
常用api
awaitvoid await() throws InterruptedException相当于Object类中的wait方法signalvoid signal()相当于Object类中的notify方法signalAllvoid signalAll()相当于Object类中的notifyAll方法
案例1多条件变量
类似于生产者/消费者模式只不过这个案例中有两个生产者、两个消费者它们一一对应。
private static final ListInteger list new ArrayList();
private static final ListInteger list2 new ArrayList();
private static final ReentrantLock LOCK new ReentrantLock();
private static final Condition condition LOCK.newCondition();
private static final Condition condition2 LOCK.newCondition();public static void main(String[] args) throws InterruptedException {// 消费者1new Thread(() - {while (true) {LOCK.lock();try {if (list.isEmpty()) {condition.await();} else {System.out.println(Thread.currentThread().getName() 消费数据 list);list.clear();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, 消费者1).start();// 生产者1new Thread(() - {while (true) {LOCK.lock();try {if (list.isEmpty()) {list.addAll(createIntegerList(10));System.out.println(Thread.currentThread().getName() 生产数据 list);condition.signal();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(3000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, 生产者1).start();// 消费者2new Thread(() - {while (true) {LOCK.lock();try {if (list2.isEmpty()) {condition2.await();} else {System.out.println(Thread.currentThread().getName() 消费数据 list2);list2.clear();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, 消费者2).start();// 生产者2new Thread(() - {while (true) {LOCK.lock();try {if (list2.isEmpty()) {list2.addAll(createIntegerList(100));System.out.println(Thread.currentThread().getName() 生产数据 list2);condition2.signal();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(3000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, 生产者2).start();
}public static ListInteger createIntegerList(int bound) {ListInteger list new ArrayList();Random random new Random();list.add(random.nextInt(bound));list.add(random.nextInt(bound));list.add(random.nextInt(bound));return list;
}总结这个案例比较粗糙只是演示了一个锁对象在支持多个条件变量的情况要注意如果是一个生产者对应多个消费者signal方法会唤醒等待队列中的第一个线程。
读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock可重入的读写锁读锁和写锁使用同一个同步器读读不冲突读写冲突。写锁是互斥锁读锁是共享锁。如果同步队列中有写锁读锁会排在写锁之后
读写锁的使用规则
读读不冲突多个线程是可以同时获取读锁而不需要阻塞等待读写冲突一个线程获取了读锁那么其他的线程要获取写锁 需要等待同样的一个线程获取了写锁另外的想要获取读锁或者写锁都需要阻塞等待锁降级一个获取写锁的线程是可以在释放写锁之前再次获取读锁的这就是锁降级
案例一个使用读写锁来保护数据的容器
第一步数据容器
public class DataContainer {private Object data;private final ReentrantReadWriteLock LOCK new ReentrantReadWriteLock();private final ReentrantReadWriteLock.ReadLock READ_LOCK LOCK.readLock(); // 获取读锁private final ReentrantReadWriteLock.WriteLock WRITE_LOCK LOCK.writeLock(); // 获取写锁/*** 读取数据*/public Object read(){Utils.println(获取读锁...);READ_LOCK.lock();try{Utils.println(读取数据...);Thread.sleep(1000);return data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {Utils.println(释放读锁...);READ_LOCK.unlock();}return null;}/*** 写入数据*/public void write(Object data){Utils.println(获取写锁...);WRITE_LOCK.lock();try{this.data data;Utils.println(写入数据...);} finally {Utils.println(释放写锁...);WRITE_LOCK.unlock();}}
}第二步测试读读不冲突
public static void main(String[] args) {DataContainer container new DataContainer();// 两个线程同时获取读锁读读不冲突new Thread(() - {Object read container.read();System.out.println(read read);}).start();new Thread(() - {Object read container.read();System.out.println(read read);}).start();
}加戳的读写锁 StampedLock
StampedLock自jdk8加入对于读写锁的进一步优化。它提供了一种乐观读技术读取完毕后需要做一次戳校验如果校验通过表示这期间确实没有写操作数据可以安全使用如果校验没有通过需要重新获取锁保证数据安全。适合于读多写少的场景
案例
第一步一个使用加戳读写锁保护的容器
public class DataContainerStamped {private Object data;private final StampedLock LOCK new StampedLock();public Object read() {// 第一步乐观地读取数据Object result null;// 获取乐观读锁long stamp LOCK.tryOptimisticRead();Utils.println(乐观地读取数据stamp stamp);try {Thread.sleep(3000L);result data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}// 第二步读完数据之后对戳进行校验if (LOCK.validate(stamp)) {Utils.println(乐观地读完数据stamp stamp);return result;}// 第三步如果时间戳变了证明数据有更新需要重新读取数据Utils.println(更新读锁stamp stamp);try {stamp LOCK.readLock();Utils.println(获取读锁 stamp stamp);try {Thread.sleep(3000L);result data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}} finally {Utils.println(释放读锁, stamp stamp);LOCK.unlockRead(stamp);}return result;}public void write(Object newData) {long stamp LOCK.writeLock();Utils.println(获取写锁stamp stamp);try {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}this.data newData;} finally {Utils.println(释放写锁stamp stamp);LOCK.unlockWrite(stamp);}}
}第二步验证乐观读在读取数据的时候写入数据。结论是写入数据会改变数据戳乐观读完数据需要会校验戳如果数据戳被改变需要再次重新读取。
public static void main(String[] args) {DataContainerStamped container new DataContainerStamped();new Thread(() - {Object read container.read();System.out.println(read read);}).start();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}new Thread(() - container.write(10)).start();
}原子类
原子类内部维护了一个数据并且通过cas算法来操作这个数据确保对这个数据的操作是线程安全的同时又避免了锁竞争。
原子类适合处理单个变量需要在线程间共享的情况。
CAS算法
cascompare and swap比较和交换实现无锁同步的一种算法
工作机制CAS算法维护了3个变量内存位置、预期原值、新值它会使用预期原值和内存位置存储的值相比较如果相同进行交换操作如果不同则不进行整个比较并交换的操作是原子性的。
底层原理在语法上CAS算法操作的变量必须被volatile修饰CAS算法的底层是基于CPU的原语支持能够保证 “比较-交换”操作是原子性的。
CAS算法的优缺点
优点可以避免阻塞CAS算法适合多核、并发量不高的情况缺点 只能保证一个共享变量的原子操作如果是多个的话就需要使用锁了。ABA问题如果先将预期值A给成B再改回A那CAS操作就会误认为A的值从来没有被改变过这时其他线程的CAS操作仍然能够成功。通过加入一个版本号来解决这个问题如果并发量很大CAS算法的性能可能会降低。因为如果并发量很大重试必然频繁发生这会导致效率降低
案例
import sun.misc.Unsafe;
import java.lang.reflect.Field;public class CasTest {private volatile int a 10;public static void main(String[] args) throws Exception {new CasTest().test();}public void test() throws Exception {Field field Unsafe.class.getDeclaredField(theUnsafe);field.setAccessible(true);Unsafe unsafe (Unsafe) field.get(null);// 获取字段a的内存偏移量Field fieldI CasTest.class.getDeclaredField(a);long fieldIOffset unsafe.objectFieldOffset(fieldI);// cas操作// 参数1操作哪个对象// 参数2操作对象上的哪个字段// 参数3预期值// 参数4新值boolean b unsafe.compareAndSwapInt(this, fieldIOffset, 10, 11);assert b;assert a 11;}
}常用的原子类
AtomicInteger
内部维护了一个int类型的数据对这个int类型的数据的所有操作都是原子性的。
案例1两个线程同时修改原子类变量只有一个可以修改成功
public static void main(String[] args) {AtomicInteger i new AtomicInteger(10);int intValue i.get();new Thread(() - {boolean b i.compareAndSet(intValue, 11);// 底层调用cas算法System.out.println(Thread.currentThread().getName() b b);}, t1).start();new Thread(() - {boolean b i.compareAndSet(intValue, 12);System.out.println(Thread.currentThread().getName() b b);}, t1).start();System.out.println(i i.get()); // 11
}案例210个线程对一个原子类的变量各加加1000次判断最终结果是否正确
public static void main(String[] args) throws InterruptedException {AtomicInteger i new AtomicInteger(0);ListThread threads new ArrayList();for (int j 0; j 10; j) {Thread thread new Thread(() - {for (int k 0; k 1000; k) {int i1 i.getAndIncrement();}});threads.add(thread);thread.start();}// 等待线程执行结束for (Thread thread : threads) {thread.join();}System.out.println(i i.get()); // 10000结果正确
}AtomicReference
内部维护了一个普通JavaBean的原子类对这个bean的操作是原子性的但是不支持单独操作bean中的某个字段必须整体替换bean。
案例两个线程同时修改原子类变量只有一个可以修改成功
public static void main(String[] args) throws InterruptedException {AtomicReferenceUser userAtomicReference new AtomicReference(new User(张三, 18));User user userAtomicReference.get();new Thread(() - {boolean b userAtomicReference.compareAndSet(user, new User(李四, 19));System.out.println(Thread.currentThread().getName() b b);}, t1).start();new Thread(() - {boolean b userAtomicReference.compareAndSet(user, new User(王五, 20));System.out.println(Thread.currentThread().getName() b b);}, t2).start();Thread.sleep(1000);System.out.println(user userAtomicReference.get()); // 李四 19
}AtomicIntegerArray
操作数组的原子类支持原子性地修改数组中的某个元素
案例两个线程同时更新数组中某个下标处的值
public static void main(String[] args) throws InterruptedException {int[] intArr new int[10];AtomicIntegerArray atomicIntegerArray new AtomicIntegerArray(intArr);new Thread(() - {boolean b atomicIntegerArray.compareAndSet(1, 0, 11);System.out.println(Thread.currentThread().getName() b b);}).start();new Thread(() - {boolean b atomicIntegerArray.compareAndSet(1, 0, 12);System.out.println(Thread.currentThread().getName() b b);}).start();Thread.sleep(100L);// 结论只有一个线程可以更新成功而且AtomicIntegerArray维护的是数组的拷贝而不是元数据// 所以在原数组中看不出更新内容System.out.println(intArr[1] atomicIntegerArray.get(1)); // 11System.out.println(intArr[1] intArr[1]); // 0
}AtomicStampedReference
使用一个版本号来解决ABA问题每次操作都需要手动更新版本号。
案例在主线程对变量进行修改的时候发生了ABA问题
public static void main(String[] args) throws InterruptedException {AtomicReferenceString ref new AtomicReference(A);// 主线程修改数据String prev ref.get();// ABA操作将数据从A改为B再改回来new Thread(() - Utils.println(change A - B: ref.compareAndSet(A, B)), t1).start();Thread.sleep(100L);new Thread(() - Utils.println(change B - A: ref.compareAndSet(B, A)), t2).start();Thread.sleep(1000);Utils.println(change A - C: ref.compareAndSet(prev, C));
}案例使用AtomicStampedReference解决ABA问题
public static void main(String[] args) throws InterruptedException {AtomicStampedReferenceString ref new AtomicStampedReference(A, 1);// 主线程执行更新操作String prev ref.getReference();int stamp ref.getStamp(); // 1// 在这个过程中其它线程执行了ABA操作new Thread(() - {int stamp1 ref.getStamp();Utils.println(change A - B: ref.compareAndSet(A, B, stamp1, stamp1 1));}, t1).start();Thread.sleep(100);new Thread(() - {int stamp1 ref.getStamp();Utils.println(change B - A: ref.compareAndSet(B, A, stamp1, stamp1 1));}, t2).start();Thread.sleep(1000);// 结果主线程更新失败解决了ABA问题Utils.println(change A - C: ref.compareAndSet(prev, C, stamp, stamp 1));
}累加器 LongAdder LongAccumulator
它们都是Java 8引入的高性能累加器原理几乎一样内部维护了一个base变量和Cell数组将累加操作分散到多个槽中减少竞争需要累加值的时候调用sum方法把Cell数组中每个槽中的数据相加。累加器用于替换AtomicLong它们更加适合高并发场景因为CAS在高并发场景下性能可能会降低。它们的不同之处在于LongAdder适合于简单的计算LongAccumulator适合于需要复杂计算的累加场景它可以定制计算规则
案例LongAdder
public static void main(String[] args) {LongAdder longAdder new LongAdder();longAdder.increment(); // 增加 1longAdder.add(10); // 增加 10System.out.println(总计数值: longAdder.sum()); // 输出11longAdder.reset(); // 重置计数器System.out.println(重置后总值: longAdder.sum()); // 输出0
}案例LongAccumulator
public static void main(String[] args) {// 定义累加规则为加法LongAccumulator longAccumulator new LongAccumulator(Long::sum, 0);for (int i 0; i 10; i) {new Thread(() - {longAccumulator.accumulate(10);}).start();}System.out.println(longAccumulator.get() longAccumulator.get()); // 100
}总结
原子类通常以Atomic开头
juc中提供的原子类
基本类型原子类AtomicInteger、AtomicLong、AtomicBoolean引用类型原子类AtomicReference、AtomicStampedReference解决ABA问题数组类型原子类AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray累加器LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator
源码解析
原子类的源码比较简单它的内部调用了Unsafe类的cas算法通过它来保证线程安全这里以AtomicInteger为例了解原子类的工作机制
public class AtomicInteger extends Number implements java.io.Serializable {// 封装了int类型的变量并且变量被volatile修饰private volatile int value;// 获取int类型的变量在对象中的内存偏移量private static final Unsafe unsafe Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField(value));} catch (Exception ex) { throw new Error(ex); }}// 通过cas算法更新value字段的值public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
}总结原子类中封装的变量使用volatile修饰使用cas算法来更新保证线程的安全性
信号量
信号量Semaphore用来限制并发度的工具避免并发了过大从而达到保护程序的目的
使用案例使用semaphore来线程线程的并发数量同一时刻只能有三个线程同时运行
public static void main(String[] args) {Semaphore semaphore new Semaphore(3);for (int i 0; i 10; i) {new Thread(() - {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}try {Utils.println(running...);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(end...);} finally {semaphore.release();}}).start();}
}常用api
acquirevoid acquire()从信号量获取一个许可如果无可用许可前将一直阻塞等待acquirevoid acquire(int permits) 获取指定数目的许可如果无可用许可前也将会一直阻塞等待tryAcquireboolean tryAcquire()从信号量尝试获取一个许可如果无可用许可直接返回false不会阻塞。它的重载方法可以获取指定数目的许可也可以指定阻塞的时间releasepublic void release()释放一个许可证计数器加1
使用方式总结
在构造信号量对象时指定许可证数量一个许可证对应一个线程表示最多有多少个线程可以执行任务在每个线程中执行任务前调用acquire方法获取许可证semaphore对象中许可证数量减1执行完任务后调用release方法释放许可证semaphore对象中许可证数量加1如果semaphore对象中许可证数量为0线程调用acquire方法时会进入阻塞状态直到其它线程释放许可证通过这种方式实现控制并发度的功能
LockSupport
用于创建锁和同步类的基本线程阻塞原语。当前线程调用LockSupport的park方法可以进入阻塞状态在线程外可以调用unpark方法同时传入线程实例可以让指定线程退出阻塞状态。
park和unpark与wait和notify的区别
wait、notify必须在同步块内调用park、unpark不必notify和notifyAll无法精确控制唤醒哪一个线程park和unpark可以
案例1基本使用
// 先调用park方法
public static void main(String[] args) {Thread thread new Thread(() - {Utils.println(start);LockSupport.park();Utils.println(end);});thread.start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(解除指定线程的阻塞状态);// 再调用unpark方法LockSupport.unpark(thread);
}案例2在park方法之前调用unpark方法会怎么样
public static void main(String[] args) {Thread thread new Thread(() - {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(start);LockSupport.park();Utils.println(end);});thread.start(); // 注意线程必须先启动否则针对它调用unpark方法没有效果。Utils.println(解除指定线程的阻塞状态);LockSupport.unpark(thread); // LockSupport类似于只有一张许可证的Semaphore
}结论先调用unpark方法会park方法不会阻塞线程
LockSupport的源码非常简单它的底层是基于Unsafe类的park、unpark方法它只是直接调用了这两个方法然后再传入其它必要的参数比如超时时间。
倒计时锁
CountdownLatch倒计时锁做线程间的同步协作在某个位置等待所有线程完成倒计时然后再向下执行。
案例使用CountDownLatch实现主线程等待所有子线程执行完成后在执行的效果
public static void main(String[] args) {CountDownLatch latch new CountDownLatch(3);new Thread(() - {Utils.println(开始执行任务);Utils.sleep(1000);Utils.println(执行完成);latch.countDown();}).start();new Thread(() - {Utils.println(开始执行任务);Utils.sleep(2000);Utils.println(执行完成);latch.countDown();}).start();new Thread(() - {Utils.println(开始执行任务);Utils.sleep(1500);Utils.println(执行完成);latch.countDown();}).start();Utils.println(等待中);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(等待结束);
}使用方式总结
第一步构建倒计时锁构造函数中指定需要等待多少个任务完成第二步主线程调用await方法进入阻塞用来等待计数器归零第三步执行任务的线程执行完任务后调用countDown方法计数器减1表示执行完一个任务了结果当所有任务都执行完后await方法结束阻塞
循环栅栏
循环栅栏CyclicBarrier允许一组线程互相等待直到到达某个公共屏障点并且在释放等待线程后可以重用。CyclicBarrier的字面意思是可循环使用的屏障。它要做的事情是让一组线程到达一个屏障时被阻塞直到最后一个线程到达屏障时屏障才会开门所有被屏障拦截的线程才会继续干活。
作用适用于一组线程中的每一个线程需要都等待所有线程完成任务后再继续执行下一次任务的场景
CountDownLatch和CyclicBarrier的异同
相同点都有让多个线程等待同步然后再开始下一步动作的意思不同点 CountDownLatch的下一步的动作实施者是主线程具有不可重复性CyclicBarrier的下一步动作实施者还是“其他线程”本身具有往复多次实施动作的特点。
案例
public static void main(String[] args) {// 第一步指定需要同步的线程数和所有线程都到达同步点之后需要执行的方法CyclicBarrier barrier new CyclicBarrier(2, // 只有所有线程到达同步点之后才会执行这个任务() - Utils.println(任务结束));ExecutorService pool Executors.newFixedThreadPool(2);pool.submit(() - {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println(任务1执行);try {// 第二步执行任务的线程到达同步点barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(任务1执行完成);});pool.submit(() - {try {Thread.sleep(2000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println(任务2执行);try {// 第二步执行任务的线程到达同步点。barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println(任务2执行完成);});pool.shutdown();
}源码解析
循环栅栏不像其他工具类那样内部依赖AQS它的内部只使用了可重入锁。线程会阻塞在wait方法直到所有的线程都执行到wait方法再一起向下执行接下来看一下它是如何做到的
核心代码
// wait方法的内部会调用dowait方法
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock this.lock;lock.lock(); // 加锁try {final Generation g generation; // 代表循环栅栏的一次运行if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 所有线程都会执行wait方法--count表示当前线程执行到了wait方法数值减1int index --count;// 如果count等于0表示所有线程都执行到了wait方法执行预先定义好的异步任务if (index 0) { // trippedboolean ranAction false;try {final Runnable command barrierCommand;if (command ! null)command.run();ranAction true;nextGeneration(); // 循环栅栏的下一次运行return 0;} finally {if (!ranAction)breakBarrier(); // 打破屏障唤醒进入等待状态的线程}}// 如果count不等于0证明需要等待在上面count0的分支中会唤醒等待中的线程// loop until tripped, broken, interrupted, or timed outfor (;;) { // 自旋try {// 进入等待状态if (!timed)trip.await();else if (nanos 0L)nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {// Were about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// belong to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g ! generation)return index;// 如果等待超时同样打破屏障if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}// 打破屏障
private void breakBarrier() {generation.broken true;count parties;trip.signalAll();
}总结wait方法表示屏障点线程会被阻塞在wait方法处直到所有线程都执行到wait方法会执行指定的异步任务然后唤醒阻塞的线程。
线程安全的集合类
jdk1.8之前提供的安全集合HashTable、Vector它们的实现比较粗糙直接使用synchronized关键字修饰整个方法。
使用Collections工具类中提供的方法修饰一个集合。案例ListString list Collections.synchronizedList(new ArrayListString())使用一个线程安全的集合来包装用户提供的集合但是线程安全的集合中所有的方法都是被synchronized修饰的效率比较低下。
juc的安全集合
BlockingXXXLinkedBlockingQueue、ArrayBlockingQueue基于JUC中提供的锁线程池使用这两个个队列作为阻塞队列CopyOnWriteXXXCopyOnWriteArrayList、CopyOnWriteArraySet基于锁写时复制适合读多写少的场景。ConcurrentXXXConcurrentHashMap通过cas算法和局部加锁的方式优化了性能
ArrayBlockingQueue
基于数组的同步队列内部使用ReentrantLock所有的读写操作全部加锁
// 元素入队的方法
public boolean offer(E e) {checkNotNull(e);// 加锁final ReentrantLock lock this.lock;lock.lock();try {if (count items.length)return false;else {enqueue(e); // 入队return true;}} finally {lock.unlock(); // 释放锁}
}// 查看队列头部的元素内部也会加锁
public E peek() {final ReentrantLock lock this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}
}CopyOnWriteArrayList
copy on write写时复制当对集合进行修改操作时不会直接修改原数组而是创建一个新的数组副本在副本上进行修改然后将原数组替换为新数组。这种机制确保了在修改过程中读操作不会受到影响因为读操作始终基于原数组进行。
适用于读多写少的场景由于写操作需要创建数组副本写操作的性能开销较大但读操作的性能非常高效因此非常适合读操作频繁且写操作较少的场景
// 向集合中添加数据的源码
public boolean add(E e) {final ReentrantLock lock this.lock;lock.lock(); // 加锁try {Object[] elements getArray();int len elements.length;Object[] newElements Arrays.copyOf(elements, len 1); // 创建原先数组的复制品newElements[len] e;setArray(newElements); // 使用复制好的数组替换原数组return true;} finally {lock.unlock();}
}ConcurrentHashMap
使用方法和HashMap相同但它是线程安全的。通过cas算法和局部加锁只锁住某个节点的方式尽可能的避免锁和减小锁的粒度以此来优化性能。这里简单了解一下它的工作机制
// 首先存储数据的数组使用volatile修饰确保修改可以立刻被其他线程看到
transient volatile NodeK,V[] table;添加元素的核心方法
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException();// 计算key的哈希值int hash spread(key.hashCode());int binCount 0;// 死循环自旋for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;// 如果存储元素的数组为null初始化数组这里就是在第一次put元素的时候初始化数组if (tab null || (n tab.length) 0)tab initTable();// 根据哈希值计算元素的下标如果下标处没有值进入当前分支else if ((f tabAt(tab, i (n - 1) hash)) null) {// 使用cas算法更新下标处的值如果更新成功退出// 这个过程是不加锁的使用cas算法可以保证线程安全if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null)))break; // no lock when adding to empty bin}// 判断是否正在扩容如果是帮助扩容else if ((fh f.hash) MOVED)tab helpTransfer(tab, f);else {// 下标处有值发生哈希冲突V oldVal null;// 只在这一个节点上加锁synchronized (f) {// 再次判断判断当前节点没有发生变化否则它有可能已经被其他线程更新了。if (tabAt(tab, i) f) {// 处理链表if (fh 0) { // 哈希值大于0证明它没有在扩容并且不是树节点binCount 1;// 遍历链表for (NodeK,V e f;; binCount) { K ek;// 如果找到相同的key更新valueif (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) {oldVal e.val;if (!onlyIfAbsent)e.val value;break;}// 将新节点挂载到链表的尾部NodeK,V pred e;if ((e e.next) null) {pred.next new NodeK,V(hash, key,value, null);break;}}}// 处理红黑树else if (f instanceof TreeBin) {NodeK,V p;binCount 2;if ((p ((TreeBinK,V)f).putTreeVal(hash, key,value)) ! null) {oldVal p.val;if (!onlyIfAbsent)p.val value;}}}}// 判断是否需要扩容if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD)treeifyBin(tab, i); // 扩容或者将链表转换为红黑树if (oldVal ! null)return oldVal;break;}}}addCount(1L, binCount);return null;
}总结通过cas算法来向数组中写入元素写元素时如果发生哈希冲突只在发生冲突的节点上加锁尽可能减小锁的粒度。
读取元素是不需要加锁的因为元素使用volatile修饰其它线程可以立刻看到元素的变化