阿里云网站建设流程教案,宽屏网站欣赏,WordPress数据库自动切换,合肥有多少建网站公司原子累加器 相较于上一节看图学源码 之 Atomic 类源码浅析一#xff08;cas 自旋操作的 AtomicXXX原子类#xff09;说的的原子类#xff0c;原子累加器的效率会更高 XXXXAdder 和 XXXAccumulator 区别就是 Adder只有add 方法#xff0c;Accumulator是可以进行自定义运算方…原子累加器 相较于上一节看图学源码 之 Atomic 类源码浅析一cas 自旋操作的 AtomicXXX原子类说的的原子类原子累加器的效率会更高 XXXXAdder 和 XXXAccumulator 区别就是 Adder只有add 方法Accumulator是可以进行自定义运算方法的 始于 Striped64
abstract class Striped64 extends Number {// cpu 运行核数, 控制数组的大小static final int NCPU Runtime.getRuntime().availableProcessors();// 当非空时大小是 2 的幂。 transient volatile Cell[] cells;// 表初始化竞争期间的后备值 通过 CAS 更新 就是 valuetransient volatile long base;// 锁的 标志位 调整单元大小和/或创建单元时使用自旋锁通过 CAS 锁定transient volatile int cellsBusy;//Base的cas 操作final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);}// CellsBusy的cas操作final boolean casCellsBusy() {return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);}// 主要是给不同的线程找到数组中不同的下标// 当前线程的探测值static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);}// 给定线程的给定探测值static final int advanceProbe(int probe) {probe ^ probe 13; // xorshiftprobe ^ probe 17;probe ^ probe 5;UNSAFE.putInt(Thread.currentThread(), PROBE, probe);return probe;}......// Unsafe mechanics// Unsafe 的获取 和 偏移量的获取private static final sun.misc.Unsafe UNSAFE;private static final long BASE;private static final long CELLSBUSY;private static final long PROBE;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? sk Striped64.class;BASE UNSAFE.objectFieldOffset(sk.getDeclaredField(base));CELLSBUSY UNSAFE.objectFieldOffset(sk.getDeclaredField(cellsBusy));Class? tk Thread.class;PROBE UNSAFE.objectFieldOffset(tk.getDeclaredField(threadLocalRandomProbe));} catch (Exception e) {throw new Error(e);}}}内部类 sun.misc.Contended——解决伪共享进行字节填充 sun.misc.Contended static final class Cell {// 操作的数volatile long value;// 构造器Cell(long x) { value x; }// 进行 cas final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}
}LongAdder 可能会出现一边正在进行 累加操作一边又在执行求和操作所以就导致了不是 强一致性而是最终一致性 public class LongAdder extends Striped64 implements Serializable {private static final long serialVersionUID 7249069246863182397L;public LongAdder() {}public void add(long x) {Cell[] as; long b, v; int m; Cell a;// 数组是不是 null判断有没有发生竞争因为只有竞争发生才会初始化数组// 没有初始化就是没有竞争 直接对 base 的值x 失败if ((as cells) ! null || !casBase(b base, b x)) {//有竞争的时候boolean uncontended true;// 数组还是没有初始化 || 数组初始化但是数组的长度 0 || 数组中的该位置的值是 null (表示这个下标没有初始化)|| cas的方式把当前位置的值 x cas 失败if (as null || (m as.length - 1) 0 ||(a as[getProbe() m]) null ||!(uncontended a.cas(v a.value, v x)))// 发生冲突都会走这里longAccumulate(x, null, uncontended);}}public void increment() {add(1L);}public void decrement() {add(-1L);}// 返回当前总和。返回的值不是原子快照// 在没有并发更新的情况下调用会返回准确的结果但计算总和时发生的并发更新可能不会被合并// 所以不是 强一致性的public long sum() {Cell[] as cells; Cell a;long sum base;//数组不是 nullif (as ! null) {//遍历数组for (int i 0; i as.length; i) {//数组中的槽位不是 null对槽位的数据进行运算赋值加到base中if ((a as[i]) ! null)sum a.value;}}//返回总的值return sum;}// 将保持总和为零的变量重置。此方法可能是创建新加法器的有用替代方法但仅在没有并发更新时才有效。// 由于此方法本质上是活泼的因此仅应在 已知没有线程同时更新时才使用它。public void reset() {Cell[] as cells; Cell a;base 0L;if (as ! null) {// 数组存在遍历数组将数组中所有的值设置为 0 for (int i 0; i as.length; i) {if ((a as[i]) ! null)a.value 0L;}}}// 相当于sum后跟reset // 该方法可以应用于例如 多线程计算之间的静止点期间。// 如果此方法同时有更新则不能保证返回值是重置之前发生的最终值。public long sumThenReset() {Cell[] as cells; Cell a;long sum base;base 0L;if (as ! null) {// 数组存在遍历数组先求和 后把数组中所有的值设置为 0 for (int i 0; i as.length; i) {if ((a as[i]) ! null) {sum a.value;a.value 0L;}}}return sum;}// 返回sum的字符串表示形式public String toString() {return Long.toString(sum());}// 返回sumpublic long longValue() {return sum();}//缩小基元转换后以 int 形式返回sum public int intValue() {return (int)sum();}// 加宽基元转换后以float形式返回sum public float floatValue() {return (float)sum();}//加宽基元转换后以 double 形式返回sum public double doubleValue() {return (double)sum();}private static class SerializationProxy implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final long value;// sum() 返回的当前值。SerializationProxy(LongAdder a) {value a.sum();}// 返回一个LongAdder对象其初始状态由该代理保存。private Object readResolve() {LongAdder a new LongAdder();a.base value;return a;}}private Object writeReplace() {return new SerializationProxy(this);}private void readObject(java.io.ObjectInputStream s)throws java.io.InvalidObjectException {throw new java.io.InvalidObjectException(Proxy required);}}striped64中的 longAccumulate final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 拿 hash 值拿不到强制获取if ((h getProbe()) 0) {ThreadLocalRandom.current(); // force initializationh getProbe();// 将 wasUncontended 的值设为 true表示当前线程是未争用的。wasUncontended true;}boolean collide false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;// 分支 1//数组已经初始化出现了竞争if ((as cells) ! null (n as.length) 0) {// 分支1.1// 当前位置的值是nullif ((a as[(n - 1) h]) null) {// 锁的标志位 0 没有加锁 if (cellsBusy 0) { // Try to attach new CellCell r new Cell(x); // Optimistically create// 加锁if (cellsBusy 0 casCellsBusy()) {boolean created false; try { // Recheck under lockCell[] rs; int m, j;// 加锁之后再次检查指定位置是否为空 // 数组初始化过了 当前位置的值不是nullif ((rs cells) ! null (m rs.length) 0 rs[j (m - 1) h] null) {// 给数组的指定位置设置为 之前设置过的cell对象rs[j] r;// 创建成功created true;}} finally {// 解锁cellsBusy 0;}if (created)break;continue; // Slot is now non-empty}}// 有人加锁了发生了冲突 避免在当前位置发生碰撞的情况下继续进行操作将 collide 标志位设置为 false。collide false;}// 分支1.2// 没有发生竞争else if (!wasUncontended) // CAS already known to fail// 此时是发生了竞争wasUncontended true; // Continue after rehash// 分支1.3// cas 的方式更新 此位置的值, cas 失败表示有线程正在此位置执行操作else if (a.cas(v a.value, ((fn null) ? v x :fn.applyAsLong(v, x))))break;// 分支1.4// n cpu 的个数 当前分段数组的长度是否已经达到或超过了处理器的数量。//如果是,说明分段数组已经达到了最大的容量或者已经很大了不再需要继续进行扩容操作。// 或者 cells 发生了变化当前线程获取到的分段数组引用是否与共享变量中的引用相等。// 如果不相等说明在当前线程获取到分段数组的过程中有其他线程进行了修改即分段数组已经发生了变化。else if (n NCPU || cells ! as)collide false; // At max size or stale// 分支1.5// 此时是发生了碰撞的 collide 被设置为 true else if (!collide)collide true;// 分支1.6 // 扩容// 没有被锁 cas 的方式 成功加锁 else if (cellsBusy 0 casCellsBusy()) {try {// 数组没有变化if (cells as) { // Expand table unless stale// as 数组长度扩大一倍Cell[] rs new Cell[n 1];// 元素直接赋值for (int i 0; i n; i)rs[i] as[i];cells rs;}} finally {// 解锁cellsBusy 0;}//没有冲突collide false;//扩容成功继续循环continue; // Retry with expanded table}// 更新 hash 值h advanceProbe(h);}
// 分支 2// 此处数组没有进行初始化此时进行初始化// 锁的标志为 0 数组没有改变多线程情况下该线程没有被其他线程初始化 cas 成功的把锁的标志位 设置为 1加锁流程// 当前的 cells 数组没有被其他线程占用并且成功获取了 cellsBusy 锁else if (cellsBusy 0 cells as casCellsBusy()) {boolean init false;try { // Initialize table// 加完锁之后再次判断一次 cells 数组没有发生过变化if (cells as) { // 数组 长度默认为2Cell[] rs new Cell[2];// 给rs 赋值为 要加入的 xrs[h 1] new Cell(x);// 将 cells 数组变更为 rscells rs;// 初始化成功init true;}} finally {// 解锁cellsBusy 0;}if (init) //初始化成功break; // 退出循环}
// 分支 3 cas 的方式 操作 base , fn 函数式接口的方法 null 默认加法否则就是定义的方法else if (casBase(v base, ((fn null) ? v x : fn.applyAsLong(v, x))))break; // Fall back on using base 退出循环}
}LongAccumulator public class LongAccumulator extends Striped64 implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final LongBinaryOperator function;private final long identity;public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {this.function accumulatorFunction;base this.identity identity;}// 更新值public void accumulate(long x) {Cell[] as; long b, v, r; int m; Cell a;// 有竞争 || cas 运算 base 的值成功 对 base进行cas更新失败 if ((as cells) ! null ||// function.applyAsLong 函数式接口 (r function.applyAsLong(b base, x)) ! b !casBase(b, r)) {boolean uncontended true;// 出现了竞争// 数组还是没有初始化 || 数组初始化但数组的长度 0 || 数组中的该位置的值是 null (表示这个下标没有初始化)|| cas的方式运算当前位置的值 失败 cas 更新当前位置的值也失败if (as null || (m as.length - 1) 0 ||(a as[getProbe() m]) null ||!(uncontended (r function.applyAsLong(v a.value, x)) v ||a.cas(v, r)))longAccumulate(x, function, uncontended);}}// 返回当前值。返回的值不是原子快照// 在没有并发更新的情况下调用会返回准确的结果但计算值时发生的并发更新可能不会被合并。public long get() {Cell[] as cells; Cell a;long result base;// 数组存在if (as ! null) {// 遍历数组for (int i 0; i as.length; i) {//数组中的槽位不是 nullif ((a as[i]) ! null)//对槽位的数据进行运算赋值加到base中result function.applyAsLong(result, a.value);}}// 并返回总的值return result;}// 重置变量以维护对标识值的更新。// 此方法可能是创建新更新程序的有用替代方法但仅在没有并发更新时才有效。// 由于此方法本质上是活泼的因此仅应在已知没有线程同时更新时才使用它。 public void reset() {Cell[] as cells; Cell a;base identity;// 数组存在if (as ! null) {// 遍历数组for (int i 0; i as.length; i) {//数组中的槽位不是 nullif ((a as[i]) ! null)//将槽位的值设置为 identitya.value identity;}}}// 效果相当于get后面跟着reset 。// 该方法可以应用于例如多线程计算之间的静止点期间。// 如果此方法同时有更新则不能保证返回值是重置之前发生的最终值。public long getThenReset() {Cell[] as cells; Cell a;long result base;base identity;// 数组存在if (as ! null) {// 遍历数组for (int i 0; i as.length; i) {//数组中的槽位不是 nullif ((a as[i]) ! null) {// 将槽位的值设置为 identity// 对槽位的数据进行运算赋值加到base中long v a.value;a.value identity;result function.applyAsLong(result, v);}}}return result;}public String toString() {return Long.toString(get());}public long longValue() {return get();}public int intValue() {return (int)get();}public float floatValue() {return (float)get();}public double doubleValue() {return (double)get();}/*** 序列化代理用于避免以序列化形式引用非公共 Striped64 超类*/private static class SerializationProxy implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final long value;private final LongBinaryOperator function;private final long identity;SerializationProxy(LongAccumulator a) {function a.function;identity a.identity;value a.get();}private Object readResolve() {LongAccumulator a new LongAccumulator(function, identity);a.base value;return a;}}private Object writeReplace() {return new SerializationProxy(this);}private void readObject(java.io.ObjectInputStream s)throws java.io.InvalidObjectException {throw new java.io.InvalidObjectException(Proxy required);}} 在Double 中会有 doubleToRawLongBits的操作主要是检查数组越界的 DoubleAdder public class DoubleAdder extends Striped64 implements Serializable {private static final long serialVersionUID 7249069246863182397L;/*
请注意我们必须使用“long”作为底层表示因为 double 没有compareAndSet因为任何 CAS 实现中使用的按位等于与双精度等于不同
然而我们仅使用 CAS 来检测和缓解争用无论如何按位等于效果最好。
原则上这里使用的 longdouble 转换在大多数平台上基本上应该是免费的因为它们只是重新解释位。*/public DoubleAdder() {}public void add(double x) {Cell[] as; long b, v; int m; Cell a;// 数组存在 || 对 base 进行 cas运算操作失败
if ((as cells) ! null ||!casBase(b base,Double.doubleToRawLongBits(Double.longBitsToDouble(b) x))) {// boolean uncontended true;// 数组为 空 || 数组的长度 0 || 当前位置的值为 null || 对该位置的值进行cas 运算失败if (as null || (m as.length - 1) 0 ||(a as[getProbe() m]) null ||!(uncontended a.cas(v a.value,Double.doubleToRawLongBits(Double.longBitsToDouble(v) x))))doubleAccumulate(x, null, uncontended);}}/**返回当前总和。返回的值不是原子快照在没有并发更新的情况下调用会返回准确的结果但计算总和时发生的并发更新可能不会被合并。由于浮点算术不是严格关联的因此返回的结果不需要与在单个变量的一系列连续更新中获得的值相同。*/public double sum() {Cell[] as cells; Cell a;double sum Double.longBitsToDouble(base);if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)sum Double.longBitsToDouble(a.value);}}return sum;}/**将保持总和为零的变量重置。此方法可能是创建新加法器的有用替代方法但仅在没有并发更新时才有效。由于此方法本质上是活泼的因此仅应在已知没有线程同时更新时才使用它。*/public void reset() {Cell[] as cells; Cell a;base 0L; // relies on fact that double 0 must have same rep as longif (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)a.value 0L;}}}/**相当于sum后跟reset 。该方法可以应用于例如多线程计算之间的静止点期间。如果此方法同时有更新则不能保证返回值是重置之前发生的最终值。*/public double sumThenReset() {Cell[] as cells; Cell a;double sum Double.longBitsToDouble(base);base 0L;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null) {long v a.value;a.value 0L;sum Double.longBitsToDouble(v);}}}return sum;}public String toString() {return Double.toString(sum());}public double doubleValue() {return sum();}public long longValue() {return (long)sum();}public int intValue() {return (int)sum();}public float floatValue() {return (float)sum();}private static class SerializationProxy implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final double value;SerializationProxy(DoubleAdder a) {value a.sum();}private Object readResolve() {DoubleAdder a new DoubleAdder();a.base Double.doubleToRawLongBits(value);return a;}}private Object writeReplace() {return new SerializationProxy(this);}private void readObject(java.io.ObjectInputStream s)throws java.io.InvalidObjectException {throw new java.io.InvalidObjectException(Proxy required);}}striped64中的 doubleAccumulate 和上面的 striped64中的 longAccumulate 几乎一模一样只有doubleToRawLongBits部分的细微差别 final void doubleAccumulate(double x, DoubleBinaryOperator fn,boolean wasUncontended) {int h;// 拿 hash 值拿不到强制获取if ((h getProbe()) 0) {ThreadLocalRandom.current(); // force initializationh getProbe();wasUncontended true;}boolean collide false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;
// 分支 1//数组已经初始化出现了竞争if ((as cells) ! null (n as.length) 0) {// 分支1.1// 当前位置的值是nullif ((a as[(n - 1) h]) null) {// 锁的标志位 0 没有加锁 if (cellsBusy 0) { // Try to attach new CellCell r new Cell(Double.doubleToRawLongBits(x));// 加锁if (cellsBusy 0 casCellsBusy()) {boolean created false;try { // Recheck under lockCell[] rs; int m, j;// 数组初始化过了 当前位置的值不是nullif ((rs cells) ! null (m rs.length) 0 rs[j (m - 1) h] null) {// 给数组的位置设置为 之前设置过的cell对象rs[j] r;// 创建成功created true;}} finally {// 解锁cellsBusy 0;}if (created)break;continue; // Slot is now non-empty}}// 有人加锁了发生了冲突collide false;}// 分支1.2// 没有发生竞争else if (!wasUncontended) // CAS already known to fail// 此时是发生了竞争wasUncontended true; // Continue after rehash// 分支1.3// cas 的方式更新 此位置的值, cas 失败表示有线程正在此位置执行操作else if (a.cas(v a.value,((fn null) ?Double.doubleToRawLongBits(Double.longBitsToDouble(v) x) :Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))break;// 分支1.4// n cpu 的个数 或者 cells 发生了变化表示 之前没有发生碰撞不能扩容else if (n NCPU || cells ! as)collide false; // At max size or stale// 分支1.5// 此时是发生了碰撞的 collide 被设置为 true else if (!collide)collide true;// 分支1.6 // 扩容// 没有被锁 成功加锁 else if (cellsBusy 0 casCellsBusy()) {try {// 数组没有变化if (cells as) { // Expand table unless stale// as 数组长度扩大一倍Cell[] rs new Cell[n 1];// 元素直接赋值for (int i 0; i n; i)rs[i] as[i];cells rs;}} finally {// 解锁cellsBusy 0;}//没有冲突collide false;//扩容成功继续循环continue; // Retry with expanded table}// 获取hash 值h advanceProbe(h);}
// 分支 2// 此处数组没有进行初始化此时进行初始化// 锁的标志为 0 数组没有改变多线程情况下该线程没有被其他线程初始化 cas 成功的把锁的标志位 设置为 1枷锁流程else if (cellsBusy 0 cells as casCellsBusy()) {boolean init false;try { // Initialize table// 加完锁之后再次判断一次 cells 数组没有发生过变化if (cells as) {// 数组 长度默认为2Cell[] rs new Cell[2];// 给rs 赋值为 要加入的 xrs[h 1] new Cell(Double.doubleToRawLongBits(x));// 将 cells 数组变更为 rscells rs;// 初始化成功init true;}} finally {// 解锁cellsBusy 0;}if (init)// 初始化成功break; // 退出循环}
// 分支 3 cas 的方式 操作 base , fn 函数式接口的方法 null 默认加法否则就是定义的方法else if (casBase(v base,((fn null) ?Double.doubleToRawLongBits(Double.longBitsToDouble(v) x) :Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))break; // Fall back on using base// 退出循环}
}DoubleAccumulator public class DoubleAccumulator extends Striped64 implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final DoubleBinaryOperator function;private final long identity; // use long representationpublic DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,double identity) {this.function accumulatorFunction;base this.identity Double.doubleToRawLongBits(identity);}public void accumulate(double x) {Cell[] as; long b, v, r; int m; Cell a;// 数组存在 || 对 base 进行 cas的 运算操作成功 对base 进行cas 更新操作失败 if ((as cells) ! null ||(r Double.doubleToRawLongBits(function.applyAsDouble (Double.longBitsToDouble(b base), x))) ! b !casBase(b, r)) {boolean uncontended true;
// 数组为 空 || 数组被初始化但是 数组的长度 0 || 当前位置的值为 null || (对该位置的值进行cas 运算失败 || 对该值进行cas 更新失败)if (as null || (m as.length - 1) 0 ||(a as[getProbe() m]) null ||!(uncontended (r Double.doubleToRawLongBits(function.applyAsDouble(Double.longBitsToDouble(v a.value), x))) v ||a.cas(v, r)))doubleAccumulate(x, function, uncontended);}}public double get() {Cell[] as cells; Cell a;double result Double.longBitsToDouble(base);if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)result function.applyAsDouble(result, Double.longBitsToDouble(a.value));}}return result;}public void reset() {Cell[] as cells; Cell a;base identity;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)a.value identity;}}}public double getThenReset() {Cell[] as cells; Cell a;double result Double.longBitsToDouble(base);base identity;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null) {double v Double.longBitsToDouble(a.value);a.value identity;result function.applyAsDouble(result, v);}}}return result;}public String toString() {return Double.toString(get());}public double doubleValue() {return get();}public long longValue() {return (long)get();}public int intValue() {return (int)get();}public float floatValue() {return (float)get();}private static class SerializationProxy implements Serializable {private static final long serialVersionUID 7249069246863182397L;private final double value;private final DoubleBinaryOperator function;private final long identity;SerializationProxy(DoubleAccumulator a) {function a.function;identity a.identity;value a.get();}private Object readResolve() {double d Double.longBitsToDouble(identity);DoubleAccumulator a new DoubleAccumulator(function, d);a.base Double.doubleToRawLongBits(value);return a;}}private Object writeReplace() {return new SerializationProxy(this);}private void readObject(java.io.ObjectInputStream s)throws java.io.InvalidObjectException {throw new java.io.InvalidObjectException(Proxy required);}}