Java并发编程(十七)Java并发包中原子操作类原理

逆流者 2021年02月21日 105次浏览

此文为读书笔记,欢迎评论,谈论问题,共同进步!


JUC包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS 实现的,相比使用锁实现原子性操作这在性能上有很大提高。

原子性操作类的原理大致相同,下面描述下AtomicLong类的实现原理以及JDK8中新增的LongAdder和 LongAccumulator类的原理。

AtomicLong

AtomicLong是原子性递增或者递减类,其内部使用 Unsafe 来实现,前面的博文中有Unsafe类的介绍(Java 中的原子性操作),了解这个后再来看原子类源码就方便多了,下面我们直接上源码(AtomicLong):

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // 获取 Unsafe 实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // value 变量在AtomicLong 类中的偏移量
    private static final long valueOffset;

    // 判断 JVM 是否支持 Long 类型无锁CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();

    static {
        try {
        	// 获取value 在 AtomicLong 中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
	// 实际变量值,是声明为volatile哦,保证多线程下的内存可见性
    private volatile long value;

    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    ....
}

为什么AtomicLong 类中 可以通过 Unsafe.getUnsafe() 来获取到Unsafe类的实例?

因为AtomicLong类也是在rt.jar包下面的,AtomicLong 类就是通过 BootStarp类加载器进行加载的。

AtomicLong 中的主要方法

递增和递减操作代码

// 调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement() {
  return unsafe.getAndAddLong(this, valueOffset, 1L);
}
// 调用unsafe方法,原子性设置value值为原始值-1,返回值为原始值
public final long getAndDecrement() {
    return unsafe.getAndAddLong(this, valueOffset, -1L);
}
// 调用unsafe方法,原子性设置value值为原始值+1,返回值为递增之后的值
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
// 调用unsafe方法,原子性设置value值为原始值-1,返回值为递增之后的值
public final long decrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

上代码内部都是通过调用 Unsafe 的 getAndAddLong方法来实现操作,这个函数是个原子性操作,这里第一个参数是 AtomicLong 实例的引用,第二个参数是 value变量在 AtomicLong 中的偏移值,第三个参数是要设置的第二个变量的基础上增加的值。这个Unsafe 的 getAndAddLong方法在之前的博文中有描述,这里在说明一下:

Unsafe.java

  • long getAndAddLong(Object obj,long offset, long addValue)方法∶获取对象 obj 中偏移量为 offset 的变量 volatile 语义的当前值,并设置变量值为原始值 +addValue。
public final long getAndAddLong(Object obj,long offset,long addValue) {
	long l; 
	do {
		l = getLongvolatile (obj,offset);
	while(!compareAndSwapLong(obj,offset,l,l + addValue)); 
	return l;
}

compareAndSet 方法

public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

看上面代码可知,在内部还是调用了unsafe.compareAndSwapLong方法。如果原子变量中的value值等于expect,则使用 update值更新该值并返回 true,否则返回 false。

看了上述的源码,我们也做个例子瞧一瞧:

一个多线程使用AtomicLong统计0的个数的例子:

public class AtomicLongTest {

    /** 原子计数器 */
    private static AtomicLong atomicLong = new AtomicLong();

    /** 数据源 */
    private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 6, 2, 8, 0, 4};
    private static Integer[] arrayTwo = new Integer[]{7, 10, 0, 3, 6, 0, 2, 0, 0, 7};

    public static void main(String[] args) throws InterruptedException {
        // threadOne 统计 arrayOne 数组中 0 的个数
        Thread threadOne = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayOne.length;
                for (int i = 0; i < size; i++) {
                    if (arrayOne[i].intValue() == 0) {
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });

        // threadTwo 统计 arrayTwo 数组中 0 的个数
        Thread threadTwo = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayTwo.length;
                for (int i = 0; i < size; i++) {
                    if (arrayTwo[i].intValue() == 0) {
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });

        threadOne.start();
        threadTwo.start();

        threadOne.join();
        threadTwo.join();

        System.out.println("count 0: " + atomicLong.get());
    }
}
count 0: 7

如上代码中的两个线程各自统计自己所持数据中0的个数,每当找到一个0就会调用 AtomicLong 的原子性递增方法。
在没有原子类的情况下,实现计数器需要使用一定的同步措施,比如使用 synchronized关键字等,但是这些都是阻塞算法,对性能有一定损耗,而原子操作类都使用CAS非阻塞算法,性能更好。但是在高并发情况下AtomicLong 还会存在性能问题。JDK8提供了一个在高并发下性能更好的LongAdder类,往下接着看。

JDK 8 新增的原子操作类 LongAdder

LongAdder

AtomicLong 通过 CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能已经很好了,但是AtomicLong在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的 CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试 CAS 的操作,而这会白白浪费 CPU 资源。

因此JDK 8新增了一个原子性递增或者递减类LongAdder用来克服在高并发下使用 AtomicLong 的缺点。

既然AtomicLong的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,是不是就解决了性能问题?

AtomicLong:多个线程同时竞争同一个原子变量
在这里插入图片描述

在这里插入图片描述
上图所示,使用 LongAdder 时,则是在内部维护多个 Cell变量,每个 Cell里面有一个初始值为0的long型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell 原子变量时如果失败了,它并不是在当前Cell变量上一直自旋CAS重试,而是尝试在其他 Cell的变量上进行CAS 尝试,这个改变增加了当前线程重试CAS 成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base返回的。

LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下Cell数组是 null)和一个基值变量 base。由于Cells占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。

当一开始判断 Cell数组是 null并且并发线程较少时,所有的累加操作都是对 base 变量进行的。保持 Cell数组的大小为2的N次方,在初始化时 Cell数组中的Cell元素个数为2,数组里面的变量实体是Cell类型。Cell类型是 AtomicLong的一个改进,用来减少缓存的争用 ,也就是解决伪共享问题。

对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended注解对 Cell类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。

LongAdder 代码分析

根据下面问题我们具体来分析源码:

  1. LongAdder 的结构是怎样的?
  2. 当前线程应该访问 Cell 数组里面的哪一个Cell元素?
  3. 如何初始化 Cell 数组?
  4. Cell数组如何扩容?
  5. 线程访问分配的 Cell元素有冲突后如何处理?
  6. 如何保证线程操作被分配的 Cell 元素的原子性?

先看下 LongAdder 的类图结构:
在这里插入图片描述

看类图,LongAdder类继承自Striped64类,在 Striped64内部维护着三个变量。 LongAdder的真实值其实是 base的值与Cell数组里面所有Cell元素中的value值的累加, base是个基础值,默认为0。cellBusy用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组时,使用CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作。

下面看 Cell 的构造:
Striped64.java

// 1. LongAdder 的结构是怎样的?
// @sun.misc.Contended 注解,避免伪共享
@sun.misc.Contended static final class Cell {
	// 内部维护了一个volatile 声明的value 变量
    volatile long value;
    Cell(long x) { value = x; }
    // 6. 如何保证线程操作被分配的 Cell 元素的原子性?
    // cas操作,保证线程更新Call中value变量的原子性
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

这里我们就解决问题1和问题6了。

LongAdder.java

  • long sum()
    返回当前的值,内部操作是累加所有Cell内部的 value值后再累加 base。
    由于计算总和时没有对 Cell 数组进行加锁,所以在累加过程中可能有其他线程对 Cell中的值进行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用 sum方法时的原子快照值。
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
  • long longValue()
    调用sum()
  • void reset()
    为重置操作,如下代码把 base置为0,如果 Cell数组有元素,则元素值被重置为0。
public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}
  • long sumThenReset()
    sum 的改造版本,如下代码在使用sum累加对应的Cell值后,把当前Cell的值重置为0,base 重置为0。这样,当多线程调用该方法时会有问题,比如考虑第一个调用线程清空 Cell的值,则后一个线程调用时累加的都是0值。
public long sumThenReset() {
    Cell[] as = cells; Cell a;
    long sum = base;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null) {
                sum += a.value;
                a.value = 0L;
            }
        }
    }
    return sum;
}
  • void add(long x)
    重要方法
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // (1) 看cells是否为 null,如果为 null 则当前在基础变量 base上进行累加
    if ((as = cells) != null || !casBase(b = base, b + x)) {
    	// 默认非竞争状态
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 || // (2)
        	// 2. 当前线程应该访问 Cell 数组里面的哪一个Cell元素?
            (a = as[getProbe() & m]) == null || // (3) 当前线程应该访问cells数组里面的哪一个Cell元素(getProbe() & m 确定了下标)
            // 6. 如何保证线程操作被分配的 Cell 元素的原子性?
            !(uncontended = a.cas(v = a.value, v + x))) // (4) 当前线程映射的元素存在, 使用 CAS操作去更新分配的 Cell元素的value值, 更新失败uncontended 标志为false 为竞争状态
            // 如果当前线程映射的元素不存在或者存在但是CAS操作失败
            longAccumulate(x, null, uncontended); // (5)
    }
}

 // Striped64类的方法
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 用于获取当前线程中变量 threaLocalRandomProbe的值,这个值一开始为0, 在longAccumulate方法中会对其进行初始化
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// 调整大小或创建单元时使用的自旋锁(通过CAS锁定)。
transient volatile int cellsBusy;
// cells 数组被初始化和扩容
final void longAccumulate(long x, LongBinaryOperator fn,
                      boolean wasUncontended) {
    // (6) 初始化当前线程变量threadLocalRandomProbe的值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // 实际在这里初始化的,前面博文讲过
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // 如果最后一个插槽非空则为真
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 当前线程调用add方法并根据当前线程的随机数 threadLocalRandomProbe和cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null,则新增一个Cell元素到cels数组,并且在将其添加到cels数组之前要竞争设置 cellsBusy 为1。
        if ((as = cells) != null && (n = as.length) > 0) { // (7)
            if ((a = as[(n - 1) & h]) == null) { // (8)
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // (9) 当前Cell存在 ,如 执行CAS设置
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // (10) 当前Cell数组元素个数大于CPU个数
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // (11) 是否有冲突
            else if (!collide)
                collide = true;
            // (12) 如果当前元素个数没有达到 CPU 个数并且有冲突则扩容
            // 这里的扩容操作也是先通过CAS 设置cellBusy为1,然后才能进行扩容。
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                    	// 4. Cell数组如何扩容?
                    	// (12.1)
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
	                // (12.2)
                    cellsBusy = 0;
                }
                // (12.3)
                collide = false;
                continue;                   // Retry with expanded table
            }
            // (13) 为了能够找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数
            h = advanceProbe(h);
        }
        // (14) 初始化Cell数组
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                	// (14.1) 
                    Cell[] rs = new Cell[2];
                    // (14.2) 
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
	            // (14.3) 
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

/**
* 将cellBusy字段从0设为1,以获取锁定。
*/
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

上面代码比较复杂,这里我们主要关注问题 3、问题4和问题5。
当每个线程第 一次执行到代码 (6) 时,会初始化当前线程变量 threadLocalRandomProbe 的值,上面也说了,这个变量在计算当前线程应该被分配到cells数组的哪一个 Cell 元素时会用到。

cells数组的初始化是在代码 (14) 中进行的,其中cellsBusy是一个标示,为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素,为1则说明cells数组在被初始化或者扩容,或者当前在创建新的 Cell元素、通过CAS操作来进行0或1状态的切换,这里使用 casCellsBusy 函数。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码 (14.1) 初始化 cells数组元素个数为2,然后使用h&1计算当前线程应该访问cell数组的哪个位置,也就是使用当前线程的threadLocalRandomProbe变量值&(cells数组元素个数-1),然后标示cells 数组已经被初始化,最后代码 (14.3) 重置了cellBusy 标记。显然这里没有使用 CAS操作,却是线程安全的,原因是cellsBusyvolatile类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改cellsBusy的值。在这里初始化的 cells数组里面的两个元素的值目前还是 null。这里回答了问题3,知道了cells数组如何被初始化。

cells数组的扩容是在代码 (12) 中进行的,对cells扩容是有条件的,也就是代码 (10) (11) 的条件都不满足的时候。具体就是当前cells的元素个数小于当前机器CPU个数并且当前多个线程访问了cells 中同一个元素,从而导致冲突使其中一个线程 CAS失败时才会进行扩容操作。这里为何要涉及CPU个数呢?
其实只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组元素个数与CPU个数一致时,每个Cell都使用一个CPU进行处理,这时性能才是最佳的。代码 (12) 中的扩容操作也是先通过CAS 设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码 (12.1) 将容量扩充为之前的2 倍,并复制Cell元素到扩容后数组。另外,扩容后cells数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是null。这里回答了问题4。

在代码 (7) (8) 中,当前线程 调用 add方法 并 根据当前线 程的 随机 数 threadLocalRandomProbe和clls元素个数计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null,则新增一个Cell元素到cels数组,并且在将其添加到cells数组之前要竞争设置 celsBusy 为1。

代码 (13) 对CAS失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,以减少下次访问 cells 元素时的冲突机会。这里回答了问题5。

使用示例

public class LongAdderTest {

    private static LongAdder longAdder = new LongAdder();

    /**
     * 数据源
     */
    private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 6, 2, 8, 0, 4};
    private static Integer[] arrayTwo = new Integer[]{7, 10, 0, 3, 6, 0, 2, 0, 0, 7};

    public static void main(String[] args) throws InterruptedException {
        // threadOne 统计 arrayOne 数组中 0 的个数
        Thread threadOne = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayOne.length;
                for (int i = 0; i < size; i++) {
                    if (arrayOne[i].intValue() == 0) {
                        longAdder.increment();
                    }
                }
            }
        });

        // threadTwo 统计 arrayTwo 数组中 0 的个数
        Thread threadTwo = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayTwo.length;
                for (int i = 0; i < size; i++) {
                    if (arrayTwo[i].intValue() == 0) {
                        longAdder.increment();
                    }
                }
            }
        });

        threadOne.start();
        threadTwo.start();

        threadOne.join();
        threadTwo.join();

        System.out.println("count 0: " + longAdder);
    }
}

count 0: 7

小结

本节介绍了JDK8中新增的LongAdder原子性操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。另外,数组元素 Cell使用@sun.misc.Contended注解进行修饰,这避免了cells数组内多个原子变量被放入同一个缓存行,也就是避免了伪共享,这对性能也是一个提升。

LongAccumulator 类原理探究

LongAdder类是 LongAccumulator 的一个特例,LongAccumulator 比LongAdder 的功能更强大。例如下面的构造函数,其中accumulatorFunction是一个双目运算器接口,其根据输入的两个参数返回一个计算值,identity 则是LongAccumulator 累加器的初始值。

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                   long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}

@FunctionalInterface
public interface LongBinaryOperator {

    // 根据两个参数计算并返回一个值
    long applyAsLong(long left, long right);
}

上面提到,LongAdder 其实是 LongAccumulator 的一个特例,调用LongAdder 就相当于使用下面的方式调用 LongAccumulator ∶

LongAdder adder = new LongAdder();
LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() {
    @Override
    public long applyAsLong(long left, long right) {
        return left + right;
    }
}, 0);

LongAccumulator 相比于LongAdder,可以为累加器提供非0的初始值,后者只能提供默认的0值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在构造LongAccumulator 时传入自定义的双目运算器即可,后者则内置累加的规则。

从下面代码我们可以知道,LongAccumulator相比于LongAdder的不同在于,在调用casBase 时后者传递的是 b+x,前者则使用了r = function.applyAsLong(v = a.value, x)来计算。

// LongAdder的add方法
public void add(long x) {
    Cell[]as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
// LongAccumulator的accumulate方法
public void accumulate(long x) {
    Cell[] as; long b, v, r; int m; Cell a;
    if ((as = cells) != null ||
        (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended =
              (r = function.applyAsLong(v = a.value, x)) == v ||
              a.cas(v, r)))
            longAccumulate(x, function, uncontended);
    }
}

另外,前者在调用 longAccumulate时传递的是 function,而后者是null。从下面的代码可知,当 fn为 null时就使用v+x加法运算,这时候就等价于LongAdder,当fn不为null时则使用传递的 fn 函数计算。

else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))

总结∶LongAdder类是LongAccumulator的一个特例,只是后者提供了更加强大的功能,可以让用户自定义累加规则。