Java 中的并发锁大致分为隐式锁和显式锁两种。
隐式锁就是我们最常使用的 synchronized 关键字,显式锁主要包含两个接口:Lock 和 ReadWriteLock,主要实现类分别为 ReentrantLock 和 ReentrantReadWriteLock,
这两个类都是基于 AQS(AbstractQueuedSynchronizer) 实现的。还有的地方将 CAS 也称为一种锁,在包括 AQS 在内的很多并发相关类中,CAS 都扮演了很重要的角色。
就是一些名词的概念,知道他们是什么中文意思,知道理解意思之后在学习代码的实现就比较轻松了。
这个文章也算带着看了 AbstractQueuedSynchronizer 和 ReentrantReadWriteLock 和 ReentrantLock 的源码。
相关阅读:
CAS(https://www.cnblogs.com/zwtblog/p/15129821.html#cas)
悲观锁和独占锁是一个意思,
它假设一定会发生冲突,因此获取到锁之后会阻塞其他等待线程。
这么做的好处是简单安全,但是挂起线程和恢复线程都需要转入内核态进行,这样做会带来很大的性能开销。
悲观锁的代表是 synchronized。
然而在真实环境中,大部分时候都不会产生冲突。悲观锁会造成很大的浪费。
而乐观锁不一样,它假设不会产生冲突,先去尝试执行某项操作,失败了再进行其他处理(一般都是不断循环重试)。
这种锁不会阻塞其他的线程,也不涉及上下文切换,性能开销小。代表实现是 CAS。
公平锁是指各个线程在加锁前先检查有无排队的线程,按排队顺序去获得锁。
非公平锁是指线程加锁前不考虑排队问题,直接尝试获取锁,获取不到再去队尾排队。
值得注意的是,在 AQS 的实现中,一旦线程进入排队队列,即使是非公平锁,线程也得乖乖排队。
如果一个线程已经获取到了一个锁,那么它可以访问被这个锁锁住的所有代码块。
不可重入锁与之相反。
Synchronized 是一种独占锁。
在修饰静态方法时,锁的是类,如 Object.class。
修饰非静态方法时,锁的是对象,即 this。修饰方法块时,锁的是括号里的对象。
每个对象有一个锁和一个等待队列,锁只能被一个线程持有,其他需要锁的线程需要阻塞等待。
锁被释放后,对象会从队列中取出一个并唤醒,唤醒哪个线程是不确定的,不保证公平性。
synchronized 是基于 Java 对象头和 Monitor 机制来实现的。
一个对象在内存中包含三部分:对象头,实例数据和对齐填充。
其中 Java 对象头包含两部分:
Mark Word 有一个字段指向 monitor 对象。
monitor 中记录了锁的持有线程,等待的线程队列等信息。
前面说的每个对象都有一个锁和一个等待队列,就是在这里实现的。 monitor 对象由 C++ 实现。其中有三个关键字段:
Monitor的操作机制如下:
上面了解了 monitor 的机制,那虚拟机是如何将 synchronized 和 monitor 关联起来的呢?
分两种情况:
synchronized 是重量级锁,由于消耗太大,虚拟机对其做了一些优化。
在许多应用中,锁定状态只会持续很短的时间,为了这么一点时间去挂起恢复线程,不值得。
我们可以让等待线程执行一定次数的循环,在循环中去获取锁。这项技术称为自旋锁,它可以节省系统切换线程的消耗,但仍然要占用处理器。
在 JDK1.4.2 中,自选的次数可以通过参数来控制。 JDK 1.6又引入了自适应的自旋锁,不再通过次数来限制,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。
虚拟机在运行时,如果发现一段被锁住的代码中不可能存在共享数据,就会将这个锁清除。
当虚拟机检测到有一串零碎的操作都对同一个对象加锁时,会把锁扩展到整个操作序列外部。如 StringBuffer 的 append 操作。
对绝大部分的锁来说,在整个同步周期内都不存在竞争。如果没有竞争,轻量级锁可以使用 CAS 操作避免使用互斥量的开销。
偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,当这个线程再次请求锁时,无需再做任何同步操作,即可获取锁。
CAS 是 compare and swap 的简写,即比较并交换。
它是指一种操作机制,而不是某个具体的类或方法。
在 Java 平台上对这种操作进行了包装。在 Unsafe 类中,调用代码如下:
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
它需要三个参数,分别是内存位置 V,旧的预期值 A 和新的值 B。
操作时,先从内存位置读取到值,然后和预期值A比较。如果相等,则将此内存位置的值改为新值 B,返回 true。如果不相等,说明和其他线程冲突了,则不做任何改变,返回 false。
这种机制在不阻塞其他线程的情况下避免了并发冲突,比独占锁的性能高很多。 CAS 在 Java 的原子类和并发包中有大量使用。
第一,CAS 本身并未实现失败后的处理机制,它只负责返回成功或失败的布尔值,后续由调用者自行处理。只不过我们最常用的处理方式是重试而已。
第二,这句话很容易理解错,被理解成重新比较并交换。实际上失败的时候,原值已经被修改,如果不更改期望值,再怎么比较都会失败。而新值同样需要修改。
所以正确的方法是,使用一个死循环进行 CAS 操作,成功了就结束循环返回,失败了就重新从内存读取值和计算新值,再调用 CAS。看下 AtomicInteger 的源码就什么都懂了:
public final int incrementAndGet () {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
CAS 主要分三步,读取-比较-修改。
其中比较是在检测是否有冲突,如果检测到没有冲突后,其他线程还能修改这个值,那么 CAS 还是无法保证正确性。所以最关键的是要保证比较-修改这两步操作的原子性。
CAS 底层是靠调用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架构中的 compare and exchange 指令。在多核的情况下,这个指令也不能保证原子性,需要在前面加上 lock 指令。
lock 指令可以保证一个 CPU 核心在操作期间独占一片内存区域。那么 这又是如何实现的呢?
在处理器中,一般有两种方式来实现上述效果:总线锁和缓存锁。
在多核处理器的结构中,CPU 核心并不能直接访问内存,而是统一通过一条总线访问。
总线锁就是锁住这条总线,使其他核心无法访问内存。这种方式代价太大了,会导致其他核心停止工作。
而缓存锁并不锁定总线,只是锁定某部分内存区域。当一个 CPU 核心将内存区域的数据读取到自己的缓存区后,它会锁定缓存对应的内存区域。锁住期间,其他核心无法操作这块内存区域。
CAS 就是通过这种方式实现比较和交换操作的原子性的。
值得注意的是, CAS 只是保证了操作的原子性,并不保证变量的可见性,因此变量需要加上 volatile 关键字。
比如是有两个线程A,B ,一个变量:苹果
A线程期望拿到一个苹果
B线程一进来把苹果改成了梨子,但是在最后结束的时候又把梨子换成了苹果
A线程在此期间是不知情的,以为自己拿到的苹果还是原来的那一个,其实已经被换过了
如何解决ABA问题?
原子引用-解决ABA问题
AtomicStampedReference
类似于乐观锁,比较时会去对比版本号,确认变量是否被换过了。
ReentrantLock 使用代码实现了和 synchronized 一样的语义,包括可重入,保证内存可见性和解决竞态条件问题等。相比 synchronized,它还有如下好处:
基本用法如下:
public class Counter {
private final Lock lock = new ReentrantLock();
private volatile int count;
public void incr() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}
ReentrantLock 内部有两个内部类,分别是 FairSync 和 NoFairSync,对应公平锁和非公平锁。他们都继承自 Sync。Sync 又继承自AQS。
AQS全称AbstractQueuedSynchronizer
,即抽象的队列同步器,是一种用来构建锁和同步器的框架。
AQS 中有两个重要的成员:
成员变量 state。用于表示锁现在的状态,用 volatile 修饰,保证内存一致性。
同时所用对 state 的操作都是使用 CAS 进行的。
state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。
多次持有释放则多次加减。
还有一个双向链表,链表除了头结点外,每一个节点都记录了线程的信息,代表一个等待线程。这是一个 FIFO 的链表。
下面以 ReentrantLock 非公平锁的代码看看 AQS 的原理。
CANCELLED
waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。
SIGNAL
waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
CONDITION
waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)
PROPAGATE
waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态
除了同步队列之外,AQS中还存在Condition队列,这是一个单向队列。
调用ConditionObject.await()方法,能够将当前线程封装成Node加入到Condition队列的末尾,
然后将获取的同步状态释放(即修改同步状态的值,唤醒在同步队列中的线程)。
Condition队列也是FIFO。调用ConditionObject.signal()方法,能够唤醒firstWaiter节点,将其添加到同步队列末尾。
所谓独占模式,即只允许一个线程获取同步状态,当这个线程还没有释放同步状态时,其他线程是获取不了的,只能加入到同步队列,进行等待。
很明显,我们可以将state的初始值设为0,表示空闲。
当一个线程获取到同步状态时,利用CAS操作让state加1,表示非空闲,那么其他线程就只能等待了。
释放同步状态时,不需要CAS操作,因为独占模式下只有一个线程能获取到同步状态。
ReentrantLock、CyclicBarrier正是基于此设计的。
例如,ReentrantLock,state初始化为0,表示未锁定状态。
A线程lock()时,会调用tryAcquire()独占该锁并将state+1
独占模式下的AQS是不响应中断的,指的是加入到同步队列中的线程,如果因为中断而被唤醒的话,
不会立即返回,并且抛出InterruptedException。
而是再次去判断其前驱节点是否为head节点,决定是否争抢同步状态。
如果其前驱节点不是head节点或者争抢同步状态失败,那么再次挂起。
请求锁时有三种可能:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
//没有线程持有锁时,直接获取锁,对应情况1
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**以独占模式获取,忽略中断。通过至少调用一次 {@link tryAcquire} 实现,成功返回。
否则线程会排队,可能会反复阻塞和解除阻塞,调用 {@link tryAcquire} 直到成功。
该方法可用于实现方法{@link Locklock}。
@param arg 获取参数。这个值被传送到 {@link tryAcquire},但不会被解释,可以代表任何你喜欢的东西。*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//在此方法中会判断当前持有线程是否等于自己,对应情况2
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//将自己加入队列中,对应情况3
selfInterrupt();
}
如果没竞争到锁,这时候就要进入等待队列。
队列是默认有一个 head 节点的,并且不包含线程信息。
上面情况3中,addWaiter 会创建一个 Node,并添加到链表的末尾,Node 中持有当前线程的引用。同时还有一个成员变量 waitStatus,表示线程的等待状态,初始值为0。我们还需要关注两个值:
同时,加到链表末尾的操作使用了 CAS+死循环的模式,很有代表性,拿出来看一看:
/**
AbstractQueuedSynchronizer
为当前线程和给定模式创建和排队节点。
@param mode Node.EXCLUSIVE 表示独占,Node.SHARED 表示共享 @return 新节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到,在死循环里调用了 CAS 的方法。
如果多个线程同时调用该方法,那么每次循环都只有一个线程执行成功,其他线程进入下一次循环,重新调用。
N个线程就会循环N次。这样就在无锁的模式下实现了并发模型。
可以看到,一个线程最多有两次机会,还竞争不到就去挂起等待。
/**
以独占不间断模式获取已在队列中的线程。
由条件等待方法以及获取使用。
@param node 节点
@param arg 获取参数
@return {@code true} 如果在等待时中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //将 state 减1
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null) //唤醒第一个等待的线程
LockSupport.unpark(s.thread);
}
上面分析的是非公平锁,那公平锁呢?很简单,在竞争锁之前判断一下等待队列中有没有线程在等待就行了。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //判断等待队列是否有节点
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
......
return false;
}
理解 ReentrantLock 和 AQS 之后,再来理解读写锁就很简单了。
读写锁有一个读锁和一个写锁,分别对应读操作和锁操作。
锁的特性如下:
读写锁虽然有两个锁,但实际上只有一个等待队列。
原文:https://www.cnblogs.com/zwtblog/p/15238087.html