释义

AQS----AbstractQueuedSynchronizer----抽象的队列同步器

volatile+cas机制实现的锁模板,保证了代码的同步性和可见性,而AQS封装了线程阻塞等待挂起,解锁唤醒其他线程的逻辑。AQS子类只需根据状态变量,判断是否可获取锁,是否释放锁,使用LockSupport挂起、唤醒线程即可

AQS体系架构

蓝色:继承;红色:内部;绿色:实现

AQS抽象类简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractQueuedSynchronizer  
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private static final long serialVersionUID = 7373984972572414691L;

protected AbstractQueuedSynchronizer() { }

static final class Node {...}
private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

protected final int getState() {
return state;
}
..........

Node: 将抢占资源的线程封装为node节点
head&tail: 双向CLH队列的头尾
state: 状态标识(0空闲,1繁忙)

Node类waitStatus分析

1
2
3
4
5
6
7
8
// 为1的时候说明这个任务可能因为中断或者其他原因取消了  
static final int CANCELLED = 1;
// 代表下一个节点需要被唤醒
static final int SIGNAL = -1;
// 用于条件队列,等待condition唤醒
static final int CONDITION = -2;
// 用于共享锁,共享式同步状态取消,将会无条件传播下去
static final int PROPAGATE = -3;

双向CLH队列

AQS本质就是 state变量+CLH双端Node队列

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

AQS源码解读

公平锁&非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class FairSync extends Sync {  
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}}
1
2
3
4
5
6
7
8
9
10
11
12
static final class NonfairSync extends Sync {  
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}}

非公平锁分析

lock方法

通过cas将状态status由0变为1
如果成功的话,将当前线程设置为有访问权限线程
如果cas失败的话会进入else语句执行acquire(1)抢占失败

1
2
3
4
5
6
final void lock() {  
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

acquire方法

  • 调用tryAcquire():尝试获得锁,或执行可重入操作
  • 调用addWaiter() --> enq入队操作:队列内无节点则创建哨兵节点后入队,有节点则直接入队
  • 调用acquireQueued() --> 调用cancelAcquire():再次尝试1 tryAcquire(),如果失败则修改它的前一个节点的waitState为SIGNAL(-1)后进入阻塞,等待被唤醒
1
2
3
4
5
public final void acquire(int arg) {  
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法

ReentrantLock中的NonfairSync重写了tryAcquire方法,内部调用了nonfairTryAcquire方法

  • 先获得当前状态state,如果state=0,则自旋修改state为1,并修改占有锁的线程为当前线程,返回true,acquire()方法结束
  • else if 判断当前占有锁的线程是否为当前线程,如果是,则将state+1并返回true,acquire()方法结束锁被重入
  • 在重入过程中, 如果资源数量由正变负, 代表资源溢出了, 毕竟初始值是0, 获取资源时也是不断累加, 不会有人尝试获取负数的资源, 那没有意义. 我们都知道int类型的数据取值范围是[ -2147483648, 2147483647] 所以一般不会触发这个Error
  • 前面两个条件都不满足,则tryAcquire()方法失败,返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {  
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE)

addWaiter方法是给当前线程创建一个节点, 并添加到同步队列中, 排队等待机会获取锁, 参数Node.EXCLUSIVE代表当前线程希望以独占模式获取锁

然后获得尾节点, 如果尾节点已经创建,则使用尾插法将节点入队, 如果没有则调用enq方法入队, 实际上enq就是多了一步初始化头节点和尾节点的处理, 入队依然是使用相同的尾插法

理想状态下首先同样会拿到当前队列的尾节点然后判断是否为null,如果为null的话说明此时还不存在尾节点,则创建一个节点当队列的头节点,里边的数据是null,然后cas的方式将其设为头节点然后尾节点也是它,因为我们这个分支里边并没有return语句所以还会重新进入循环,此时尾节点已经不是null了,那么我们进入else里边的分支,首先

1
2
3
>node.prev = t;  
// 这句话会将当前节点的上一个节点设为头节点,然后通过cas的方式去将当前的节点设置为队列的尾节点
// 如果设置成功的话将当前队列的尾节点的下一个节点设置成当前节点然后将其返回

不理想状态的时候,模拟一下场景, 假如线程A进入了for循环, 判断了当前的尾节点为null,但是他在执行compareAndSetHead(new Node())的时候, 刚new出自己的节点来之后,就被线程B抢先将队列的头节点设置为自己new的节点,此时队列的尾节点就是线程B的节点数据,那么此时线程A去cas设置头节点就会失败,然后线程A进入下一轮循环, 此时的尾节点已经存在,我们就会使用尾插法将线程A设置为新的尾节点,最终返回旧的尾节点(这是因为AQS是前驱节点负责唤醒当前节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
```
```java
//加入等待线程的方法
//参数Node.EXCLUSIVE的意思是独占锁,另一种锁位Node.SHARED共享锁
private Node addWaiter(Node mode) {
//先new一个新的节点用来装当前线程,模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
//定义一个节点pred为当前队列的尾节点
Node pred = tail;
//不为null,说明不是第一个要进入等待队列的线程,第一个线程为null直接跳过执行下面的enq(node)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列中无节点才会执行enq()方法
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //第一次入队时尾节点t为空
if (compareAndSetHead(new Node()))
tail = head;
} else { //不是第一次入队,尾节点t不为空
node.prev = t;
if (compareAndSetTail(t, node)) { //将当前线程的节点的前置节点设为之前的尾节点
t.next = node; //设置之前的尾节点t的下一个节点为当前线程的节点node
return t;
}
}
}
}

enq()方法会返回旧的尾节点,但是addWaiter是返回当前线程的节点

acquireQueued方法

上一个方法将当前线程的节点添加到同步队列之后, 会将其前驱节点返回, acquireQueued方法会尝试将该前驱节点设置为新的头节点, 以此来唤醒当前线程的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {  
boolean failed = true; // 在获取锁的过程中是否获取失败
try {
boolean interrupted = false; // 在获取锁的过程中是否发生了中断
for (;;) {
final Node p = node.predecessor();//该方法为设置前置节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

for循环逻辑

  1. 拿到当前节点的前置节点,如果前置节点是头节点的话, 代表当前节点已经是队列中第一个待执行的节点, 此时可以再次尝试获取锁,无需任何等待(这里再次尝试获得锁的原因是,可能在添加节点到队列的过程中,拥有当前锁线程已经被释放)
  2. 我们先看获取锁成功的时候,如果获取锁成功的话我们就会将当前的节点设置为头节点
1
2
3
4
5
6
7
8
 private void setHead(Node node) {
// 首先将我们的当前节点设为头节点
head = node;
// 然后将当前节点的线程数据置为null
node.thread = null;
// 将当前节点的前驱节点置为null
node.prev = null;
}
  1. 接下来将前驱节点的后继节点设为null,因为上一步已经将当前节点的前置节点设为了null所以现在已经不需要之前的头节点了,所以我们需要将原始头节点的的后继节点设为null,此时头节点就没有任何的引用了,GC会将他进行回收
  2. 然后将是否获取失败标志位设为false,然后返回false
  3. 如果当前节点不是头节点或者是头节点但是抢占锁失败的情况下是如何处理,即以下部分代码
1
2
3
if (shouldParkAfterFailedAcquire(p, node) &&  
parkAndCheckInterrupt())
interrupted = true;
  1. shouldParkAfterFailedAcquire(p, node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
int ws = pred.waitStatus; //获取前驱节点(即当前线程的前一个节点)的等待状态
if (ws == Node.SIGNAL) //如果前驱节点的等待状态是SIGNAL,表示当前节点将来可以被唤醒,那么当前节点就可以安全的挂起了
return true;
/*
* 1)当ws>0(即CANCELLED == 1),前驱节点的线程被取消了,我们会将该节点之前的连续几个被取消的前驱节点从队列中剔除,返回false(即不能挂起)
* 2)如果ws<=0&&!=SIGNAL,将当前节点的前驱节点的等待状态设为SIGNAL
*/
if (ws > 0) {
// 这里我在下边没有讲到这里就是如果大于0就说明这个任务可能因为中断或者其他原因取消了,我们就将当前的节点的前驱节点置为前驱节点的前驱,其实目的就是跳过有问题的节点,找到我们上一个可以唤醒下一节点的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 尝试将当前节点的前驱节点的等待状态设为SIGNAL
* 1/这为什么用CAS,现在已经入队成功了,前驱节点就是pred,除了node外应该没有别的线程在操作这个节点了,那为什么还要用CAS?而不直接赋值呢?
* (解释:因为pred可以自己将自己的状态改为cancel,也就是pred的状态可能同时会有两条线程(pred和node)去操作)
* 2/既然前驱节点已经设为SIGNAL了,为什么最后还要返回false
* (因为CAS可能会失败,这里不管失败与否,都返回false,下一次执行该方法的之后,pred的等待状态就是SIGNAL了)
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
>}

这个方法呢他的作用就是检查并重新获取失败节点的状态,waitStatus只可能为-1,1,还有初始值0,注意有两个参数,参数一是前驱节点,参数二是当前的节点,当队列中添加了节点之后int ws = pred.waitStatus;获得前驱节点默认值肯定是0,然后进来进行判断走进else语句中执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);将前驱节点的状态值改为Node.SIGNAL,也就是-1,然后返回false此时就不去执行parkAndCheckInterrupt()方法了,而是重新进入for循环,此时如果还满足第一个if的条件的话,进入第二次循环中, 如果再次进入shouldParkAfterFailedAcquire, 此时前驱节点的等待状态就是Node.SIGNAL, 所以直接返回true, 并执行parkAndCheckInterrupt(), 将当前线程挂起

1
if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())
  1. 此时在进入shouldParkAfterFailedAcquire(p, node)之后前驱节点的标志位就是-1(节点需要被唤醒)了,进入parkAndCheckInterrupt()方法中
1
2
3
4
private final boolean parkAndCheckInterrupt() {  
LockSupport.park(this); //挂起当前的线程
return Thread.interrupted(); //如果当前线程已经被中断了,返回true
}

这个方法就是将当前的线程挂起

ancelAcquire(node)

ancelAcquire(node);在非公平锁独占模式下发生的概率很小,进入条件为当前线程节点,在完成循环等待与抢夺锁资源后,仍然失败,failer为true

方法的作用是在当前节点再次获取锁失败后,判断当前线程是否需要挂起等待, 此时, 当前节点确实是队列中下一个执行的节点, 但是前面的节点还没释放锁, 所以当前线程可能需要挂起自己, 等待唤醒, 该方法返回true则挂起当前线程, false则返回acquireQueued方法继续自旋.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void cancelAcquire(Node node) {  
if (node == null)
return;

node.thread = null;

Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
  1. 先判断当前节点是否为null,如果是直接return
  2. 当前节点的关联线程设置为null,然后获取当前节点的前驱节点
  3. 通过while循环将waitStatus的值大于0的过滤掉也就是跳过有问题的节点,找到我们上一个可以唤醒下一节点的节点。并且把当前的节点状态设置为Node.CANCELLED;此时出现if语句根据不同情况进行不同的处理
  4. 如果当前节点是尾节点的话,将从后往前找,找到第一个状态为非取消状态的节点设置为尾节点
    • 如果设置成功的话将当前尾节点的后继节点设为null
    • 如果设置失败的话将进入else语句
  5. 如果当前节点不是尾节点的话也进入eles语句
    1
    pred != head //如果当前节点不是头节点的后继节点
    1
    2
    //1. 判断当前节点的前驱节点状态是不是SIGNAL ,如果不是的话则把前驱节点设置为SIGNAL 看看是否成功
    ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
    1
    pred.thread != null //判断当前节点的线程是否为null

    如果上述条件都满足则把当前节点的前驱节点的后继指针指向当前节点的后继节点
    如果上述条件都不满足的话也就是当前节点是头节点的后继节点或者不满足上边的条件那就调用unparkSuccessor(node);唤醒下一个需要获取锁的节点

unlock方法

ReentrantLock中的解锁并没有区分公平锁和非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
public void unlock() {  
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //如果成功释放锁
Node h = head; //获取头节点:(注意:这里的头节点就是当前正在释放锁的节点)
if (h != null && h.waitStatus != 0) //头结点存在且等待状态,不是取消
unparkSuccessor(h); //唤醒距离头节点最近的一个非取消的节点
return true;
}
return false;
}

h != null && h.waitStatus != 0

  1. h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
  2. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
  3. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
##### tryRelease方法

ReentrantLock加锁过程中支持可重入锁,首先会减去可重入的次数,然后判断一下当前持有锁的线程是不是当前线程如果不是的话直接抛出IllegalMonitorStateException();异常

  1. 接下来定义一个free,如果tryRelease将当前持有的线程全部释放掉的话则返回true,否则返回false
  2. 如果已经全部释放掉了做一下处理将free设置为true然后设置当前的锁没有线程拥有它然后返回true
1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {  
int c = getState() - releases; //获取现在的锁数量-传入的解锁数量(这里为1)
if (Thread.currentThread() != getExclusiveOwnerThread()) //当前线程不持有锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //锁被释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

回到release()方法,如果此时的tryRelease(arg)返回了ture说明了该锁没有被任何线程持有然后我们可以进行if语句里边的操作

unparkSuccessor方法

进入方法后获取当前节点状态,如果小于0则设置为0,也就是初始状态
然后获取当前节点的下一个节点

if语句的意思就是如果当前节点的下一个节点被 cancelled(中断) 掉了就找到队列最开始的非 cancelled 的节点,但是里边的for循环是从队列的尾部开始找的,而不是从一开始就找,是从队尾到队首拿到队列的第一个 waitStatus<0 的节点

节点入队列的操作并不是原子的,所以不排除有这种情况在入队过程中执行到了pred.next = node的时候,此时还没有执行这条代码,但是这时候调用了unparkSuccessor,到达这里的时候就没有办法从前往后找了因为这里相当于链表的断链了所以需要从后往前找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {  
int ws = node.waitStatus;
if (ws < 0) //将ws设为0状态(即什么状态都不是)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
//获取头节点的下一个等待状态不是cancel的节点
if (s == null || s.waitStatus > 0) {
s = null;
//注意:从后往前遍历找到离头节点最近的一个非取消的节点,从后往前遍历据说是在入队(enq())的时候,可能nodeX.next==null
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒离头节点最近的一个非取消的节点
LockSupport.unpark(s.thread);
}

唤醒后:

1
2
3
>private final boolean parkAndCheckInterrupt() {  
LockSupport.park(this);
return Thread.interrupted(); //返回当前执行线程的中断状态,并清除

然后回到acquireQueued()方法中