这篇我们会继续讲解一个经常和ReentrantLock配套使用的组建:Condition。对于Condition来说,一切的故事开始于:

public Condition newCondition() {
    return sync.newCondition();
}

sync对象封装了该方法:

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject就是Condition的实现类,该对象也在AQS的文件中。它有2个成员对象:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

了解了这些基础之后,我们就继续讲解Condition类最常用的2个API,await和signal

await()

获取锁之后,如果逻辑判断不满足要求,就会请求await来释放锁。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 加入新的Condition等待节点
    Node node = addConditionWaiter();
    // 2. 释放当前节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3. 如果节点不在等待condition,挂起当前节点
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

那这里的第一步就是增加一个Condition的等待节点

addConditionWaiter

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后的节点不在等待condition,那么清理最后的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 新构建一个等待Condition的节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

这边会有点奇怪,为什么这些代码不用CAS来保证线程安全?

这时因为在使用await的时候,代码一定获取了对应的Lock。除了当前线程可以修改队列状态外,其他线程只能处于等待状态。

fullyRelease(node)

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

成功构建等待节点后,当前线程需要释放锁状态。如果非法线程释放锁,就会报错。具体代码在之前的ReentrantLock代码解析中讲过,不记得的话可以回看之前的系列。

isOnSyncQueue(node)

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
        * node.prev can be non-null, but not yet on queue because
        * the CAS to place it on queue can fail. So we have to
        * traverse from tail to make sure it actually made it.  It
        * will always be near the tail in calls to this method, and
        * unless the CAS failed (which is unlikely), it will be
        * there, so we hardly ever traverse much.
        */
    return findNodeFromTail(node);
}

判断当前节点状态,如果是等待Condition状态,或者前驱节点不为空,那么返回false。对于外层获取false之后,就会把当前线程进行挂起操作。

在Lock中可以定义多个条件,每个条件都会对应一个条件等待队列。

singal()

在await之后,总需要等待条件满足之后,别人来进行唤醒操作。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Signal方法实际调用doSignal方法来唤醒前驱节点:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

while循环内的逻辑比较简单:将头节点信息进行更新,并断开当前节点。如果队列为空了,那么lastWaiter也需要置为空。

transferForSignal(first)

接下来分析while循环的条件:

final boolean transferForSignal(Node node) {
    // 如果无法更新节点状态,那么节点已经处于cancel状态,返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 重新进行入队操作
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果条件满足要求,那么唤醒线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

初次之外还有signalAll方法,这个方法和signal方法比较类似,这里就不赘述了。这边多说一句,推荐尽量使用signalAll而不是signal方法,因为如果signal唤醒的线程很不幸已经彻底挂了,那么这个锁会彻底成为死锁。但是signalAll会导致较为严重的竞争,所以这个规则也不一定绝对。

Leave a Reply

Your email address will not be published. Required fields are marked *