在实习的时候,需要对公司内部的分布式框架(RPC框架)进行拓展。在阅读该RPC框架源码的时候,发现该框架中较多地方使用了自增原子类,而原子类又是基于AQS实现,在秋招之前阅读过AQS框架,但是都是粗粗的阅读了一些博客,并没有对源码进行阅读。如今,趁着过年有时间对AQS源码进行梳理。
1. 原理简介
2. 部分Node类分析
根据原理可知道,AQS是一个线程同步工具,其主要作用是内部维持了一个双向队列,以及一个状态,如果没有获取到状态,那么该线程则会被加入等待队列。而这个队列中的节点(Node)则是AQS内部实现的类,其主要的属性如下:
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 等待状态(是否超时等)
volatile AbstractQueuedSynchronizer.Node prev; // 指向前一个节点
volatile AbstractQueuedSynchronizer.Node next; // 指向后一个节点
volatile Thread thread; // 获取线程信息
AbstractQueuedSynchronizer.Node nextWaiter;
private static final VarHandle NEXT; // VarHandle为本地同步类
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
注:在java9 之后均使用VarHandle来实现CAS操作,代替了原先的Unsafe类,其好处是屏蔽了内存模型,使得可以适应不同的系统。
框架意义
只需要根据AQS提供的工具实现排斥锁和共享锁即可,而无须关注状态是否获取成功,是否需要排队,唤醒等操作。
框架原理
1. 属性
首先,AQS框架是为了设计锁而存在的,而锁的就是对状态的获取。在AQS中通过private volatile int state来表示锁的状态,在独占锁时只有0、1两个状态,在共享锁时大于等于0表示锁的状态。其次,我们说过AQS解决了解决了线程等待、排队以及唤醒的问题,这一过程是通过双向链表来完成的。所以在AQS中有head以及tail两个节点。AQS的基础原理是基于CAS来完成状态获取操作(无论是获取排他锁还是共享锁),CAS操作又是基于VarHandle来完成,所以在类属性中存在
private static final VarHandle STATE; // 操作状态
private static final VarHandle HEAD; // 操作头节点
private static final VarHandle TAIL; // 操作尾节点
(对于这三个状态的操作采用了三个工具,本身也是为了互不干扰,如果是采用一个VarHandle变量,会影响框架的效率。比方说:在A线程需要通过VarHandle设置头节点,此时B线程需要通过VarHandle设置尾节点,需要等待A线程操作结束之后才行。当然以上是我的猜测。)
具体的属性列表如下:
private static final long serialVersionUID = 7373984972572414691L;
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;
2. 方法
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
上面一段代码没啥可以说的,就是利用STATE完成CAS操作。
private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
while(true) {
AbstractQueuedSynchronizer.Node oldTail = this.tail; // 获取尾节点
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // 将node的前一个节点设置为oldTail节点
if (this.compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
this.initializeSyncQueue();
}
}
}
// 下面就是setPrevRelaxed方法的签名
final void setPrevRelaxed(AbstractQueuedSynchronizer.Node p) {
PREV.set(this, p);
}
enq方法的主要作用就是通过CAS+自旋完成尾节点的插入。
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);
AbstractQueuedSynchronizer.Node oldTail;
do {
while(true) {
oldTail = this.tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // node节点的前向指针指向当前尾节点
break;
}
this.initializeSyncQueue(); // 更新
}
} while(!this.compareAndSetTail(oldTail, node));
oldTail.next = node; // 尾节点的后向指针指向node节点
return node;
}
addWaiter方法的主要作用就是CAS + 自旋完成节点的添加,与enq方法的不同时,这里的插入是形成了双向链表。
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
int ws = node.waitStatus;
if (ws < 0) {
node.compareAndSetWaitStatus(ws, 0);
}
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for(AbstractQueuedSynchronizer.Node p = this.tail; p != node && p != null; p = p.prev) {
if (p.waitStatus <= 0) {
s = p;
}
}
}
if (s != null) {
LockSupport.unpark(s.thread);
}
}
unparkSuccessor方法的主要任务是:唤醒其他等待锁的节点。
其主要流程是:
1. 如果当前节点的状态为取消状态,则对其进行初始化。
2. 拿到node后面的一个节点。
3. 如果后边的节点为空或者被取消(waitStatus > 0 则表明节点已经被取消)
4. 开始从尾部开始进行迭代。原因是:节点被阻塞的时候,是在 acquireQueued 方法里面被阻塞的,唤醒时也一定会在 acquireQueued 方法里面被唤醒,唤醒之后的条件是,判断当前节点的前置节点是否是头节点,这里是判断当前节点的前置节点,所以这里必须使用从尾到头的迭代顺序才行,目的就是为了过滤掉无效的前置节点,不然节点被唤醒时,发现其前置节点还是无效节点,就又会陷入阻塞。
条件队列
主要是因为并不是所有场景一个同步队列就可以搞定的,在遇到锁 + 队列结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞;获得锁的多个线程在碰到队列满或者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒。
说白了就是同步队列的预备队列。在同步队列满了之后,阻塞的线程无法进入同步队列,这时候会进入条件队列(代表获得了抢占锁的机会)。同步队列是负责互斥,也就是如果没有获取锁就等着,等待获取锁的那一刻。而条件队列是告诉你什么时候可以去等待获取锁。类似于银行排队,一个窗口只能负责一个人,但是一个窗口有很多人在排队,如果排队的人过多,让你先不要排队,先去等候区坐着,等到了满足条件的时候,大堂经理会告诉你要准备了(可以进入排队)。
原文:https://www.cnblogs.com/jihuabai/p/12239062.html