这篇我们会继续讲解一个经常和ReentrantLock配套使用的组建:Condition。对于Condition来说,一切的故事开始于:
public Condition newCondition() {
return sync.newCondition();
}
sync对象封装了该方法:
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject就是Condition的实现类,该对象也在AQS的文件中。它有2个成员对象:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
了解了这些基础之后,我们就继续讲解Condition类最常用的2个API,await和signal
await()
获取锁之后,如果逻辑判断不满足要求,就会请求await来释放锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 加入新的Condition等待节点
Node node = addConditionWaiter();
// 2. 释放当前节点
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3. 如果节点不在等待condition,挂起当前节点
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
那这里的第一步就是增加一个Condition的等待节点
addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后的节点不在等待condition,那么清理最后的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新构建一个等待Condition的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
这边会有点奇怪,为什么这些代码不用CAS来保证线程安全?
这时因为在使用await的时候,代码一定获取了对应的Lock。除了当前线程可以修改队列状态外,其他线程只能处于等待状态。
fullyRelease(node)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
成功构建等待节点后,当前线程需要释放锁状态。如果非法线程释放锁,就会报错。具体代码在之前的ReentrantLock代码解析中讲过,不记得的话可以回看之前的系列。
isOnSyncQueue(node)
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
判断当前节点状态,如果是等待Condition状态,或者前驱节点不为空,那么返回false。对于外层获取false之后,就会把当前线程进行挂起操作。
在Lock中可以定义多个条件,每个条件都会对应一个条件等待队列。
singal()
在await之后,总需要等待条件满足之后,别人来进行唤醒操作。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Signal方法实际调用doSignal方法来唤醒前驱节点:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
while循环内的逻辑比较简单:将头节点信息进行更新,并断开当前节点。如果队列为空了,那么lastWaiter也需要置为空。
transferForSignal(first)
接下来分析while循环的条件:
final boolean transferForSignal(Node node) {
// 如果无法更新节点状态,那么节点已经处于cancel状态,返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 重新进行入队操作
Node p = enq(node);
int ws = p.waitStatus;
// 如果条件满足要求,那么唤醒线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
初次之外还有signalAll方法,这个方法和signal方法比较类似,这里就不赘述了。这边多说一句,推荐尽量使用signalAll而不是signal方法,因为如果signal唤醒的线程很不幸已经彻底挂了,那么这个锁会彻底成为死锁。但是signalAll会导致较为严重的竞争,所以这个规则也不一定绝对。