亲宝软件园·资讯

展开

LongAdder原理及创建使用示例详解

Afu 人气:0

LongAdder介绍

1.Atomic原子类

Atomic的原子类内部使用的是CAS原则,CAS是一个乐观锁,但是如果是在高并发的情况下的话,多个线程不断地竞争

CAS的不断的自旋,非常耗CPU.

高并发环境下,value变量其实是一个热点,也就是多个线程竞争一个热点。

这时候就需要使用的 LongAdder 来替代 Atomic类

2.LongAdder原理

LongAdder的原理就是分散热点,将value分散到一个数组中,不同的线程去找自己的对应的Cell进行修改值,

各个线程对Cell进行CAS操作,这样热点就被分散了,冲突的概率小了,性能就提高了.

如果要返回实际的值,返回所有的数组中的值和base值就行.

2.查看LongAdder的add方法

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

一看到这种代码就头皮发麻

简单说明一下这几行代码作用,说多了,你也懒得看

第一个if作用:

如果还没有初始化cells数组,就去修改base值

如果修改Base值失败,说明多个线程对base值修改发生了竞争

第二个if作用:

判断有没有cells有没有值,有的话说明已经初始化过cells数组了

知道对应的桶位添加值或者修改值

我感觉你已经不知道我在说什么了!

反正你就知道,如果有多个线程发生了竞争,就去cells数组中找对应的桶位的cell添加或者修改值即可

3.longAccumulate方法

这里的代码特别的恶心,是能助眠的好代码,建议收藏!

简单说明一下几个核心的代码

3.1.创建Cell数组

  else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }

Cell数组还没有进行初始化,

创建长度为2的Cell数组

在索引1的位置存储第一个Cell对象

3.2.创建桶位Cell对象

                if ((as = cells) != null && (n = as.length) > 0) {    
              if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {     
                        Cell r = new Cell(x);  
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {             
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;          
                        }
                    }

m - 1) & h 通过路由寻址公式找到对应的索引位置的桶位为空,然后把创建的Cell对象存储进去

更直白就是说,你要蹲坑了,你要去找坑,如果坑位没有人,你就进去

3.3.扩容Cell数组

 else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {     
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                 
                }

数组长度不够时,2倍扩容

4.总结

你现在肯定是云里雾里,讲的都是什么东西.

你现在就只需要知道 LongAdder

通过 base 和 Cell数组 换取更高的性能

5.附上源码解析

当然这里你可以跳过了,不用看了

5.1.add方法

public void add(long x) {
        // as 表示cells引用
        // b 表示获取的base值
        // v 表示期望值
        // m 表示cells数组的长度
        // a 表示当前线程命中cell单元格
        Cell[] as; long b, v; int m; Cell a;
        //条件一: --> true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
        //        --> false 表示cells还没有初始化过,当前线程所有的数据都被写到base中
        //条件二: --> false 表示cas替换值成功
        //        --> true 表示发生竞争了,可能需要重试 或者 扩容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //什么时候会进来?
            //1.true 表示cells已经初始化过了,当前线程应该把数据写到对应的cell中
            //2.true cells还没有初始化,但是发生竞争了
            // true -> 未竞争  false ->发生竞争
            boolean uncontended = true;
            //as == null || (m = as.length - 1) < 0 看做是一个条件
            //条件一:true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,也就是通过第二个条件进来的
            //      false --> 说明cells已经初始化了,可以去寻找自己的cell进行赋值
            //条件二: getProbe() 可以理解为获取当前线程的hash值, m表示cells长度-1  注意:cells的次方数一定是2的次方
            // true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
            // false --> 说明当前线程对应的下标cell不为空,下一步需要将 x,添加到cell中
            //条件三: true --> 代表cas失败,代表当前线程对应的cell,有竞争
            //        false --> 表示cas成功
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //都哪些情况会调用这里的方法?
                //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的
                //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
                //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争
                longAccumulate(x, null, uncontended);
        }
    }

5.2.longAccumulate方法

 /**
     * //1.true --> 说明cells还没有被初始化,也就是多线程写base发生竞争了进来的,到时候会进行初始化cell
     * //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
     * //3.true --> 对应的下标也有cell,但是cas失败,代表当前线程对应的cell,有竞争
     */
    //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //表示线程的hash值
        int h;
        //条件成立,说明当前线程还未分配hash值
        if ((h = getProbe()) == 0) {
            //给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            //取出当前线程的hash值赋值给h
            h = getProbe();
            //为什么?默认情况下,可定写入到了cell[0]的位置,不把他当做一次真正的竞争??
            wasUncontended = true;
        }
        //表示扩容意向, false表示不会扩容 , true有可能会扩容
        boolean collide = false;                // True if last slot nonempty
        //自旋
        for (;;) {
            //as 表示cells引用
            //a 表示当前线程命中的cell
            //n 表示cell的数组长度
            //v 表示期望值
            Cell[] as; Cell a; int n; long v;
            //情况1:as不为空表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
                //2.true --> 说明当前线程对应的下标的cell为空,需要创建 longAccumulate 支持
                //3.true --> 代表cas失败,代表当前线程对应的cell,有竞争[重试或者扩容]
                //情况1.1: true 表示当前线程对应的下标位置的cell为null,需要创建cell
                if ((a = as[(n - 1) & h]) == null) {
                    //true 表示当前锁未被占用,false-->表示锁被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        //拿x创建cell
                        Cell r = new Cell(x);   // Optimistically create
                        //条件一:true 表示当前锁未被占用,false-->表示锁被占用
                        //条件二:true表示当前线程获取锁成功,false表示当前线程获取锁失败,
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //是否创建成功的标记
                            boolean created = false;
                            try {               // Recheck under lock
                                //rs表示cells引用
                                //m 表示cells长度
                                //j 表示当前线程命中的下标
                                Cell[] rs; int m, j;
                                //条件一,条件二,恒成立的
                                //rs[j = (m - 1) & h] == null 为了防止其他线程已经初始化话过了,防止当前线程再次修改,覆盖掉原来的cell
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    //扩容意向改为false
                    collide = false;
                }
                //情况1.2
                //wasUncontended,只有cells初始化之后,并且当前线程 竞争修改失败,才会是false,其他情况下都是true
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //情况1.3 :当前线程rehash过后,然后新命中的cell不为空
                //true --> 写成功 退出
                //false -- > rehash过后命中的cell,也有竞争  这里为重试一次,再重试一次
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                    //情况1.4.
                    // 条件一: n >= NCPU  true --> 扩容意向改为False ,表示不扩容了   false --> 表示cells还可以扩容
                   //条件二:  true --> 表示其他线程已经扩容过了,当前线程rehash之后重试即可
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                //默认开始为false 的, 设置为true,需要扩容,但是不一定真的发生扩容
                else if (!collide)
                    collide = true;
                //情况6.真正的扩容逻辑
                //条件一: cellsBusy == 0 true  --> 表示当前无锁状态,当前线程可以去竞争这把锁
                //条件二:casCellsBusy() true --> 表示当前线程获取锁成功,可以执行扩容逻辑 ,false 表示有其他线程在做相关的操作
                // 尝试两次之后
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            //n<<1 翻倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        //释放锁
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //重置当前线程hash值
                h = advanceProbe(h);
            }
            //情况2:前置条件,cells为进行初始化,as为null
            //条件一: cellsBusy为true当前未加锁
            //条件二: cells == as 为什么? 因为其他线程在你给as赋值之后修改了cells
            //条件三: true 表示获取锁成功 ,会把cellBusy 设置为 1 , false 表示其他线程在持有这个锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {
                    // 为什么还有一次判断呢?
                    //防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //释放锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //情况三:
            //1.当前cellBusy加锁状态,表示其他线程正在初始化cells,所以当前线程累加到base
            //2.cells被其他线程初始化之后,当前线程需要将数据累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

加载全部内容

相关教程
猜你喜欢
用户评论