CAS&Atomic原子操作详解
什么是原子操作?如何实现原子操作?
我们在接触到事务的时候,了解到事务的一大特性是原子性,一个事务要么全部执行、要么全部不执行。
并发里的原子性和事务里的原子性有一样的内涵和概念。假定有2个操作A和B都包含多个步骤,从线程A的角度看,线程B要么全部执行要么全部不执行,B看A也是如此,那么A和B对彼此来说都是原子的。
实现原子操作可以使用锁,锁机制,满足基本需求是没有问题的,但有时候我们需要更有效,更加灵活的机制。
synchronized 关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候, 访问同一资源的其它线程需要等待,直到该线程释放锁。
这里会有些问题: 首先,如果被阻塞的线程优先级很高很重要怎么办?其次, 如果获得锁的线程一直不释放锁怎么办? 同时,还有可能出现一些例如死锁之类 的情况, 最后, 其实锁机制是一种比较粗糙, 粒度比较大的机制, 相对于像计数 器这样的需求有点儿过于笨重。为了解决这个问题,Java 提供了 Atomic 系列的 原子操作类。
这些原子操作类其实是使用当前的处理器基本都支持 CAS 的指令,比如 Intel 的汇编指令
cmpxchg,每个厂家所实现的具体算法并不一样,但是原理基本一样。 每一个 CAS 操作过程都包含三个运算符: 一个内存地址 V,一个期望的值 A 和一 个新值 B,操作的时候如果这个地址上存放的值等于这个期望的值 A,则将地址 上的值赋为新值 B,否则不做任何操作。
CAS 的基本思路就是, 如果这个地址上的值和期望的值相等, 则给其赋予新值, 否则不做任何事儿,但是要返回原值是多少。 自然 CAS 操作执行完成时, 在 业务上不一定完成了, 这个时候我们就会对 CAS 操作进行反复重试, 于是就有了 循环CAS。很明显, 循环CAS就是在一个循环里不断的做cas操作, 直到成功为 止。 Java 中的 Atomic 系列的原子操作类的实现则是利用了循环CAS来实现。
CAS 实现原子操作的三大问题
ABA 问题。
即值从A设置为B,再由B设置到A,这整个过程CAS是不知道的,还以为A从来没改过。
解决该问题的方法是使用版本号。即A->B->A变成了1A->2B->3A。
循环时间长开销大。
自旋CAS如果长时间不成功,会大量的消耗CPU资源
只能保证一个共享变量的原子操作。
当对一个共享变量执行操作时, 我们可以使用循环 CAS 的方式来保证原子操 作, 但是对多个共享变量操作时, 循环 CAS 就无法保证操作的原子性, 这个时候 就可以用锁。
Jdk 中相关原子操作类的使用
AtomicInteger
- int AddAndGet(int delta):可以将输入的值delta与源值相加,然后将相加的值返回
- boolean compareAndSet(int expect ,int update):如果数据的值等于预期值expect,则用update更新原值
- int getAndIncrement():以原子的方式+1,注意,此处返回的值是自增前的值
- int getAndSet(int newValue):以原子的方式设置为新值,并返回旧值
AtomicIntegerArray
主要是提供原子的方式更新数组里的整型,其常用方法如下。
- int addAndGet(int i ,int delta):以原子方式将输入值与数组中索引i的元素相加,并返回修改后的值
- boolean compareAndSet(int i ,int expect ,int update):如果当前值等于 预期值,则以原子方式将数组位置i的元素设置成 update 值。
需要注意的是, 数组 value 通过构造方法传递进去, 然后 AtomicIntegerArray 会将当前数组复制一份,所以当 AtomicIntegerArray 对内部的数组元素进行修改 时,不会影响传入的数组。
更新引用类型
原子更新基本类型的 AtomicInteger,只能更新一个变量, 如果要原子更新多 个变量,就需要使用这个原子更新引用类型提供的类。 Atomic 包提供了以下 3 个类。
AtomicReference
原子更新引用类型。
如下例
public class UseAtomicReference {
static AtomicReference<UserInfo> atomicUserRef;
public static void main(String[] args) {
UserInfo user = new UserInfo("Mark", 15);//要修改的实体的实例
atomicUserRef = new AtomicReference(user);
UserInfo updateUser = new UserInfo("Bill",17);
atomicUserRef.compareAndSet(user,updateUser);
System.out.println(atomicUserRef.get());
System.out.println(user);
}
//定义一个实体类
static class UserInfo {
private volatile String name;
private int age;
public UserInfo(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
@Override
public String toString() {
return super.toString() + "UserInfo{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
}
AtomicStampedReference
利用版本戳的形式记录了每次改变以后的版本号, 这样的话就不会存在 ABA 问题了。这就是AtomicStampedReference 的解决方案。AtomicMarkableReference 跟 AtomicStampedReference 差不多,AtomicStampedReference是使用 pair 的 int stamp 作为计数器使用,AtomicMarkableReference 的 pair使用的是 boolean mark。 还是那个水的例子,AtomicStampedReference 可能关心的是动过几次,AtomicMarkableReference 关心的是有没有被人动过,方法都比较简单。
public class UseAtomicStampedReference {
static AtomicStampedReference<String> asr
= new AtomicStampedReference("mark",0);
public static void main(String[] args) throws InterruptedException {
//拿到当前的版本号(旧)
final int oldStamp = asr.getStamp();
final String oldReference = asr.getReference();
System.out.println(oldReference+"============"+oldStamp);
Thread rightStampThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ "A:" + ":当前变量值:"
+oldReference + "-当前版本戳:" + oldStamp + "-"
+ asr.compareAndSet(oldReference,
oldReference + "+Java", oldStamp,
oldStamp + 1));
}
});
Thread errorStampThread = new Thread(new Runnable() {
@Override
public void run() {
String reference = asr.getReference();
System.out.println(Thread.currentThread().getName() + "B:"
+":当前变量值:"
+reference + "-当前版本戳:" + asr.getStamp() + "-"
+ asr.compareAndSet(reference,
reference + "+C", oldStamp,
oldStamp + 1));
}
});
rightStampThread.start();
rightStampThread.join();
errorStampThread.start();
errorStampThread.join();
System.out.println(asr.getReference()+"============"+asr.getStamp());
}
}
AtomicMarkableReference
原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引 用类型。构造方法是AtomicMarkableReference(V initialRef,boolean initialMark)。
原子更新字段类
如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类, Atomic 包提供了以下3个类进行原子字段更新。
要想原子地更新字段类需要两步。第一步,因为原子更新字段类都是抽象类, 每次使用的时候必须使用静态方法 newUpdater()创建一个更新器, 并且需要设置 想要更新的类和属性。第二步,更新类的字段(属性)必须使用 public volatile 修饰符。
AtomicIntegerFieldUpdater:
原子更新整型的字段的更新器。
AtomicLongFieldUpdater:
原子更新长整型字段的更新器。
AtomicReferenceFieldUpdater:
原子更新引用类型里的字段。
LongAdder
JDK1.8 时,java.uti l.concurrent.atomic 包中提供了一个新的原子类:LongAdder。 根据 Oracle官方文档的介绍, LongAdder在高并发的场景下会比它的前辈——AtomicLong 具有更好的性能,代价是消耗更多的内存空间。
AtomicLong是利用了底层的CAS操作来提供并发性的, 调用了Unsafe类的getAndAddLong方法, 该方法是个native方法, 它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。
在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但 是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时 AtomicLong 的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。
AtomicLong中有个内部变量value保存着实际的 long 值,有volatile修饰,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
LongAdder提供的api和AtomicLong比较接近,两者都能以原子的方式对long进行增减。但AtomicLong提供的接口更丰富,尤其是addAndGet、 decrementAndGet、compareAndSet 这些方法。addAndGet、 decrementAndGet提供了先增减再获得增减后的值的功能,而LongAdder则需要做同步控制才能精确获取增减后的值。如果需求需要精确的控制计数,做计数比较,AtomicLong更合适,另外,从空间方面考虑,LongAdder其实是一种“空间换时间”的思想,从这一点来讲AtomicLong更适合。
在低并发、一般的业务场景下AtomicLong是足够了。如果并发量很多,存在大量写多读少的情况,用LongAdder更合适。如果出现了是使用AtomicLong还是LongAdder的场景,需要对两种方案进行性能测试,以准确评估当前场景下的性能。
abstract class Striped64 extends Number {
...
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
...
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
...
}
LongAdder继承了Striped64,Striped64定义了base存基础值,Cell[]数组:竞态条件下,累加各个线程自己的槽Cell[i]中。最终累计的结果的是sum方法
而LongAdder最终结果的求和,并没有使用全局锁,返回值不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,所以只能得到某个时 刻的近似值,这也就是LongAdder并不能完全替代LongAtomic的原因之一。而且从测试情况来看,线程数越多,并发操作数越大,LongAdder的优势越 大,线程数较小时,AtomicLong 的性能还超过了 LongAdder。
其他新增
除了LongAdder,还有其他三个类LongAccumulator、DoubleAdder、
DoubleAccumulator。
LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个 long)。
LongAccumulator 内部原理和 LongAdder 几乎完全一样。
DoubleAdder和DoubleAccumulator用于操作double原始类型。
原文地址:https://blog.csdn.net/ariestse9/article/details/136350732
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!