JUC源码详细分析之Part1——《CAS和原子类型》

在笔者看来,后端的技术点主要是两个,一个是并发一个是IO。其中并发涉及很多知识点:线程安全、锁、线程池等等,而这些知识点都在Java的JUC包下。

本系列笔记笔者将会详细的讲解JUC中核心类的源码,其中基于的JDK版本为1.8.0_291。由于JUC内容很多,因此笔者将笔记拆为四部分:

另外,本系列笔记部分内容参考自《深入浅出 Java Concurrency》

1. CAS

1.1 原子操作

假设疫情期间,你们老板要做一款微信扫描场所码的功能,其中有个功能需要你统计一下一天共扫了多少次码。

作为资深程序员的你很快就写好了代码:

public class ScanCode {
    private int count = 0;

    //注:打印语句打印出的并非是当前线程加过后的count值,这两句也并非原子操作,这里加打印语句只是方便展示问题
    public void scanCode(){
       count++;
       System.out.println("当前扫码:"+count);
    }
}

这段代码在单线程下没有任何问题,但是如果多线程一起请求就会有想不到的错误,我们假设当前有1000个用户一起来扫码:

public static void main(String[] args) {
    ScanCode scanCode = new ScanCode();
    Thread[] threads = new Thread[1000];
    for(int i = 0; i < 1000; i++){
        threads[i] = new Thread(() -> scanCode.scanCode());
    }
    for(Thread thread:threads){
        thread.start();
    }
}

打印结果中最大的一条==“当前扫码”==的记录往往是小于1000的,且每次运行得到的结果也不相同。

image-20220519120059225

这是为什么呢?

首先我们需要知道现代计算机为了弥补CPU与存储介质的速度差异,往往采用多级缓存的架构。一般CPU在做计算的时候会将内存中的数据加载到寄存器,在寄存器中对数据操作计算后,再写回到内存。如果在这之间发生中断,进行了线程上下文切换,会有可能发生意想不到的情况。我们模拟两个线程A,B的执行来解释这一情况:

时间OS线程A线程B
T1 从内存中取count,此时count=50,将50写入寄存器
T2 在寄存器中做++操作,此时寄存器中的值是51
T3发生中断,保存线程A状态,恢复线程B执行
T4 从内存中取count,此时count=50,将50写入寄存器
T5 在寄存器中做++操作,此时寄存器中的值是51
T6 将51写回内存,此时内存中count=51
T7发生中断,保存线程B状态,恢复线程A执行
T8 将寄存器中的值写回内存,此时count=51

可以看到,我们扫了两次码,但count只加了1。

根本原因在于count++这一操作不是原子操作。那么什么是原子操作?《Java并发编程实战》一书中给出了定义,但描述的依然生涩难懂。笔者在此简单解释下:所谓原子操作就是不可再切分的操作,不能在运行期间插入别的任务的,一个要么一口气执行完要么不执行的操作。

以上面的线程AB例子来看:在执行count++的过程中,需要做如下几件事:

  1. 从内存中将数据加载到寄存器
  2. 在寄存器中做操作
  3. 将值写回内存

上述任意一步之间均可被中断,此时就不是一个原子操作。

1.2 CAS

我们需要一个超级指令,当这个指令执行时,它会像期望那样执行更新。它不能在指令中间中断,如果发生中断,指令根本没有运行,或者运行完成,没有中间状态。

而这一超级指令是由现代CPU实现的(注意是硬件CPU,而非软件层)。

现代CPU提供了一个叫CAS的功能,CAS全称compare and swap,即比较并交换,其用伪代码大概如下:

int compareAndSwap(int * ptr, int expected, int new){
    int actual = *ptr;
    if(actual == expected){
        *ptr = new;
    }
    return actual;
}

基本思路是检测 ptr 指向的值是否和 expected 相等;如果是,更新 ptr 所指的值为新值。否则,什么也不做。不同的是这一过程会原子的执行。

CAS有一个高频的面试问题——ABA问题:

比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。

对于ABA问题,也有一些解决方案,比较常见的一种是加版本号,也即每次更改都对应一个唯一的版本号,通过判断前后两次版本号是否一致来判断当前值是否与expected相等。

CAS的思想异常重要,说它是Java并发的基石也不为过。整个JUC都是建立在CAS上的。如果你还不了解CAS有什么用或者该怎么用,没关系,我们下面很快会讲到。

1.3 整体认识

img

上图为JUC的整体内容,我们将围绕原子类,锁,集合和线程池来讲解。

2. 原子类型

原子类型是JUC并发中一个相对简单的知识点,它会加强我们对CAS的理解,同时又是后续的锁,线程池等知识点的基础。

2.1 AtomicInteger

在第一章中,我们以一个count++的例子引出了原子操作和CAS。很多时候我们往往就是需要一个线程安全的count++的方案,并且这个方案要足够的简单高效。首先加锁可以排除掉,因为一般情况下我们认为加锁这个操作太重了,不符合简单高效的思想,那能不能借助CAS呢?

答案是肯定的,在Java中,AtomicInteger就是这一实现。我们先看下AtomicInteger是如何做到线程安全的累加的

public class ScanCode {
    private AtomicInteger count = new AtomicInteger();

    public void scanCode(){
        int thisCount = count.incrementAndGet();
        System.out.println("当前扫码:"+thisCount);
    }
}
public static void main(String[] args) {
    ScanCode scanCode = new ScanCode();
    Thread[] threads = new Thread[1000];
    for(int i = 0; i < 1000; i++){
        threads[i] = new Thread(() -> scanCode.scanCode());
    }
    for(Thread thread:threads){
        thread.start();
    }
}

这个方法无论多少线程测试多少遍结果都是正确的。

其中incrementAndGet()就是线程安全的count++操作。那么AtomicInteger是如何实现的呢?

public class AtomicInteger {
    private static final long valueOffset;
    //内部维持一个数字,这就是我们的count值
    private volatile int value;
    
    //这段代码看不懂没关系,你只需要知道valueOffset和this(这个对象)加起来可以原子的获取到value的值就可以了
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    public final int incrementAndGet() {
        //调用getAndAddInt操作
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    //getAndAddInt会以一个do while的形式不断调用CAS操作,直到某一次执行成功为止
    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        return var5;
    }
}

上述代码中最复杂的应该是getAndAddInt函数。首先我们先回顾一下第一章中CAS的伪代码

int compareAndSwap(int * ptr, int expected, int new){
    int actual = *ptr;
    if(actual == expected){
        *ptr = new;
    }
    return actual;
}

这段代码与getAndAddInt中的this.compareAndSwapInt(var1, var2, var5, var5 + var4)做个对比,其中:

var1和var2对应上述伪代码的*ptr,var5对应的是expected,var5+var4对应的是new。

另外getAndAddInt中的getIntVolatile是获得当前value的值。

有了这些概念后我们再来看getAndAddInt函数。我们首先得到当前count的值,将它赋值给var5,然后由于传入的var4是1,因此上述代码可以描述为:

  1. 获得当前value的值
  2. 如果刚才获得的value的值与我们现在的value的值相等,那么就将value更新为value+1,结束执行。
  3. 如果不相等,那么就重新回到步骤1执行。

do while就是一个不断尝试,如果某次执行不成功,那肯定代表在执行getIntVolatilecompareAndSwapInt之间有其他线程修改了value的值,此时我们就需要重新获取最新的value值,在此基础上再做线程安全的value++操作。

CAS与循环基本是成对出现的,因为我们知道CAS只保证了原子的执行一个操作,但不保证一定执行成功,此时就需要重复执行,直到某次执行成功为止。单次不循环的CAS基本没什么意义。

另外Java中的CAS是通过JNI实现的,底层是C++,因此上述代码中的getIntVolatilecompareAndSwapInt都是native方法,对于native方法,本笔记不做更多说明。

public native int getIntVolatile(Object var1, long var2);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

了解了AtomicInteger#incrementAndGet之后其实基本就懂了JUC下所有的原子类型操作的底层,除了incrementAndGetAtomicInteger还支持如下常用方法:

//以原子方式将给定值与当前值相加。 实际上就是等于线程安全版本的i =i+delta操作。
int addAndGet(int delta)

//如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。 如果成功就返回true,否则返回false,并且不修改原值。
boolean compareAndSet(int expect, int update)
          
//以原子方式将当前值减 1。 相当于线程安全版本的--i操作。
int decrementAndGet()
          
//获取当前值。
int get()
         
//以原子方式将给定值与当前值相加。 相当于线程安全版本的t=i;i+=delta;return t;操作。
int getAndAdd(int delta)
          
// 以原子方式将当前值减 1。 相当于线程安全版本的i--操作。
int getAndDecrement()
         
//以原子方式将当前值加 1。 相当于线程安全版本的i++操作。
int getAndIncrement()
          
//以原子方式设置为给定值,并返回旧值。 相当于线程安全版本的t=i;i=newValue;return t;操作。
int getAndSet(int newValue)
          
//以原子方式将当前值加 1。 相当于线程安全版本的++i操作。 
int incrementAndGet()

这其中的原子操作都是通过CAS来完成的,我想当你看到方法名的时候就已经猜到了底层大概的实现逻辑了。

另外除了AtomicInteger,JUC还支持AtomicLongAtomicReferenceAtomicBoolean等类型,其实现与AtomicInteger基本相似,此处不再做介绍,感兴趣的可以自行查看JDK的源码。

2.2 数组的原子操作

AtomicIntegerArray为例,我们看看其常用的API:

//将索引i处的值设置为newValue
void set(int i, int newValue)
//返回索引i处的值并将其设置为newValue
int getAndSet(int i, int newValue)
//原子的方式,如果索引i处的值是expect,就将索引i处的值设置为update
boolean compareAndSet(int i, int expect, int update)
//原子的方式,返回索引i处的值并做++操作
int getAndIncrement(int i)
//原子的方式,返回索引i处的值并做--操作
int getAndDecrement(int i)
//原子的方式,返回索引i处的值并做+delta操作
int getAndAdd(int i, int delta)
//原子的方式,将索引i处的值做++操作并返回
int incrementAndGet(int i)
//原子的方式,将索引i处的值做--操作并返回
int decrementAndGet(int i)
//原子的方式,将索引i处的值并做+delta操作并返回
int addAndGet(int i, int delta)

其底层实现与AtomicInteger基本相似,只不过包了一层数组,需要做数组越界的校验,我们以incrementAndGet为例来看下源码:

public final int incrementAndGet(int i) {
    return getAndAdd(i, 1) + 1;
}
//此处delta为1,代表+1操作(++)
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
//校验数组越界
private long checkedByteOffset(int i) {
    if (i < 0 || i >= array.length)
        throw new IndexOutOfBoundsException("index " + i);

    return byteOffset(i);
}
//这个函数我们在AtomicInteger时讲过了,一模一样,就是循环加CAS,原子的操作,直到成功
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

JUC下的AtomicLongArrayAtomicReferenceArray实现也基本相同(核心都是CAS),不再赘述。

2.3 引用的原子操作

引用的原子操作难点不在于源码,而在于怎么用。我们先以一个例子来说明:

还是第一章中微信扫码的例子,你需要统计一下一天共扫了多少次码,我们假设你没有使用AtomicInteger类型,就是使用了int类型,且现在不允许将int改为AtomicInteger,但你还是想高效简单的线程安全的做count++操作,可以吗?

public class ScanCode {
    int count = 0;
}

可以,此时就需要引用的原子操作,所谓引用的原子操作就是对一个类里的某一个引用来进行原子的修改,拿我们本例来说就是对ScanCode类里的count值做原子的++操作。

public class ScanCode {
    volatile int count;
    AtomicIntegerFieldUpdater<ScanCode> fieldUpdater =
            AtomicIntegerFieldUpdater.newUpdater(ScanCode.class, "count");

    public void scanCode(){
        int newCount = fieldUpdater.incrementAndGet(this);
        System.out.println("当前扫码:"+newCount);
    }
}

可以看到,我们使用了AtomicIntegerFieldUpdater类型对原来的int类型做了包装,并利用这个包装类来做++操作,看到这里很多同学基本也猜到了其底层其实就是反射,通过传入class和fieldName,反射的获取属性然后原子的更改属性的值。

但是引用的原子操作对类中的引用字段有一些要求:

  1. 字段必须是volatile类型的
  2. 字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
  3. 只能是实例变量,不能是类变量,也就是说不能加static关键字。
  4. 只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
  5. 对于AtomicIntegerFieldUpdaterAtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater

2.4 volatile

其实如果你观察的够仔细的话会发现我们一直在使用volatile关键字,但却未讲解其是什么,以及为什么要用。由于volatile的讲解涉及到了Java的内存模型,笔者虽看过些许书籍但依然无法理解透,因此对于volatile,本文档也只是简单的说一下,后续笔者会再补上对volatile更详细的说明。

volatile主要解决了多线程间可见性这一问题,什么是可见性?一个线程修改了变量,另一个线程也需要立马能够看到这个变量的修改就是可见性。那为什么不加volatile就是不可见的?

首先Java的内存模型规定,每个线程在运行的时候都会缓冲一份堆内存的部分字段的值,因此线程修改堆内存都是先保存到线程栈内存,然后再刷新到堆,这就会存在短暂的不可见性。比如一个线程已经修改了堆内存的值,但由于还未刷新到堆,且其他线程未感知到,因此其他线程还持有的是旧的值。

一种典型的场景如下:

public class VolatileTest {
    boolean flag = true;
}
public void test() {
    VolatileTest volatileTest = new VolatileTest();
    //线程1
    new Thread(()->{
        while (volatileTest.flag){
            //do something
        }
    }).start();
    //线程2
    new Thread(()->{
        volatileTest.flag = false;
    }).start();
}

上述代码虽然线程2修改了volatileTest.flag的值,但很有可能线程1永远都不会停止,因为线程2的修改对线程1不可见。

volatile就解决了这一问题,每个线程的修改都会立马被其他线程看到。

另外volatile不同于锁,其只保证了可见性,未保证原子性,但锁可以保证原子性和可见性。

那么volatile适合什么场景下呢?

  1. 对变量的写操作不依赖于当前值(每次操作都是幂等操作)。
  2. 只有一个线程在修改这个值,其余线程都是读。

如果存在一个线程修改,其他线程读的情况,就可以只使用volatile关键字,而无需加锁,但是如果是多线程的修改(前提是非幂等的修改),为保证线程安全性就需要加锁。

最后修改:2023 年 07 月 23 日
如果觉得我的文章对你有用,请随意赞赏