今天主要讨论的主题是Java的ReentrantLock的由来和加锁的原理。
为什么需要ReentrantLock
作为程序员,还是需要尽量减少重复造轮子这种事情。但是Java既然已经有了synchronized关键词,Java的大佬们再设计Lock是不是一种重复的轮子呢?当然不是,synchronized有一个问题,我们无法主动释放资源。如下所示:
sychronize (a) {
sychronize (b) {
//.. method body
}
}
假设我们获取了资源a,但是无法获取资源b。此时我们也无法释放资源a。那么整个系统可能就会处在死锁状态。为了解决这个问题,就需要新的轮子来解决。那就是今天想要介绍的主角ReentrantLock。
使用lock的范式
在这个类的一开始作者就贴心的写上了Java并发所需要用到的范式。因为这些代码在多线程中执行,为了确保就算程序出错抛出了异常,所以finally中务必不要忘记加入解锁的相关代码,否则容易造成不知名的死锁情况。
class X {
private final ReentrantLock lock = new ReentrantLock();
//..
public void m() {
lock.lock();
try {
//.. method body
} finally {
lock.unlock();
}
}
}
标准1—finally 中释放锁:
这个大家应该都会明白,在 finally 中释放锁,目的是保证在获取到锁之后,最终能被释放
标准2—在 try{} 外面获取锁:
不知道你有没有想过,为什么会有标准 2 的存在,我们通常是“喜欢” try 住所有内容,生怕 发生异常不能捕获
ReentrantLock的上锁原理
其实很简单,比如在 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可⻅性),如果CAS更改成功, 即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行。
那么接下来的问题就变成了如何维护这个volatile的state呢?这时候就要说到Lock背后最大的功臣 -- 队列同步器 (AbstractQueuedSynchronizer),简称AQS。大致上它是有CLH实现的。Java并发系列的第一篇讲解过CLH的原理,有兴趣的可以回顾一下:CLH锁简单解释。
AQS的实现比较复杂,我准备用几篇博客慢慢更新。既然今天说的是ReentrantLock,那么就以ReentrantLock为例子,看看AQS的使用。
Function1: lock()
源代码看非常简单,lock函数调用了sync类的lock方法。
public void lock() {
sync.lock();
}
sync的所属类是Sync,它有2个具体实现:FairSync和NonFairSync。听着可能有些复杂,我们还是从简先进入到AQS的理解中:
final void lock() {
acquire(1);
}
这个acquire就是AQS中的方法了:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一次看这个源码的时候,可能会让人比较困惑。不是很明白进入这个方法之后,为什么会存在3个判断条件。其实源代码上有很详细的注释:
tryAquire
tryAquire会尝试不利用AQS,直接将当前状态用CAS的方式覆盖当前的state:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// P1: 如果state是0,在ReentrantLock中表示无人获取锁
if (c == 0) {
// 当前线程处于队列最初始位置或者当前队列没有排队
if (!hasQueuedPredecessors() &&
// CAS置换成功
compareAndSetState(0, acquires)) {
// 将当前线程标记为拥有这个锁
setExclusiveOwnerThread(current);
return true;
}
}
// P2: 如果锁有人使用了,就要判断这个锁是不是当前线程使用
else if (current == getExclusiveOwnerThread()) {
// 此时锁是已经获取的,所以不需要利用CAS,可以直接更新状态
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
重要的地方,我已经加入了中文注释。需要注意到P2这个部分。由于当前线程已经获取到了锁,所以就不需要浪费CPU资源,重新进行CAS。这些小细节还是很值得学习的。
addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE):由于ReentrantLock是排他锁,第一次无脑尝试失败之后,我们就需要把需要锁的信息加入队列。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// P1: 这段逻辑也比较类似,如果当前队列不是空,那么无脑尝试一下是不是可以加入队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
AQS的代码都比较喜欢无脑进行一次尝试。符合一些简单条件之后,都会进行一次简单的尝试。比较进入循环和复杂逻辑的判断。当然,如果简单尝试失败之后,代码就会进入enq(node)阶段:
enq(node)
enq(node):尝试把新生成的node加入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// P1
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// P2
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
一部分逻辑和第一次无脑重试是很类似的。P2部分就是不断充实重试,直到CAS成功将当前节点加入队列。P1部分则会有一些特殊,如果当前是一个空队列,那么会加入一个空的node进入队列,作为队列头。
这个空的node,可以认为是一个哨兵节点:
哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算 机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法 可能会在边界处发生异常
acquireQueued
acquireQueued发生在成功将请求加入队列之后。这时候就需要尝试独占式的获取资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// P1
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// P2
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
P1部分会比较好理解,如果当前的节点的前置节点已经是头节点,那么代码会尝试获取锁。这段代码也解释了之前为什么会需要加入一个哨兵节点,否则p.next = null会出现NullPointer的报错。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
setHead的逻辑很简单,我这边也就直接贴一下。注意这个时候,已经tryAcuire(arg)成功,所以没必要使用CAS比较。
P2是这段逻辑中很重要的一部分。如果不断的循环,CPU资源会被损耗。所以在尝试几次之后,还是需要把线程挂起,减少CPU的资源浪费。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 当前节点已经被收到信号,需要等待释放资源,那么现在可以安全的挂起
return true;
// ws 大于0说明是CANCELLED状态,
if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作。程序第一次执行到这返回为false,还会进行外层第二次循环,最终从ws = Node.SIGNAL条件进入
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这篇有很多SINGNAL相关的知识。AQS的Node中有几种状态。今天我只是简单过一下加锁的流程,给一个印象。下下篇会认真讲一下这些信号量的作用。
当资源仍然被占用,那么代码就进入park阶段:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果这个时候线程已经被打断,那么也需要把打断的信号获取,返回。
这时候在看回最开始的代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果acquireQueued已经被打断,那么会返回一个true,程序接收到之后,就会进入selfInterrupt。否则一切正常锁获取成功。
小结
这篇主要简单过一下ReentrantLock的加锁过程。当中也留了几个尾巴
- 线程park之后,有谁来唤醒
- Node的这些SINGA,state的各个数值又代表了什么含义
下一篇,第一个问题的答案就能揭晓。park之后,当然是有人release lock之后,会来唤醒被park的线程。具体逻辑也敬请下一期分享。