做网站公司多少钱,政务网站建设交流发言,做期货看什么网站的资讯,常州网红打卡景点#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱吃芝士的土豆倪#xff0c;24届校招生Java选手#xff0c;很高兴认识大家#x1f4d5;系列专栏#xff1a;Spring源码、JUC源码#x1f525;如果感觉博主的文章还不错的话#xff0c;请#x1f44d;三连支持作者简介大家好我是爱吃芝士的土豆倪24届校招生Java选手很高兴认识大家系列专栏Spring源码、JUC源码如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中源码溯源一探究竟联系方式nhs19990716加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 线程安全集合类概述ConcurrentHashMapConcurrentHashMap 原理JDK 7 HashMap 并发死链死链复现源码分析小结 JDK 8 ConcurrentHashMap重要方法构造器分析get 流程put 流程size 计算流程transfer JDK 7 ConcurrentHashMap构造器分析put 流程rehash 流程get 流程size 计算流程 BlockingQueueLinkedBlockingQueue 原理基本的入队出队加锁分析 CopyOnWriteArrayList迭代器弱一致性 线程安全集合类概述 线程安全集合类可以分为三大类
遗留的线程安全集合如 Hashtable Vector 出现时间比较早而且所有方法都是用synchronized修饰并发性能比较低时至今日有更好的实现更好的替代使用 Collections 装饰的线程安全集合如将原本不安全的集合变成安全的集合
Collections.synchronizedCollectionCollections.synchronizedListCollections.synchronizedMapCollections.synchronizedSetCollections.synchronizedNavigableMapCollections.synchronizedNavigableSetCollections.synchronizedSortedMapCollections.synchronizedSortedSet
private static class SynchronizedMapK,Vimplements MapK,V, Serializable {private static final long serialVersionUID 1978198479659022715L;private final MapK,V m; // Backing Mapfinal Object mutex; // Object on which to synchronizeSynchronizedMap(MapK,V m) {this.m Objects.requireNonNull(m);mutex this;}public int size() {synchronized (mutex) {return m.size();}}public boolean isEmpty() {synchronized (mutex) {return m.isEmpty();}}public boolean containsKey(Object key) {synchronized (mutex) {return m.containsKey(key);}}public boolean containsValue(Object value) {synchronized (mutex) {return m.containsValue(value);}}public V get(Object key) {synchronized (mutex) {return m.get(key);}}public V put(K key, V value) {synchronized (mutex) {return m.put(key, value);}}public V remove(Object key) {synchronized (mutex) {return m.remove(key);}}public void putAll(Map? extends K, ? extends V map) {synchronized (mutex) {m.putAll(map);}}public void clear() {synchronized (mutex) {m.clear();}}传入的就是线程不安全的map将其变成线程安全的
本质上就是多加了一个synchronized 锁住了对象
java.util.concurrent.*
重点介绍 java.util.concurrent.* 下的线程安全集合类可以发现它们有规律里面包含三类关键词 Blocking、CopyOnWrite、Concurrent Blocking 大部分实现基于锁并提供用来阻塞的方法很多方法在不满足条件的时候需要等待 CopyOnWrite 之类容器修改开销相对较重适用于读多写少 Concurrent 类型的容器
内部很多操作使用 cas 优化一般可以提供较高吞吐量
弱一致性
遍历时弱一致性例如当利用迭代器遍历时如果容器发生修改迭代器仍然可以继续进行遍历这时内容是旧的求大小弱一致性size 操作未必是 100% 准确读取弱一致性
遍历时如果发生了修改对于非安全容器来讲使用 fail-fast 机制也就是让遍历立刻失败抛出ConcurrentModificationException不再继续遍历
一致性 和 性能 两者不可兼得
ConcurrentHashMap
生成测试数据
static final String ALPHA abcedfghijklmnopqrstuvwxyz;public static void main(String[] args) {int length ALPHA.length();int count 200;ListString list new ArrayList(length * count);for (int i 0; i length; i) {char ch ALPHA.charAt(i);for (int j 0; j count; j) {list.add(String.valueOf(ch));}}Collections.shuffle(list);for (int i 0; i 26; i) {try (PrintWriter out new PrintWriter(new OutputStreamWriter(new FileOutputStream(tmp/ (i1) .txt)))) {String collect list.subList(i * count, (i 1) * count).stream().collect(Collectors.joining(\n));out.print(collect);} catch (IOException e) {}}}模版代码模版代码中封装了多线程读取文件的代码
private static V void demo(SupplierMapString,V supplier,BiConsumerMapString,V,ListString consumer) {MapString, V counterMap supplier.get();ListThread ts new ArrayList();for (int i 1; i 26; i) {int idx i;Thread thread new Thread(() - {ListString words readFromFile(idx);consumer.accept(counterMap, words);});ts.add(thread);}ts.forEach(t-t.start());ts.forEach(t- {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(counterMap);}public static ListString readFromFile(int i) {ArrayListString words new ArrayList();try (BufferedReader in new BufferedReader(new InputStreamReader(new FileInputStream(tmp/ i .txt)))) {while(true) {String word in.readLine();if(word null) {break;}words.add(word);}return words;} catch (IOException e) {throw new RuntimeException(e);}}
你要做的是实现两个参数
一是提供一个 map 集合用来存放每个单词的计数结果key 为单词value 为计数二是提供一组操作保证计数的安全性会传递 map 集合以及 单词 List
正确结果输出应该是每个单词出现 200 次
{a200, b200, c200, d200, e200, f200, g200, h200, i200, j200, k200, l200, m200,
n200, o200, p200, q200, r200, s200, t200, u200, v200, w200, x200, y200, z200} 下面的实现为
demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对() - new HashMapString, Integer(),// 进行计数(map, words) - {for (String word : words) {Integer counter map.get(word);int newValue counter null ? 1 : counter 1;map.put(word, newValue);}});结果和预期的并不一样
将HashMap 换成 ConcurrentHashMap发现还是不行
如果将其改成了ConcurrentHashMap的话那么其实下面的几行也不是原子的因为使用了线程安全的集合只能保证每行是原子的但是整体不是
或者加上 synchronized 锁。但是这样做没有好处ConcurrentHashMap本身加的就是细粒度的锁你使用synchronized 这么重的锁影响性能影响并发度。
ConcurrentHashMap 原理
JDK 7 HashMap 并发死链
注意 要在 JDK 7 下运行否则扩容机制和 hash 的计算方法都变了
以下测试代码是精心准备的不要随便改动
public static void main(String[] args) {// 测试 java 7 中哪些数字的 hash 结果相等System.out.println(长度为16时桶下标为1的key);for (int i 0; i 64; i) {if (hash(i) % 16 1) {System.out.println(i);}}System.out.println(长度为32时桶下标为1的key);for (int i 0; i 64; i) {if (hash(i) % 32 1) {System.out.println(i);}}// 1, 35, 16, 50 当大小为16时它们在一个桶内final HashMapInteger, Integer map new HashMapInteger, Integer();// 放 12 个元素map.put(2, null);map.put(3, null);map.put(4, null);map.put(5, null);map.put(6, null);map.put(7, null);map.put(8, null);map.put(9, null);map.put(10, null);map.put(16, null);map.put(35, null);map.put(1, null);System.out.println(扩容前大小[main]:map.size());new Thread() {Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println(扩容后大小[Thread-0]:map.size());}}.start();new Thread() {Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println(扩容后大小[Thread-1]:map.size());}}.start();}final static int hash(Object k) {int h 0;if (0 ! h k instanceof String) {return sun.misc.Hashing.stringHash32((String) k);}h ^ k.hashCode();h ^ (h 20) ^ (h 12);return h ^ (h 7) ^ (h 4);}死链复现
调试工具使用 idea
在 HashMap 源码 590 行加断点 int newCapacity newTable.length;断点的条件如下目的是让 HashMap 在扩容为 32 时并且线程为 Thread-0 或 Thread-1 时停下来
newTable.length32 (Thread.currentThread().getName().equals(Thread-0)||Thread.currentThread().getName().equals(Thread-1))断点暂停方式选择 Thread否则在调试 Thread-0 时Thread-1 无法恢复运行 运行代码程序在预料的断点位置停了下来输出
长度为16时桶下标为1的key
1
16
35
50
长度为32时桶下标为1的key
1
35
扩容前大小[main]:12 在jdk7 中hashmap是采用头插法插入的
接下来进入扩容流程调试 在 HashMap 源码 594 行加断点
EntryK,V next e.next; // 593
if (rehash) // 594
// ...这是为了观察 e 节点和 next 节点的状态Thread-0 单步执行到 594 行再 594 处再添加一个断点条件 Thread.currentThread().getName().equals(“Thread-0”) 这时可以在 Variables 面板观察到 e 和 next 变量使用 view as - Object 查看节点状态
e (1)-(35)-(16)-null
next (35)-(16)-null 在 Threads 面板选中 Thread-1 恢复运行可以看到控制台输出新的内容如下Thread-1 扩容已完成
newTable[1] (35)-(1)-null
扩容后大小13这时 Thread-0 还停在 594 处 Variables 面板变量的状态已经变化为
e (1)-null
next (35)-(1)-null为什么呢因为 Thread-1 扩容时链表也是后加入的元素放入链表头因此链表就倒过来了但 Thread-1 虽然结 果正确但它结束后 Thread-0 还要继续运行
接下来就可以单步调试F8观察死链的产生了
下一轮循环到 594将 e 搬迁到 newTable 链表头
newTable[1] (1)-null
e (35)-(1)-null
next (1)-null 下一轮循环到 594将 e 搬迁到 newTable 链表头
newTable[1] (35)-(1)-null
e (1)-null
next null 再看看源码
e.next newTable[1];
// 这时 e (1,35)
// 而 newTable[1] (35,1)-(1,35) 因为是同一个对象
// 相当于 1 - 35 - 1
newTable[1] e;
// 再尝试将 e 作为链表头, 死链已成
e next;
// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了源码分析
HashMap 的并发死链发生在扩容时
void transfer(Entry[] newTable, boolean rehash) {int newCapacity newTable.length;for (EntryK,V e : table) {while(null ! e) {EntryK,V next e.next;// 1 处if (rehash) {e.hash null e.key ? 0 : hash(e.key);}int i indexFor(e.hash, newCapacity);// 2 处// 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 nexte.next newTable[i];newTable[i] e;e next;}}}假设 map 中初始元素是
原始链表格式[下标] (key,next)
[1] (1,35)-(35,16)-(16,null)线程 a 执行到 1 处 此时局部变量 e 为 (1,35)而局部变量 next 为 (35,16) 线程 a 挂起线程 b 开始执行
第一次循环
[1] (1,null)第二次循环
[1] (35,1)-(1,null)第三次循环
[1] (35,1)-(1,null)
[17] (16,null)切换回线程 a此时局部变量 e 和 next 被恢复引用没变但内容变了e 的内容被改为 (1,null)而 next 的内
容被改为 (35,1) 并链向 (1,null)
第一次循环
[1] (1,null)第二次循环注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null)
[1] (35,1)-(1,null)第三次循环e 是 (1,null)而 next 是 null但 e 被放入链表头这样 e.next 变成了 35 2 处
[1] (1,35)-(35,1)-(1,35)已经是死链了小结
究其原因是因为在多线程环境下使用了非线程安全的 map 集合JDK 8 虽然将扩容算法做了调整不再将元素加入链表头而是保持与扩容前一样的顺序但仍不意味着能 够在多线程环境下能够安全扩容还会出现其它问题如扩容丢数据
JDK 8 ConcurrentHashMap
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 扩容线程数)
// 当初始化或扩容完成后为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;// 整个 ConcurrentHashMap 就是一个 Node[]
static class NodeK,V implements Map.EntryK,V {}// hash 表
transient volatile NodeK,V[] table;// 扩容时的 新 hash 表
private transient volatile NodeK,V[] nextTable;// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
// 主要用在扩容的时候
static final class ForwardingNodeK,V extends NodeK,V {}// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNodeK,V extends NodeK,V {}// 作为 treebin 的头节点, 存储 root 和 first
// 红黑树节点
static final class TreeBinK,V extends NodeK,V {}// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNodeK,V extends NodeK,V {}ForwardingNode 这个是从后往前处理处理完了就会在对应的下标 加一个头结点 fnode 如果扩容过程中去get如果是fnode的那么就会去新数组中去获取
转换红黑树当链表长度超过8的时候会考虑转换但是要满足一个前提就是 数组长度超过64否则只会执行扩容操作。因为扩容能够有效的减少链表的长度。
重要方法
// 获取 Node[] 中第 i 个 Node
static final K,V NodeK,V tabAt(NodeK,V[] tab, int i)// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final K,V boolean casTabAt(NodeK,V[] tab, int i, NodeK,V c, NodeK,V v)// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final K,V void setTabAt(NodeK,V[] tab, int i, NodeK,V v)构造器分析
可以看到实现了懒惰初始化在构造方法中仅仅计算了 table 的大小以后在第一次使用时才会真正创建
// 初始容量 负载因子 并发度
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor 0.0f) || initialCapacity 0 || concurrencyLevel 0)throw new IllegalArgumentException();// 如果初始容量小于并发度的时候 将初始容量改成 并发度也就是最起码要保持到并发度这么大if (initialCapacity concurrencyLevel) // Use at least as many binsinitialCapacity concurrencyLevel; // as estimated threadslong size (long)(1.0 (long)initialCapacity / loadFactor);// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ... int cap (size (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl cap;// 1.8中实现了懒惰初始化而1.7中会在构造方法中创建了不管你用不用都会占用内存空间。}get 流程
可以看到整个get中没有任何的锁所以这也是并发度高的一个地方
public V get(Object key) {NodeK,V[] tab; NodeK,V e, p; int n, eh; K ek;// spread 方法能确保返回结果是正数int h spread(key.hashCode());// tab创建好了并且里面是有元素的if ((tab table) ! null (n tab.length) 0 (e tabAt(tab, (n - 1) h)) ! null) {// 获取 Node[] 中第 i 个 Node 定位到桶下标看看是不是为空如果不为空继续比较头节点的hash码是不是等于key的hash码// 如果头结点已经是要查找的 keyif ((eh e.hash) h) {if ((ek e.key) key || (ek ! null key.equals(ek)))return e.val;}// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找// 扩容中会变成 fnode 对应的取值就是负数else if (eh 0)return (p e.find(h, key)) ! null ? p.val : null;// 正常遍历链表, 用 equals 比较while ((e e.next) ! null) {if (e.hash h ((ek e.key) key || (ek ! null key.equals(ek))))return e.val;}}return null;}put 流程
以下数组简称table链表简称bin public V put(K key, V value) {return putVal(key, value, false);// onlyIfAbsent如果是false那么每次都会用新值替换掉旧值}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException();// 其中 spread 方法会综合高位低位, 具有更好的 hash 性int hash spread(key.hashCode());int binCount 0;// 死循环for (NodeK,V[] tab table;;) {// f 是链表头节点// fh 是链表头结点的 hash// i 是链表在 table 中的下标NodeK,V f; int n, i, fh;// 要创建 tableif (tab null || (n tab.length) 0)// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环// 因为是懒惰初始化的所以直到现在才开始创建 初始化使用cas 创建其它失败得再次进入循环没有用syn 我们得线程并没有被阻塞住tab initTable();// 要创建链表头节点else if ((f tabAt(tab, i (n - 1) hash)) null) {// 添加链表头使用了 cas, 无需 synchronized// 用cas将头节点加进去如果加入失败了继续循环if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null)))break;}// 帮忙扩容// 其实就是看你的头结点是不是 ForwardingNode其对应得MOVED是一个负数else if ((fh f.hash) MOVED)// 帮忙之后, 进入下一轮循环// 锁住当前的链表帮助去扩容tab helpTransfer(tab, f);// 能进入这个else说明 table既不处于扩容中也不是处于table的初始化过程中而且这时肯定发生了锁下标的冲突else {V oldVal null;// 锁住链表头节点// 并没有锁住整个tab而是锁住这个桶链表的头节点synchronized (f) {// 再次确认链表头节点没有被移动if (tabAt(tab, i) f) {// 链表// 链表的头节点hash码大于等于 0 if (fh 0) {binCount 1;// 遍历链表for (NodeK,V e f;; binCount) {K ek;
// 找到相同的 keyif (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) {oldVal e.val;// 更新if (!onlyIfAbsent)e.val value;break;}NodeK,V pred e;// 已经是最后的节点了, 新增 Node, 追加至链表尾if ((e e.next) null) {pred.next new NodeK,V(hash, key,value, null);break;}}}// 红黑树else if (f instanceof TreeBin) {NodeK,V p;binCount 2;// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNodeif ((p ((TreeBinK,V)f).putTreeVal(hash, key,value)) ! null) {oldVal p.val;if (!onlyIfAbsent)p.val value;}}}// 释放链表头节点的锁}if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD)// 如果链表长度 树化阈值(8), 进行链表转为红黑树treeifyBin(tab, i);if (oldVal ! null)return oldVal;break;}}}// 增加 size 计数addCount(1L, binCount);return null;}private final NodeK,V[] initTable() {NodeK,V[] tab; int sc;// 这个hash有没有被创建while ((tab table) null || tab.length 0) {if ((sc sizeCtl) 0)// 让出cpu的使用权如果cpu的时间片没有其它线程了那么还是会分给这个线程只是让他不至于充分利用cpu少占用一点cpu的时间。Thread.yield();// 尝试将 sizeCtl 设置为 -1表示初始化 table// 而其它的线程再次进入循环首先 不小于0了其次之前的 sc也已经变了cas失败再次循环的时候发现 tab已经不为空了结束循环else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建try {if ((tab table) null || tab.length 0) {int n (sc 0) ? sc : DEFAULT_CAPACITY;NodeK,V[] nt (NodeK,V[])new Node?,?[n];table tab nt;// 计算出下一次要扩容的阈值sc n - (n 2);}} finally {// 计算出下一次要扩容的阈值sizeCtl sc;}break;}}return tab;}// check 是之前 binCount 的个数// 运用了 longadder 的思想private final void addCount(long x, int check) {CounterCell[] as; long b, s;if (// 已经有了 counterCells, 向 cell 累加// 累加单元数组不为空(as counterCells) ! null ||// 还没有, 向 baseCount 累加// 一个基础数值累加!U.compareAndSwapLong(this, BASECOUNT, b baseCount, s b x)) {CounterCell a; long v; int m;boolean uncontended true;if (// 还没有 counterCellsas null || (m as.length - 1) 0 ||// 还没有 cell(a as[ThreadLocalRandom.getProbe() m]) null ||// cell cas 增加计数失败!(uncontended U.compareAndSwapLong(a, CELLVALUE, v a.value, v x))) {// 创建累加单元数组和cell, 累加重试fullAddCount(x, uncontended);return;}if (check 1)return;// 获取元素个数s sumCount();}if (check 0) {NodeK,V[] tab, nt; int n, sc;// 看看元素的个数是否大于扩容的阈值while (s (long)(sc sizeCtl) (tab table) ! null (n tab.length) MAXIMUM_CAPACITY) {int rs resizeStamp(n);if (sc 0) {if ((sc RESIZE_STAMP_SHIFT) ! rs || sc rs 1 ||sc rs MAX_RESIZERS || (nt nextTable) null ||transferIndex 0)break;// newtable 已经创建了帮忙扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))// 首次调用因为是懒惰初始化的所以还没有创建transfer(tab, nt);}// 需要扩容这时 newtable 未创建else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs RESIZE_STAMP_SHIFT) 2))transfer(tab, null);s sumCount();}}}size 计算流程
size 计算实际发生在 putremove 改变集合元素的操作之中
没有竞争发生向 baseCount 累加计数有竞争发生新建 counterCells向其中的一个 cell 累加计数
counterCells 初始有两个 cell
如果计数竞争比较激烈会创建新的 cell 来累加计数 public int size() {long n sumCount();return ((n 0L) ? 0 :(n (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}final long sumCount() {CounterCell[] as counterCells; CounterCell a;// 将 baseCount 计数与所有 cell 计数累加long sum baseCount;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)sum a.value;}}return sum;}transfer
private final void transfer(NodeK,V[] tab, NodeK,V[] nextTab) {int n tab.length, stride;if ((stride (NCPU 1) ? (n 3) / NCPU : n) MIN_TRANSFER_STRIDE)stride MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab null) { // initiatingtry {SuppressWarnings(unchecked)// 将原有的扩容两倍NodeK,V[] nt (NodeK,V[])new Node?,?[n 1];nextTab nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl Integer.MAX_VALUE;return;}nextTable nextTab;transferIndex n;}int nextn nextTab.length;ForwardingNodeK,V fwd new ForwardingNodeK,V(nextTab);boolean advance true;boolean finishing false; // to ensure sweep before committing nextTabfor (int i 0, bound 0;;) {NodeK,V f; int fh;while (advance) {int nextIndex, nextBound;if (--i bound || finishing)advance false;else if ((nextIndex transferIndex) 0) {i -1;advance false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound (nextIndex stride ?nextIndex - stride : 0))) {bound nextBound;i nextIndex - 1;advance false;}}if (i 0 || i n || i n nextn) {int sc;if (finishing) {nextTable null;table nextTab;sizeCtl (n 1) - (n 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc sizeCtl, sc - 1)) {if ((sc - 2) ! resizeStamp(n) RESIZE_STAMP_SHIFT)return;finishing advance true;i n; // recheck before commit}}else if ((f tabAt(tab, i)) null)// 处理完了将链表设置成 ForwardingNodeadvance casTabAt(tab, i, null, fwd);else if ((fh f.hash) MOVED)advance true; // already processedelse {// 如果这个链表头是有元素的将链表锁住然后进行处理synchronized (f) {if (tabAt(tab, i) f) {NodeK,V ln, hn;// 普通节点if (fh 0) {int runBit fh n;NodeK,V lastRun f;for (NodeK,V p f.next; p ! null; p p.next) {int b p.hash n;if (b ! runBit) {runBit b;lastRun p;}}if (runBit 0) {ln lastRun;hn null;}else {hn lastRun;ln null;}for (NodeK,V p f; p ! lastRun; p p.next) {int ph p.hash; K pk p.key; V pv p.val;if ((ph n) 0)ln new NodeK,V(ph, pk, pv, ln);elsehn new NodeK,V(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i n, hn);setTabAt(tab, i, fwd);advance true;}// 树节点的搬迁逻辑else if (f instanceof TreeBin) {TreeBinK,V t (TreeBinK,V)f;TreeNodeK,V lo null, loTail null;TreeNodeK,V hi null, hiTail null;int lc 0, hc 0;for (NodeK,V e t.first; e ! null; e e.next) {int h e.hash;TreeNodeK,V p new TreeNodeK,V(h, e.key, e.val, null, null);if ((h n) 0) {if ((p.prev loTail) null)lo p;elseloTail.next p;loTail p;lc;}else {if ((p.prev hiTail) null)hi p;elsehiTail.next p;hiTail p;hc;}}ln (lc UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc ! 0) ? new TreeBinK,V(lo) : t;hn (hc UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc ! 0) ? new TreeBinK,V(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i n, hn);setTabAt(tab, i, fwd);advance true;}}}}}}JDK 7 ConcurrentHashMap
它维护了一个 segment 数组(分段的意思)每个 segment(继承自ReentrantLock) 对应一把锁
优点如果多个线程访问不同的 segment实际是没有冲突的这与 jdk8 中是类似的缺点Segments 数组默认大小为16这个容量初始化指定后就不能改变了并且不是懒惰初始化(jdk8中随着扩容链表的个数也会越来越多所以这个并发度会随着你的这个容量增大而增大) 构造器分析
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor 0) || initialCapacity 0 || concurrencyLevel 0)throw new IllegalArgumentException();if (concurrencyLevel MAX_SEGMENTS)concurrencyLevel MAX_SEGMENTS;// ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小int sshift 0;int ssize 1;while (ssize concurrencyLevel) {sshift;ssize 1;}// segmentShift 默认是 32 - 4 28this.segmentShift 32 - sshift;// segmentMask 默认是 15 即 0000 0000 0000 1111this.segmentMask ssize - 1;// 为了将来 get 或者 put 一个key的时候他好确定这个key 在 segment中的那个元素if (initialCapacity MAXIMUM_CAPACITY)initialCapacity MAXIMUM_CAPACITY;int c initialCapacity / ssize;if (c * ssize initialCapacity)c;int cap MIN_SEGMENT_TABLE_CAPACITY;while (cap c)cap 1;// 创建 segments and segments[0]SegmentK,V s0 new SegmentK,V(loadFactor, (int)(cap * loadFactor),(HashEntryK,V[])new HashEntry[cap]);SegmentK,V[] ss (SegmentK,V[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments ss;}可以看到 ConcurrentHashMap 没有实现懒惰初始化空间占用不友好
其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment
例如根据某一 hash 值求 segment 位置先将高位向低位移动 this.segmentShift 位 结果再与 this.segmentMask 做位于运算最终得到 1010 即下标为 10 的 segment put 流程 public V put(K key, V value) {SegmentK,V s;if (value null)throw new NullPointerException();int hash hash(key);// 计算出 segment 下标int j (hash segmentShift) segmentMask;// 获得 segment 对象, 判断是否为 null, 是则创建该 segmentif ((s (SegmentK,V)UNSAFE.getObject(segments, (j SSHIFT) SBASE)) null) {// 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,// 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性s ensureSegment(j);}// 进入 segment 的put 流程return s.put(key, hash, value, false);}segment 继承了可重入锁ReentrantLock它的 put 方法为 final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 尝试加锁HashEntryK,V node tryLock() ? null :// 如果不成功, 进入 scanAndLockForPut 流程// 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程// 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来scanAndLockForPut(key, hash, value);// 执行到这里 segment 已经被成功加锁, 可以安全执行V oldValue;try {HashEntryK,V[] tab table;int index (tab.length - 1) hash;// 找到链表头结点HashEntryK,V first entryAt(tab, index);for (HashEntryK,V e first;;) {if (e ! null) {// 更新K k;if ((k e.key) key ||(e.hash hash key.equals(k))) {oldValue e.value;if (!onlyIfAbsent) {e.value value;modCount;}break;}e e.next;}else {// 新增// 1) 之前等待锁时, node 已经被创建, next 指向链表头if (node ! null)node.setNext(first);else// 2) 创建新 nodenode new HashEntryK,V(hash, key, value, first);int c count 1;// 3) 扩容// 超过了阈值if (c threshold tab.length MAXIMUM_CAPACITY)rehash(node);else// 将 node 作为链表头setEntryAt(tab, index, node);modCount;count c;oldValue null;break;}}} finally {unlock();}return oldValue;}rehash 流程
发生在 put 中因为此时已经获得了锁因此 rehash 时不需要考虑线程安全
private void rehash(HashEntryK,V node) {HashEntryK,V[] oldTable table;int oldCapacity oldTable.length;// 移位int newCapacity oldCapacity 1;threshold (int)(newCapacity * loadFactor);HashEntryK,V[] newTable (HashEntryK,V[]) new HashEntry[newCapacity];int sizeMask newCapacity - 1;for (int i 0; i oldCapacity ; i) {HashEntryK,V e oldTable[i];if (e ! null) {HashEntryK,V next e.next;int idx e.hash sizeMask;if (next null) // Single node on listnewTable[idx] e;else { // Reuse consecutive sequence at same slotHashEntryK,V lastRun e;int lastIdx idx;// 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用for (HashEntryK,V last next;last ! null;last last.next) {int k last.hash sizeMask;if (k ! lastIdx) {lastIdx k;lastRun last;}}newTable[lastIdx] lastRun;// 剩余节点需要新建for (HashEntryK,V p e; p ! lastRun; p p.next) {V v p.value;int h p.hash;int k h sizeMask;HashEntryK,V n newTable[k];newTable[k] new HashEntryK,V(h, p.key, v, n);}}}}// 扩容完成, 才加入新的节点int nodeIndex node.hash sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] node;// 替换为新的 HashEntry tabletable newTable;}get 流程
get 时并未加锁用了 UNSAFE 方法保证了可见性扩容过程中get 先发生就从旧表取内容get 后发生就从新 表取内容 public V get(Object key) {SegmentK,V s; // manually integrate access methods to reduce overheadHashEntryK,V[] tab;int h hash(key);// u 为 segment 对象在数组中的偏移量long u (((h segmentShift) segmentMask) SSHIFT) SBASE;// s 即为 segment// 数组内元素必须使用这个来保证它的可见性 if ((s (SegmentK,V)UNSAFE.getObjectVolatile(segments, u)) ! null (tab s.table) ! null) {for (HashEntryK,V e (HashEntryK,V) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) h)) TSHIFT) TBASE);e ! null; e e.next) {K k;if ((k e.key) key || (e.hash h key.equals(k)))return e.value;}}return null;}size 计算流程
计算元素个数前先不加锁计算两次如果前后两次结果如一样认为个数正确返回如果不一样进行重试重试次数超过 3将所有 segment 锁住重新计算个数返回
其size本身的计算就是弱一致性的。 public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final SegmentK,V[] segments this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last 0L; // previous sumint retries -1; // first iteration isnt retrytry {for (;;) {if (retries RETRIES_BEFORE_LOCK) {// 超过重试次数, 需要创建所有 segment 并加锁for (int j 0; j segments.length; j)ensureSegment(j).lock(); // force creation}sum 0L;size 0;overflow false;for (int j 0; j segments.length; j) {SegmentK,V seg segmentAt(segments, j);if (seg ! null) {sum seg.modCount;int c seg.count;if (c 0 || (size c) 0)overflow true;}}if (sum last)break;last sum;}} finally {if (retries RETRIES_BEFORE_LOCK) {for (int j 0; j segments.length; j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;}BlockingQueue
LinkedBlockingQueue 原理
基本的入队出队
public class LinkedBlockingQueueE extends AbstractQueueEimplements BlockingQueueE, java.io.Serializable {static class NodeE {E item;/*** 下列三种情况之一* - 真正的后继节点* - 自己, 发生在出队时* - null, 表示是没有后继节点, 是最后了*/NodeE next;Node(E x) { item x; }}
}初始化链表 last head new Node(null); Dummy 节点用来占位item 为 null 当一个节点入队 last last.next node; 再来一个节点入队 last last.next node; 出队
NodeE h head;
NodeE first h.next;
h.next h; // help GC
head first;
E x first.item;
first.item null;
return x;h head first h.next h.next h
指向自己安全的进行垃圾回收 head first E x first.item;
first.item null;
return x;加锁分析
高明之处在于用了两把锁和 dummy 节点(站位节点)
用一把锁同一时刻最多只允许有一个线程生产者或消费者二选一执行用两把锁同一时刻可以允许两个线程同时一个生产者与一个消费者执行
消费者与消费者线程仍然串行 生产者与生产者线程仍然串行
线程安全分析
当节点总数大于 2 时包括 dummy 节点putLock 保证的是 last 节点的线程安全takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争当节点总数等于 2 时即一个 dummy 节点一个正常节点这时候仍然是两把锁锁两个对象不会竞争当节点总数等于 1 时就一个 dummy 节点这时 take 线程会被 notEmpty 条件阻塞有竞争会阻塞
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock new ReentrantLock();// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock new ReentrantLock();put 操作
public void put(E e) throws InterruptedException {if (e null) throw new NullPointerException();int c -1;NodeE node new NodeE(e);final ReentrantLock putLock this.putLock;// count 用来维护元素计数final AtomicInteger count this.count;putLock.lockInterruptibly();try {// 满了等待while (count.get() capacity) {// 倒过来读就好: 等待 notFullnotFull.await();}// 有空位, 入队且计数加一enqueue(node);c count.getAndIncrement();// 在这里和我们之前自己做的 使用signalall唤醒是不同的都是使用signal来唤醒使用signal 而不使用signalall的原因就是signalall一次会唤醒多个这样的话 最终可能就一个会去执行然后又陷入等待会引起很多不必要的竞争// 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程if (c 1 capacity)notFull.signal();} finally {putLock.unlock();}// 如果队列中有一个元素, 叫醒 take 线程 是为了确保在第一个元素被添加到队列中时可以及时通知等待的take线程进行取出操作。这样可以避免take线程一直处于等待状态提高了程序的效率。if (c 0)// 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争signalNotEmpty();}take 操作
public E take() throws InterruptedException {E x;int c -1;final AtomicInteger count this.count;final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();try {while (count.get() 0) {notEmpty.await();}x dequeue();c count.getAndDecrement();if (c 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果队列中只有一个空位时, 叫醒 put 线程// 如果有多个线程进行出队, 第一个线程满足 c capacity, 但后续线程 c capacityif (c capacity)// 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争signalNotFull()return x;}CopyOnWriteArrayList
CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想增删改操作会将底层数组拷贝一份更 改操作在新数组上执行这时不影响其它线程的并发读读写分离。 以新增为例 public boolean add(E e) {synchronized (lock) {// 获取旧的数组Object[] es getArray();int len es.length;// 拷贝新的数组这里是比较耗时的操作但不影响其它读线程es Arrays.copyOf(es, len 1);// 添加新元素es[len] e;// 替换旧的数组setArray(es);return true;}}这里的源码版本是 Java 11在 Java 1.8 中使用的是可重入锁而不是 synchronized
其它读操作并未加锁例如
public void forEach(Consumer? super E action) {Objects.requireNonNull(action);for (Object x : getArray()) {SuppressWarnings(unchecked) E e (E) x;action.accept(e);}}适合『读多写少』的应用场景
get 弱一致性 时间点操作1Thread-0 getArray()2Thread-1 getArray()3Thread-1 setArray(arrayCopy)4Thread-0 array[index]
不容易测试但问题确实存在
迭代器弱一致性
CopyOnWriteArrayListInteger list new CopyOnWriteArrayList();
list.add(1);
list.add(2);
list.add(3);
IteratorInteger iter list.iterator();
new Thread(() - {list.remove(0);System.out.println(list);
}).start();
sleep1s();
while (iter.hasNext()) {System.out.println(iter.next());
}不要觉得弱一致性就不好
数据库的 MVCC 都是弱一致性的表现并发高和一致性是矛盾的需要权衡