今天主要讨论的主题是Java的ReentrantLock的由来和加锁的原理。

为什么需要ReentrantLock

作为程序员,还是需要尽量减少重复造轮子这种事情。但是Java既然已经有了synchronized关键词,Java的大佬们再设计Lock是不是一种重复的轮子呢?当然不是,synchronized有一个问题,我们无法主动释放资源。如下所示:

sychronize (a) {
    sychronize (b) {
        //.. method body
    }
}

假设我们获取了资源a,但是无法获取资源b。此时我们也无法释放资源a。那么整个系统可能就会处在死锁状态。为了解决这个问题,就需要新的轮子来解决。那就是今天想要介绍的主角ReentrantLock。

使用lock的范式

在这个类的一开始作者就贴心的写上了Java并发所需要用到的范式。因为这些代码在多线程中执行,为了确保就算程序出错抛出了异常,所以finally中务必不要忘记加入解锁的相关代码,否则容易造成不知名的死锁情况。

class X {
    private final ReentrantLock lock = new ReentrantLock();
    //..
    public void m() {
        lock.lock();
        try {
            //.. method body
        } finally {
            lock.unlock();
        }
    }
}

标准1—finally 中释放锁:
这个大家应该都会明白,在 finally 中释放锁,目的是保证在获取到锁之后,最终能被释放

标准2—在 try{} 外面获取锁:
不知道你有没有想过,为什么会有标准 2 的存在,我们通常是“喜欢” try 住所有内容,生怕 发生异常不能捕获

ReentrantLock的上锁原理

其实很简单,比如在 ReentrantLock 内部维护了一个 volatile 修饰的变量 state,通过 CAS 来进行读写(最底层还是交给硬件来保证原子性和可⻅性),如果CAS更改成功, 即获取到锁,线程进入到 try 代码块继续执行;如果没有更改成功,线程会被【挂起】,不会向下执行。

那么接下来的问题就变成了如何维护这个volatile的state呢?这时候就要说到Lock背后最大的功臣 -- 队列同步器 (AbstractQueuedSynchronizer),简称AQS。大致上它是有CLH实现的。Java并发系列的第一篇讲解过CLH的原理,有兴趣的可以回顾一下:CLH锁简单解释

AQS的实现比较复杂,我准备用几篇博客慢慢更新。既然今天说的是ReentrantLock,那么就以ReentrantLock为例子,看看AQS的使用。

Function1: lock()

源代码看非常简单,lock函数调用了sync类的lock方法。

public void lock() {
    sync.lock();
}

sync的所属类是Sync,它有2个具体实现:FairSync和NonFairSync。听着可能有些复杂,我们还是从简先进入到AQS的理解中:

final void lock() {
    acquire(1);
}

这个acquire就是AQS中的方法了:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

第一次看这个源码的时候,可能会让人比较困惑。不是很明白进入这个方法之后,为什么会存在3个判断条件。其实源代码上有很详细的注释:

tryAquire

tryAquire会尝试不利用AQS,直接将当前状态用CAS的方式覆盖当前的state:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // P1: 如果state是0,在ReentrantLock中表示无人获取锁
        if (c == 0) {
            // 当前线程处于队列最初始位置或者当前队列没有排队
            if (!hasQueuedPredecessors() &&
                // CAS置换成功
                compareAndSetState(0, acquires)) {
                // 将当前线程标记为拥有这个锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // P2: 如果锁有人使用了,就要判断这个锁是不是当前线程使用
        else if (current == getExclusiveOwnerThread()) {
            // 此时锁是已经获取的,所以不需要利用CAS,可以直接更新状态
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

重要的地方,我已经加入了中文注释。需要注意到P2这个部分。由于当前线程已经获取到了锁,所以就不需要浪费CPU资源,重新进行CAS。这些小细节还是很值得学习的。

addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE):由于ReentrantLock是排他锁,第一次无脑尝试失败之后,我们就需要把需要锁的信息加入队列。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // P1: 这段逻辑也比较类似,如果当前队列不是空,那么无脑尝试一下是不是可以加入队列
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

AQS的代码都比较喜欢无脑进行一次尝试。符合一些简单条件之后,都会进行一次简单的尝试。比较进入循环和复杂逻辑的判断。当然,如果简单尝试失败之后,代码就会进入enq(node)阶段:

enq(node)

enq(node):尝试把新生成的node加入队列

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // P1
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // P2
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

一部分逻辑和第一次无脑重试是很类似的。P2部分就是不断充实重试,直到CAS成功将当前节点加入队列。P1部分则会有一些特殊,如果当前是一个空队列,那么会加入一个空的node进入队列,作为队列头。

这个空的node,可以认为是一个哨兵节点:

哨兵,顾名思义,是用来解决国家之间边界问题的,不直接参与生产活动。同样,计算 机科学中提到的哨兵,也用来解决边界问题,如果没有边界,指定环节,按照同样算法 可能会在边界处发生异常

acquireQueued

acquireQueued发生在成功将请求加入队列之后。这时候就需要尝试独占式的获取资源

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // P1
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // P2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

P1部分会比较好理解,如果当前的节点的前置节点已经是头节点,那么代码会尝试获取锁。这段代码也解释了之前为什么会需要加入一个哨兵节点,否则p.next = null会出现NullPointer的报错。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

setHead的逻辑很简单,我这边也就直接贴一下。注意这个时候,已经tryAcuire(arg)成功,所以没必要使用CAS比较。

P2是这段逻辑中很重要的一部分。如果不断的循环,CPU资源会被损耗。所以在尝试几次之后,还是需要把线程挂起,减少CPU的资源浪费。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 当前节点已经被收到信号,需要等待释放资源,那么现在可以安全的挂起
        return true;
    // ws 大于0说明是CANCELLED状态,
    if (ws > 0) {
        // 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作。程序第一次执行到这返回为false,还会进行外层第二次循环,最终从ws = Node.SIGNAL条件进入
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这篇有很多SINGNAL相关的知识。AQS的Node中有几种状态。今天我只是简单过一下加锁的流程,给一个印象。下下篇会认真讲一下这些信号量的作用。

当资源仍然被占用,那么代码就进入park阶段:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

如果这个时候线程已经被打断,那么也需要把打断的信号获取,返回。

这时候在看回最开始的代码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果acquireQueued已经被打断,那么会返回一个true,程序接收到之后,就会进入selfInterrupt。否则一切正常锁获取成功。

小结

这篇主要简单过一下ReentrantLock的加锁过程。当中也留了几个尾巴

  1. 线程park之后,有谁来唤醒
  2. Node的这些SINGA,state的各个数值又代表了什么含义

下一篇,第一个问题的答案就能揭晓。park之后,当然是有人release lock之后,会来唤醒被park的线程。具体逻辑也敬请下一期分享。

Leave a Reply

Your email address will not be published. Required fields are marked *