【Java】通俗易懂的JUC源码剖析-LongAdder/LongAccumulator

通俗易懂的JUC源码剖析-LongAdder/LongAccumulator

小强大人发布于 今天 14:59

一、有AtomicLong为什么还需要LongAdder/LongAccumulator?

大家对AtomicLong应该比较熟悉(如果未接触过,请翻看另一篇博客,通俗易懂的AtomicLong源码剖析),但JDK1.8为什么又新增了LongAdder/LongAccumulator2个类?AtomicLong不够用吗?答案:主要是基于性能考虑。AtomicLong的incrementAndGet()方法在高并发场景下,多个线程竞争修改共享资源value,会造成循环耗时过长,进而导致性能问题,下面贴出源码来讲解这个问题:

public final long incrementAndGet() {

return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

}

其中,unsafe.getAndAddLong方法如下:

public final long getAndAddLong(Object var1, long var2, long var4) {

long var6;

do {

var6 = this.getLongVolatile(var1, var2);

} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

return var6;

}

可以看到,多个线程在竞争修改共享资源value值时,是在一个循环里面,高并发情况下,同一时刻只有一个线程CAS操作成功,其他大多数线程CAS失败,从而处于不断循环重试的场景,因此对性能造成影响。

二、那为什么LongAdder/LongAccumulator为什么能提升性能呢?它底层是怎么实现的呢?用了什么数据结构呢?

由于LongAccumulator是LongAdder的功能扩展,底层原理差不多,在此以LongAdder原理来说明。
首先看LongAdder的类结构:

public class LongAdder extends Striped64 implements Serializable {

}

LongAdder继承了Striped64,真正发挥作用的是这个Striped64类,来看看它的类结构:

/**

* A package-local class holding common representation and mechanics * for classes supporting dynamic striping on 64bit values. The class * extends Number so that concrete subclasses must publicly do so. */@SuppressWarnings("serial")

abstract class Striped64 extends Number {

}

接下来看它的重要属性有哪些:

/**

* Table of cells. When non-null, size is a power of 2. */

/*提升性能发挥作用的Cell数组,核心思想是通过多个线程在对应自己的Cell进行累加,从而减少竞争*/

transient volatile Cell[] cells;

/**

* Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */

/*多个线程没有发生竞争的时候,值累加在base上,这与AtomicLong的value作用是一样的*/

transient volatile long base;

/**

* Spinlock (locked via CAS) used when resizing and/or creating Cells. */

/*当Cells数组初始化,创建元素或者扩容的时候为1,否则为0*/

transient volatile int cellsBusy;

可能不少同学对Cell感到不解,其实很简单,打开源码就知道究竟了

/*@Contended注解是JDK1.8提供的字节填充方式,解决伪共享问题,可翻看另一篇博客:**什么是伪共享***/

@sun.misc.Contended static final class Cell {

volatile long value;

Cell(long x) { value = x; }

final boolean cas(long cmp, long val) {

return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

}

}

它是Striped64的内部类,里面有个volatile修饰的value值,也是通过cas操作修改它的值,LongAdder计数器的值就是所有Cell[]的value和再加上base的值。
对数据结构有了大致了解后,再来看里面的常用关键方法:

public void increment() {

add(1L);

}

public void decrement() {

add(-1L);

}

可以看到,递增递减都调用了add()方法,可见它是实现的核心。往里看:

public void add(long x) {

Cell[] as; long b, v; int m; Cell a;

if ((as = cells) != null || !casBase(b = base, b + x)) {

boolean uncontended = true;

if (as == null || (m = as.length - 1) < 0 ||

(a = as[getProbe() & m]) == null ||

!(uncontended = a.cas(v = a.value, v + x)))

longAccumulate(x, null, uncontended);

}

}

先来看第一个if分支if ((as = cells) != null || !casBase(b = base, b + x))
由于初始时cells为空,第一次调用add()方法的话,(as = cells) != null不成立,转向!casBase(b = base, b + x),打开里面代码很简单,就是对base值进行CAS修改,前面说过,没有竞争的时候修改的是base值,发生竞争的时候Cellp[]才起作用。

final boolean casBase(long cmp, long val) {

return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);

}

如果casBase返回true,表示该线程修改成功,结束;
如果casBase返回false,表示该线程修改失败,产生了竞争,进入里面的if条件

if (as == null || (m = as.length - 1) < 0 ||

(a = as[getProbe() & m]) == null ||

!(uncontended = a.cas(v = a.value, v + x)))

咋看有点复杂,有4个分支,不着急,一个个来看。
第一个和第二个就是判断Cell[]有没有初始化,且元素不为空。
第三个和第四个就是在Cell[]已初始化的前提下,定位出当前线程应该对应的Cell元素,并尝试CAS修改里面的value值,给它加x,如果不成功,进入里面的longAccumulate(x, null, uncontended);
进入之前,可能有同学对uncontended和getProbe() & m有疑问。
uncontendted,翻译过来是"未发生过竞争的"意思,里面的方法会用到这个标记;而getProbe()返回的是Thread类threadLocalRandomProbe属性的值,它在ThreadLocalRandom里面发挥作用,另一篇博客有讲解,ThreadLocalRandom原理剖析。在这里我们可以把它理解成HashMap的哈希值h,然后与m=as.length - 1进行与操作,其实等效于h % as.length,即找到对应的位置,是不是和HashMap定位元素位置很类似?

static final int getProbe() {

return UNSAFE.getInt(Thread.currentThread(), PROBE);

}

Class<?> tk = Thread.class;

PROBE = UNSAFE.objectFieldOffset

(tk.getDeclaredField("threadLocalRandomProbe"));

现在我们可以进入longAccumulate(x, null, uncontended);方法了,打开一看,你kin你ca,这么复杂,绝望了有没有?别急,耐心慢慢分析!

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

int h;

// 如果没有初始化

if ((h = getProbe()) == 0) {

// current()里面会初始化probe值

ThreadLocalRandom.current(); // force initialization

// 重新获取probe值

h = getProbe();

// 还未初始化,肯定没有产生竞争

wasUncontended = true;

}

// 是否发生碰撞,即多个线程hash到同一个Cell元素位置

boolean collide = false; // True if last slot nonempty

for (;;) {

Cell[] as; Cell a; int n; long v;

// 如果cells数组已经初始化

if ((as = cells) != null && (n = as.length) > 0) {

// hash到的数组元素位置为空

if ((a = as[(n - 1) & h]) == null) {

if (cellsBusy == 0) { // Try to attach new Cell

Cell r = new Cell(x); // Optimistically create

// 尝试获取锁

if (cellsBusy == 0 && casCellsBusy()) {

boolean created = false;

try { // Recheck under lock

Cell[] rs; int m, j;

// 再次检查该位置元素是否为空

if ((rs = cells) != null &&

(m = rs.length) > 0 &&

rs[j = (m - 1) & h] == null) {

// 将新生成的元素Cell(x)放在该位置上

rs[j] = r;

created = true;

}

} finally {

// 释放锁

cellsBusy = 0;

}

if (created)

// (1)创建成功,退出循环

break;

// 创建不成功,下一轮循环重试

continue; // Slot is now non-empty

}

}

// 该位置元素为空,则没有发生碰撞

collide = false;

}

// 对应外面add()方法的第四个条件,即该位置元素不为空,且cas失败了

// 重置wasUncontended,通过下面的advanceProbe()重新hash,找到新的位置进行下一轮重试

// 之所以重置wasUncontended,是为了下一轮重试时走下面cas分支,尝试对该位置元素进行值的修改

else if (!wasUncontended) // CAS already known to fail

wasUncontended = true; // Continue after rehash

// 第N(N > 1)轮重试,尝试对该位置元素进行值的修改,

else if (a.cas(v = a.value, ((fn == null) ? v + x :

fn.applyAsLong(v, x))))

// (2)修改成功退出循环

break;

// 如果数组元素到达CPU个数或者已经被扩容了,则重新hash下一轮重试

else if (n >= NCPU || cells != as)

collide = false; // At max size or stale

// 以上条件都不满足,则发生了碰撞,且竞争失败了

else if (!collide)

collide = true;

// 碰撞竞争失败时,则去尝试获取锁去扩容Cell数组

else if (cellsBusy == 0 && casCellsBusy()) {

try {

if (cells == as) { // Expand table unless stale

// 扩容为原来的2倍

Cell[] rs = new Cell[n << 1];

// 拷贝旧数组元素到新数组中

for (int i = 0; i < n; ++i)

rs[i] = as[i];

cells = rs;

}

} finally {

// 释放所

cellsBusy = 0;

}

// 扩容成功,则重置collide,表示我有新的位置去重试了,不跟你抢这个位置了

collide = false;

continue; // Retry with expanded table

}

// 产生新的hash值,尝试去找别的数组位置

h = advanceProbe(h);

}

// Cell[]为空,对应外面add()的第一二个条件,则尝试获取锁去初始化数组

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

boolean init = false;

try { // Initialize table

if (cells == as) {

// 初始化大小为2

Cell[] rs = new Cell[2];

// 将Cell(x)放在0或1号位置上

rs[h & 1] = new Cell(x);

cells = rs;

init = true;

}

} finally {

// 释放锁

cellsBusy = 0;

}

// (3)初始化成功,退出循环

if (init)

break;

}

// 有别的线程正在初始化数组,则尝试累加在base变量上

else if (casBase(v = base, ((fn == null) ? v + x :

fn.applyAsLong(v, x))))

// (4)成功则退出循环

break; // Fall back on using base

}

}

由上面代码可以看出,这个方法逻辑相当复杂,再来总结梳理下,可以从上面注释标记的4处退出循环的条件来看:
(1) Cell[]不为空,hash到的位置元素为空,那么就创建元素,并赋值为x,成功的话可以退出循环;
(2) Cell[]不为空,hash到的位置元素不为空,且上一轮cas修改失败了,这轮重试如果成功,可以退出循环;
(3) Cell[]为空,那么尝试初始化数组,并把x赋值到0或1号位置上,成功的话可以退出循环;
(4) Cell[]为空,且有其他线程在初始化数组,那么尝试累加到base上,成功的话可以退出循环;
其他条件都是需要通过advanceProbe()进行rehash到其他位置,进行下一轮重试

三、总结

总结之前顺便提下LongAccumulator,它是把LongAdder的(v + x)操作换成一个LongBinaryOperator,即用户可以自定义累加操作的逻辑,其他地方都是一样的

public LongAccumulator(LongBinaryOperator accumulatorFunction,

long identity) {

this.function = accumulatorFunction;

base = this.identity = identity;

}

整个LongAdder的源码分析就到这里结束了,其实JDK也提供了double类型的DoubleAdder和DoubleAccumulator,他们都继承了Striped64,原理是大同小异的,有兴趣的同学可以自己去看看源码。
关于平时开发如何选择AtomicLong,相信大家也很清楚了,并发不高的情况下用AtomicLong就行,并发很高的情况下就要选择LongAdder或者LongAccumulator了!
最后在别的地方看到一张图,可以更好的帮助我们理解原理,放在这里给大家看看。
ps:第一次写技术博客,描述地不准确的地方,希望大家包容,也可以指出来,共同进步!
【Java】通俗易懂的JUC源码剖析-LongAdder/LongAccumulator

java

阅读 11更新于 今天 15:03

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

1 声望

0 粉丝

0 条评论

得票时间

avatar

小强大人

1 声望

0 粉丝

宣传栏

一、有AtomicLong为什么还需要LongAdder/LongAccumulator?

大家对AtomicLong应该比较熟悉(如果未接触过,请翻看另一篇博客,通俗易懂的AtomicLong源码剖析),但JDK1.8为什么又新增了LongAdder/LongAccumulator2个类?AtomicLong不够用吗?答案:主要是基于性能考虑。AtomicLong的incrementAndGet()方法在高并发场景下,多个线程竞争修改共享资源value,会造成循环耗时过长,进而导致性能问题,下面贴出源码来讲解这个问题:

public final long incrementAndGet() {

return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

}

其中,unsafe.getAndAddLong方法如下:

public final long getAndAddLong(Object var1, long var2, long var4) {

long var6;

do {

var6 = this.getLongVolatile(var1, var2);

} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

return var6;

}

可以看到,多个线程在竞争修改共享资源value值时,是在一个循环里面,高并发情况下,同一时刻只有一个线程CAS操作成功,其他大多数线程CAS失败,从而处于不断循环重试的场景,因此对性能造成影响。

二、那为什么LongAdder/LongAccumulator为什么能提升性能呢?它底层是怎么实现的呢?用了什么数据结构呢?

由于LongAccumulator是LongAdder的功能扩展,底层原理差不多,在此以LongAdder原理来说明。
首先看LongAdder的类结构:

public class LongAdder extends Striped64 implements Serializable {

}

LongAdder继承了Striped64,真正发挥作用的是这个Striped64类,来看看它的类结构:

/**

* A package-local class holding common representation and mechanics * for classes supporting dynamic striping on 64bit values. The class * extends Number so that concrete subclasses must publicly do so. */@SuppressWarnings("serial")

abstract class Striped64 extends Number {

}

接下来看它的重要属性有哪些:

/**

* Table of cells. When non-null, size is a power of 2. */

/*提升性能发挥作用的Cell数组,核心思想是通过多个线程在对应自己的Cell进行累加,从而减少竞争*/

transient volatile Cell[] cells;

/**

* Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */

/*多个线程没有发生竞争的时候,值累加在base上,这与AtomicLong的value作用是一样的*/

transient volatile long base;

/**

* Spinlock (locked via CAS) used when resizing and/or creating Cells. */

/*当Cells数组初始化,创建元素或者扩容的时候为1,否则为0*/

transient volatile int cellsBusy;

可能不少同学对Cell感到不解,其实很简单,打开源码就知道究竟了

/*@Contended注解是JDK1.8提供的字节填充方式,解决伪共享问题,可翻看另一篇博客:**什么是伪共享***/

@sun.misc.Contended static final class Cell {

volatile long value;

Cell(long x) { value = x; }

final boolean cas(long cmp, long val) {

return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

}

}

它是Striped64的内部类,里面有个volatile修饰的value值,也是通过cas操作修改它的值,LongAdder计数器的值就是所有Cell[]的value和再加上base的值。
对数据结构有了大致了解后,再来看里面的常用关键方法:

public void increment() {

add(1L);

}

public void decrement() {

add(-1L);

}

可以看到,递增递减都调用了add()方法,可见它是实现的核心。往里看:

public void add(long x) {

Cell[] as; long b, v; int m; Cell a;

if ((as = cells) != null || !casBase(b = base, b + x)) {

boolean uncontended = true;

if (as == null || (m = as.length - 1) < 0 ||

(a = as[getProbe() & m]) == null ||

!(uncontended = a.cas(v = a.value, v + x)))

longAccumulate(x, null, uncontended);

}

}

先来看第一个if分支if ((as = cells) != null || !casBase(b = base, b + x))
由于初始时cells为空,第一次调用add()方法的话,(as = cells) != null不成立,转向!casBase(b = base, b + x),打开里面代码很简单,就是对base值进行CAS修改,前面说过,没有竞争的时候修改的是base值,发生竞争的时候Cellp[]才起作用。

final boolean casBase(long cmp, long val) {

return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);

}

如果casBase返回true,表示该线程修改成功,结束;
如果casBase返回false,表示该线程修改失败,产生了竞争,进入里面的if条件

if (as == null || (m = as.length - 1) < 0 ||

(a = as[getProbe() & m]) == null ||

!(uncontended = a.cas(v = a.value, v + x)))

咋看有点复杂,有4个分支,不着急,一个个来看。
第一个和第二个就是判断Cell[]有没有初始化,且元素不为空。
第三个和第四个就是在Cell[]已初始化的前提下,定位出当前线程应该对应的Cell元素,并尝试CAS修改里面的value值,给它加x,如果不成功,进入里面的longAccumulate(x, null, uncontended);
进入之前,可能有同学对uncontended和getProbe() & m有疑问。
uncontendted,翻译过来是"未发生过竞争的"意思,里面的方法会用到这个标记;而getProbe()返回的是Thread类threadLocalRandomProbe属性的值,它在ThreadLocalRandom里面发挥作用,另一篇博客有讲解,ThreadLocalRandom原理剖析。在这里我们可以把它理解成HashMap的哈希值h,然后与m=as.length - 1进行与操作,其实等效于h % as.length,即找到对应的位置,是不是和HashMap定位元素位置很类似?

static final int getProbe() {

return UNSAFE.getInt(Thread.currentThread(), PROBE);

}

Class<?> tk = Thread.class;

PROBE = UNSAFE.objectFieldOffset

(tk.getDeclaredField("threadLocalRandomProbe"));

现在我们可以进入longAccumulate(x, null, uncontended);方法了,打开一看,你kin你ca,这么复杂,绝望了有没有?别急,耐心慢慢分析!

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

int h;

// 如果没有初始化

if ((h = getProbe()) == 0) {

// current()里面会初始化probe值

ThreadLocalRandom.current(); // force initialization

// 重新获取probe值

h = getProbe();

// 还未初始化,肯定没有产生竞争

wasUncontended = true;

}

// 是否发生碰撞,即多个线程hash到同一个Cell元素位置

boolean collide = false; // True if last slot nonempty

for (;;) {

Cell[] as; Cell a; int n; long v;

// 如果cells数组已经初始化

if ((as = cells) != null && (n = as.length) > 0) {

// hash到的数组元素位置为空

if ((a = as[(n - 1) & h]) == null) {

if (cellsBusy == 0) { // Try to attach new Cell

Cell r = new Cell(x); // Optimistically create

// 尝试获取锁

if (cellsBusy == 0 && casCellsBusy()) {

boolean created = false;

try { // Recheck under lock

Cell[] rs; int m, j;

// 再次检查该位置元素是否为空

if ((rs = cells) != null &&

(m = rs.length) > 0 &&

rs[j = (m - 1) & h] == null) {

// 将新生成的元素Cell(x)放在该位置上

rs[j] = r;

created = true;

}

} finally {

// 释放锁

cellsBusy = 0;

}

if (created)

// (1)创建成功,退出循环

break;

// 创建不成功,下一轮循环重试

continue; // Slot is now non-empty

}

}

// 该位置元素为空,则没有发生碰撞

collide = false;

}

// 对应外面add()方法的第四个条件,即该位置元素不为空,且cas失败了

// 重置wasUncontended,通过下面的advanceProbe()重新hash,找到新的位置进行下一轮重试

// 之所以重置wasUncontended,是为了下一轮重试时走下面cas分支,尝试对该位置元素进行值的修改

else if (!wasUncontended) // CAS already known to fail

wasUncontended = true; // Continue after rehash

// 第N(N > 1)轮重试,尝试对该位置元素进行值的修改,

else if (a.cas(v = a.value, ((fn == null) ? v + x :

fn.applyAsLong(v, x))))

// (2)修改成功退出循环

break;

// 如果数组元素到达CPU个数或者已经被扩容了,则重新hash下一轮重试

else if (n >= NCPU || cells != as)

collide = false; // At max size or stale

// 以上条件都不满足,则发生了碰撞,且竞争失败了

else if (!collide)

collide = true;

// 碰撞竞争失败时,则去尝试获取锁去扩容Cell数组

else if (cellsBusy == 0 && casCellsBusy()) {

try {

if (cells == as) { // Expand table unless stale

// 扩容为原来的2倍

Cell[] rs = new Cell[n << 1];

// 拷贝旧数组元素到新数组中

for (int i = 0; i < n; ++i)

rs[i] = as[i];

cells = rs;

}

} finally {

// 释放所

cellsBusy = 0;

}

// 扩容成功,则重置collide,表示我有新的位置去重试了,不跟你抢这个位置了

collide = false;

continue; // Retry with expanded table

}

// 产生新的hash值,尝试去找别的数组位置

h = advanceProbe(h);

}

// Cell[]为空,对应外面add()的第一二个条件,则尝试获取锁去初始化数组

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {

boolean init = false;

try { // Initialize table

if (cells == as) {

// 初始化大小为2

Cell[] rs = new Cell[2];

// 将Cell(x)放在0或1号位置上

rs[h & 1] = new Cell(x);

cells = rs;

init = true;

}

} finally {

// 释放锁

cellsBusy = 0;

}

// (3)初始化成功,退出循环

if (init)

break;

}

// 有别的线程正在初始化数组,则尝试累加在base变量上

else if (casBase(v = base, ((fn == null) ? v + x :

fn.applyAsLong(v, x))))

// (4)成功则退出循环

break; // Fall back on using base

}

}

由上面代码可以看出,这个方法逻辑相当复杂,再来总结梳理下,可以从上面注释标记的4处退出循环的条件来看:
(1) Cell[]不为空,hash到的位置元素为空,那么就创建元素,并赋值为x,成功的话可以退出循环;
(2) Cell[]不为空,hash到的位置元素不为空,且上一轮cas修改失败了,这轮重试如果成功,可以退出循环;
(3) Cell[]为空,那么尝试初始化数组,并把x赋值到0或1号位置上,成功的话可以退出循环;
(4) Cell[]为空,且有其他线程在初始化数组,那么尝试累加到base上,成功的话可以退出循环;
其他条件都是需要通过advanceProbe()进行rehash到其他位置,进行下一轮重试

三、总结

总结之前顺便提下LongAccumulator,它是把LongAdder的(v + x)操作换成一个LongBinaryOperator,即用户可以自定义累加操作的逻辑,其他地方都是一样的

public LongAccumulator(LongBinaryOperator accumulatorFunction,

long identity) {

this.function = accumulatorFunction;

base = this.identity = identity;

}

整个LongAdder的源码分析就到这里结束了,其实JDK也提供了double类型的DoubleAdder和DoubleAccumulator,他们都继承了Striped64,原理是大同小异的,有兴趣的同学可以自己去看看源码。
关于平时开发如何选择AtomicLong,相信大家也很清楚了,并发不高的情况下用AtomicLong就行,并发很高的情况下就要选择LongAdder或者LongAccumulator了!
最后在别的地方看到一张图,可以更好的帮助我们理解原理,放在这里给大家看看。
ps:第一次写技术博客,描述地不准确的地方,希望大家包容,也可以指出来,共同进步!
【Java】通俗易懂的JUC源码剖析-LongAdder/LongAccumulator

以上是 【Java】通俗易懂的JUC源码剖析-LongAdder/LongAccumulator 的全部内容, 来源链接: utcz.com/a/106055.html

回到顶部