Java Atomic简介

所谓 Atomic,翻译过来就是原子。原子被认为是操作中最小的单位,一段代码如果是原子的,则表示这段代码在执行过程中,要么执行成功,要么执行失败。原子操作一般都是底层通过 CPU 的指令来实现。而 atomic 包下的这些类,则可以让我们在多线程环境下,通过一种无锁的原子操作来实现线程安全。

atomic 包下的类基本上都是借助 Unsafe 类,通过 CAS 操作来封装实现的。Unsafe 这个类不属于 Java 标准,或者说这个类是 Java 预留的一个后门类,JDK 中,有关提升性能的 concurrent 或者 NIO 等操作,大部分都是借助于这个类来封装操作的。
Java 是种编译型语言,不像 C 语言能支持操作内存,正常情况下都是由 JVM 进行内存的创建回收等操作,但这个类提供了一些直接操作内存相关的底层操作,使得我们也可以手动操作内存,但从类的名字就可以看出,这个类不是安全的,官方也是不建议我们使用的。

CAS原理

CAS 包含 3 个参数 CAS(V,E,N). V 表示要更新的变量, E 表示预期值, N 表示新值.

仅当V值等于E值时, 才会将V的值设为N, 如果V值和E值不同, 则说明已经有其他线程做了更新, 则当前线程什么都不做. 最后, CAS返回当前V的真实值. CAS操作是抱着乐观的态度进行的, 它总是认为自己可以成功完成操作.

当多个线程同时使用CAS操作一个变量时, 只有一个会胜出, 并成功更新, 其余均会失败.失败的线程不会被挂起,仅是被告知失败, 并且允许再次尝试, 当然也允许失败的线程放弃操作.基于这样的原理, CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰, 并进行恰当的处理.

JDK8atomic 包下,大概有 16 个类,按照原子的更新方式,大概可以分为 4 类:原子更新普通类型原子更新数组原子更新引用原子更新字段

原子更新普通类型

atomic 包下提供了三种基本类型的原子更新,分别是 AtomicBooleanAtomicIntegerAtomicLong,这几个原子类对应于基础类型的布尔,整形,长整形,至于 Java 中其他的基本类型,如 float 等,如果需要,可以参考这几个类的源码自行实现。

AtomicBoolean

主要接口

1
2
3
4
5
6
public final boolean get();
public final boolean compareAndSet(boolean expect, boolean update);
public boolean weakCompareAndSet(boolean expect, boolean update);
public final void set(boolean newValue);
public final void lazySet(boolean newValue);
public final boolean getAndSet(boolean newValue);

这里面的操作都很正常,主要都是用到了 CAS。这个类中的方法不多,基本上上面都介绍了,而内部的计算则是先将布尔转换为数字0/1,然后再进行后续计算。

AtomicLong

主要接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final long get();
public final void set(long newValue);
public final void lazySet(long newValue);
public final long getAndSet(long newValue);
public final boolean compareAndSet(long expect, long update);
public final boolean weakCompareAndSet(long expect, long update);
public final long getAndIncrement();
public final long getAndDecrement();
public final long getAndAdd(long delta);
public final long incrementAndGet();
public final long decrementAndGet();
public final long addAndGet(long delta);
public final long getAndUpdate(LongUnaryOperator updateFunction);
public final long updateAndGet(LongUnaryOperator updateFunction);

这个和下面要讲的 AtomicInteger 类似,下面具体说下。

AtomicInteger

主要接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 取得当前值
public final int get();
// 设置当前值
public final void set(int newValue);
// 设置新值,并返回旧值
public final int getAndSet(int newValue);
// 如果当前值为expect,则设置为u
public final boolean compareAndSet(int expect, int u);
// 当前值加1,返回旧值
public final int getAndIncrement();
// 当前值减1,返回旧值
public final int getAndDecrement();
// 当前值增加delta,返回旧值
public final int getAndAdd(int delta);
// 当前值加1,返回新值
public final int incrementAndGet();
// 当前值减1,返回新值
public final int decrementAndGet();
// 当前值增加delta,返回新值
public final int addAndGet(int delta);

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 封装了一个int对其加减
private volatile int value;
.......
public final boolean compareAndSet(int expect, int update) {
// 通过unsafe 基于CPU的CAS指令来实现, 可以认为无阻塞.
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
.......
public final int getAndIncrement() {
for (;;) {
// 当前值
int current = get();
// 预期值
int next = current + 1;
if (compareAndSet(current, next)) {
// 如果加成功了, 则返回当前值
return current;
}
// 如果加失败了, 说明其他线程已经修改了数据, 与期望不相符,
// 则继续无限循环, 直到成功. 这种乐观锁, 理论上只要等两三个时钟周期就可以设值成功
// 相比于直接通过synchronized独占锁的方式操作int, 要大大节约等待时间.
}
}

用一个简单的例子测试下:

1
2
3
4
5
6
AtomicInteger atomicInteger = new AtomicInteger(1);
System.out.println(atomicInteger.incrementAndGet()); // 2
System.out.println(atomicInteger.getAndIncrement()); // 2
System.out.println(atomicInteger.getAndAccumulate(2, (i, j) -> i + j)); // 3
System.out.println(atomicInteger.get()); // 5
System.out.println(atomicInteger.addAndGet(5));

原子更新数组

atomic 包下提供了三种数组相关类型的原子更新,分别是 AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray,对应于整型,长整形,引用类型,要说明的一点是,这里说的更新是指更新数组中的某一个元素的操作。

由于方法和更新基本类型方法相同,这里只简单看下 AtomicIntegerArray 这个类的几个方法,其他的方法类似。

AtomicIntegerArray

主要接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 获得数组第i个下标的元素
public final int get(int i);
// 获得数组的长度
public final int length();
// 将数组第i个下标设置为newValue,并返回旧的值
public final int getAndSet(int i, int newValue);
// 进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true
public final boolean compareAndSet(int i, int expect, int update);
// 将第i个下标的元素加1
public final int getAndIncrement(int i);
// 将第i个下标的元素减1
public final int getAndDecrement(int i);
// 将第i个下标的元素增加delta(delta可以是负数)
public final int getAndAdd(int i, int delta);

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 数组本身基地址
private static final int base = unsafe.arrayBaseOffset(int[].class);

// 封装了一个数组
private final int[] array;

static {
// 数组中对象的宽度, int类型, 4个字节, scale = 4;
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 前导0 : 一个数字转为二进制后, 他前面0的个数
// 对于4来讲, 他就是00000000 00000000 00000000 00000100, 他的前导0 就是29
// 所以shift = 2
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

// 获取第i个元素
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}

// 第i个元素, 在数组中的偏移量是多少
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}

// base : 数组基地址, i << shift, 其实就是i * 4, 因为这边是int array.
private static long byteOffset(int i) {
// i * 4 + base
return ((long) i << shift) + base;
}

// 根据偏移量从数组中获取数据
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}

用一个简单的例子测试一下:

1
2
3
4
AtomicIntegerArray array = new AtomicIntegerArray(5);
array.set(0, 1); // 设置数组第一个值为1
System.out.println(array.getAndDecrement(0)); // 1
System.out.println(array.addAndGet(0, 5)); // 5

原子更新引用

更新引用类型的原子类包含了AtomicReference(更新引用类型),AtomicReferenceFieldUpdater(抽象类,更新引用类型里的字段),AtomicMarkableReference(更新带有标记的引用类型)这三个类,这几个类能同时更新多个变量。

AtomicReference

AtomicInteger 类似, 只是里面封装了一个对象, 而不是 int, 对引用进行修改。

主要接口

1
2
3
4
5
6
7
public final V get();

public final void set(V newValue);

public final boolean compareAndSet(V expect, V update);

public final V getAndSet(V newValue);

测试
使用 10 个线程, 同时尝试修改 AtomicReference 中的 String, 最终只有一个线程可以成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc");

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (attxnicStr.compareAndSet("abc", "def")) {
System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get());
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!");
}
}
}.start();
}
}
}

原子更新字段

如果更新的时候只更新对象中的某一个字段,则可以使用 atomic 包提供的更新字段类型:AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicStampedReference,前两个顾名思义,就是更新 intlong 类型,最后一个是更新引用类型,该类提供了版本号,用于解决通过 CAS 进行原子更新过程中,可能出现的 ABA 问题。
前面这两个类和上面介绍的 AtomicReferenceFieldUpdater 有些相似,都是抽象类,都需要通过 newUpdater 方法进行实例化,并且对字段的要求也是一样的。

AtomicStampedReference

ABA问题

线程一准备用 CAS 将变量的值由 A 替换为 B, 在此之前线程二将变量的值由 A 替换为 C, 线程三又将 C 替换为A, 然后线程一执行 CAS 时发现变量的值仍然为 A, 所以线程一 CAS 成功.

主要接口

1
2
3
4
5
6
7
8
// 比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
// 获得当前对象引用
public V getReference()
// 获得当前时间戳
public int getStamp()
// 设置当前对象引用和时间戳
public void set(V newReference, int newStamp)

分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 内部封装了一个Pair对象, 每次对对象操作的时候, stamp + 1
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

private volatile Pair<V> pair;

// 进行cas操作的时候, 会对比stamp的值
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

测试

要求:后台使用多个线程对用户充值, 要求只能充值一次.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class AtomicStampedReferenceDemo {
static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
public staticvoid main(String[] args) {
//模拟多个线程同时更新后台数据库,为用户充值
for(int i = 0 ; i < 3 ; i++) {
final int timestamp=money.getStamp();
newThread() {
public void run() {
while(true){
while(true){
Integerm=money.getReference();
if(m<20){
if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){
                System.out.println("余额小于20元,充值成功,余额:"+money.getReference()+"元");
break;
}
}else{
//System.out.println("余额大于20元,无需充值");
break ;
}
}
}
}
}.start();
}

//用户消费线程,模拟消费行为
new Thread() {
publicvoid run() {
for(int i=0;i<100;i++){
while(true){
int timestamp=money.getStamp();
Integer m=money.getReference();
if(m>10){
System.out.println("大于10元");
  if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
       System.out.println("成功消费10元,余额:"+money.getReference());
break;
}
}else{
System.out.println("没有足够的金额");
break;
}
}
try {Thread.sleep(100);} catch (InterruptedException e) {}
}
}
}.start();
}
}

AtomicIntegerFieldUpdater

能够让普通变量也能够进行原子操作。

主要接口

1
2
3
4
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName);

public int incrementAndGet(T obj);
  • Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果score申明为private,就是不可行的。

  • 为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单得申明一下就行。

  • 由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterDemo {
public static class Candidate {
int id;
// 如果直接把int改成atomicinteger, 可能对代码破坏比较大
// 因此使用AtomicIntegerFieldUpdater对score进行封装
volatile int score;
}

// 通过反射实现
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
// 检查Updater是否工作正确, allScore的结果应该跟score一致
public static AtomicInteger allScore = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
final Candidate stu = new Candidate();
Thread[] t = new Thread[10000];
for (int i = 0; i < 10000; i++) {
t[i] = new Thread() {
public void run() {
if (Math.random() > 0.4) {
scoreUpdater.incrementAndGet(stu);
allScore.incrementAndGet();
}
}
};
t[i].start();
}
for (int i = 0; i < 10000; i++) {
t[i].join();
}

System.out.println("score=" + stu.score);
System.out.println("allScore=" + allScore);
}
}

JDK8之后引入的类型

JDK8之前,针对原子操作,我们基本上可以通过上面提供的这些类来完成我们的多线程下的原子操作,不过在并发高的情况下,上面这些单一的 CAS + 自旋操作的性能将会是一个问题,所以上述这些类一般用于低并发操作。
而针对这个问题,JDK8又引入了下面几个类:DoubleAdderLongAdderDoubleAccumulatorLongAccumulator,这些类是对AtomicLong这些类的改进与增强,这些类都继承自Striped64这个类。

Java Atomic.png

参考博客:
Java高并发之无锁与Atomic源码分析


Java Atomic简介
https://muchen.fun/passages/java-atomic-introduction/
作者
沐晨
发布于
2019年10月16日
许可协议