并发编程系列-同步器实现一 ReentrantLock Condition
前言
上一篇介绍了ReentrantLock竞争锁及释放锁的过程,从源码的角度分析了过程。
此篇会继续基于ReentrantLock来讲讲Condition条件变量,condition是和lock绑定的,一个lock可以绑定多个condition条件,可以使多个线程基于条件顺序执行;相比于Object的wait、notify、notifyAll,控制线程更加灵活。
下面从案例入手分析源码来看Condition如何实现的系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier
Condition
ReentrantLock仅提供了获取条件对象的方法
public Condition newCondition() {
return sync.newCondition();
}
实际返回的是AQS类的内部类ConditionObject,作为一个条件队列
public class ConditionObject implements Condition{
// 条件队列第一个节点
private transient Node firstWaiter;
// 条件队列最后一个节点
private transient Node lastWaiter;
//注意:上文提到Node结构中有一个nextWaiter节点,一个使用场景便是条件队列的下一个节点(单链表结构)。
// 当前线程等待,进入条件队列;类似于Object的wait(), 都需要获取到锁后执行
public final void await(){}
public final long awaitNanos(long nanosTimeout){}
// 注意:唤醒基于当前条件等待的一个线程,加入到同步队列中,等待获取锁资源;类似于Object的notify(), 都需要获取到锁后执行
public final void signal() {}
// 唤醒所有条件等待线程,加入到同步队列中。
public final void signalAll() {}
}
案例
public static void testReentrantCondition() throws Exception {
ReentrantLock reentrantLock = new ReentrantLock();
// 一个条件
Condition condition1 = reentrantLock.newCondition();
new Thread(() -> {
System.out.println("我是线程1开始竞争锁.");
// 排它锁
reentrantLock.lock();
try {
System.out.println("我是线程1获取锁成功了, 开始执行任务..");
Thread.sleep(5 * 1000);
System.out.println("我是线程1进入等待");
// await会释放锁 见代码块1
condition1.await();
System.out.println("我是线程1退出等待");
} catch (InterruptedException e) { e.printStackTrace(); }
// 执行任务结束,释放锁,工作时间变短,可以看释放锁步骤
System.out.println("我是线程1完成了要释放锁...");
reentrantLock.unlock();
}, "a").start();
new Thread(() -> {
try {
//如果等待时间超过线程1的任务时间,则不会竞争锁,否则下面的lock会先竞争锁
Thread.sleep(1 * 1000);
} catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("我是线程2开始竞争锁.");
// 排它锁
reentrantLock.lock();
try {
System.out.println("我是线程2获取锁成功了, 开始执行任务..");
Thread.sleep(5 * 1000);
System.out.println("我是线程2进入等待");
// await会释放锁
condition1.await();
System.out.println("我是线程2退出等待");
} catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("我是线程2完成了要释放锁...");
reentrantLock.unlock();
}, "b").start();
// 主线程睡眠等待,时间调短,看signal唤醒过程
Thread.sleep(60 * 1000);
reentrantLock.lock();
System.out.println("释放锁");
// 见代码块2 唤醒条件队列第一个节点,将其加入同步队列
condition1.signal();
// 虽然调用signal,但是不会立马释放锁,需要释放了锁之后,条件线程才能再次获取锁
condition1.signal();
// 释放锁
reentrantLock.unlock();
Thread.sleep(3 * 1000);
}
输出结果:
我是线程2开始竞争锁.
我是线程1进入等待
我是线程2获取锁成功了, 开始执行任务..
我是线程2进入等待
释放锁
我是线程1退出等待
我是线程1完成了要释放锁...
我是线程2退出等待
我是线程2完成了要释放锁...
案例源码解读
同样,先通过两张流程图看下大致过程
1、await过程
2、signal过程
具体步骤:
代码块1
线程1已经获取锁的情况下,进行条件变量condition.await()调用
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 第1步、创建一个条件等待节点
Node node = addConditionWaiter();
// 第2步、释放当前锁占用的全部资源,注意此处:如果同步队列后继有等待节点,会进行唤醒
int savedState = fullyRelease(node);
int interruptMode = 0;
// 第3步、如果是条件队列节点,进行park等待,此案例是的
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4、先不用管此处,只要知道,等待是要被唤醒的,走到这里说明已经被唤醒了;然后竞争资源,失败则继续等待,成功就拿到锁了。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
继续看,第1步,如何创建条件队列节点
private Node addConditionWaiter() {
// 第一个调用await的 此时无条件队列,t为null
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建节点,传递线程信息,同时waitStatus赋值为-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 加入队列尾部,并返回该节点;如果只有一个节点,firstWaiter和lastWaiter都指向该节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
继续看,创建完节点,调用 第2步 fullyRelease(node)进行释放资源
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取所有资源
int savedState = getState();
// 释放资源 成功则返回,直接往下看
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
看release操作,在前一篇ReentrantLock.unlock时已经分析过了,是同一个方法
public final boolean release(int arg) {
// tryRelease会调用子类ReentrantLock的实现,更新state及锁持有者操作
if (tryRelease(arg)) {
Node h = head; // 判断如果还有节点且需要被唤醒
if (h != null && h.waitStatus != 0)
// 唤醒head后面的节点,此案例中的线程2(因为在线程1执行任务的时候,线程2调用了lock进行竞争资源,没竞争到锁而进入了同步队列等待),上一篇分析过,不深入了
unparkSuccessor(h);
return true;
}
return false;
}
继续看 第3步 isOnSyncQueue() 判断是不是同步节点,显然不是,会进入下面的LockSupport.park(this) 等待。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false; // 此处return了
if (node.next != null)
return true;
// 先忽略该方法
return findNodeFromTail(node);
}
至此,线程1进入了条件队列进行等待,同时如果同步队列有后继节点需要唤醒的话,会进行唤醒,唤醒后同步队列head指向下一个节点。
接下来看,线程2被唤醒获取到锁之后,执行了任务后再次调用condition.await方法,和线程1一样,加入条件队列等待,释放资源,唤醒同步队列节点如果有的话。
再往后,会由main线程拿到锁资源,然后 开始进行condition.signal操作唤醒条件上的等待节点。java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
public final void signal() {
// 调用子类ReentrantLock的方法,判断当前线程是否是同步锁的持有者
// 仅一行判断逻辑:getExclusiveOwnerThread() == Thread.currentThread();)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 取第一个条件队列的节点
if (first != null)
// 主要逻辑,继续往下看
doSignal(first);
}
继续看如何signal的
private void doSignal(Node first) {
do {
// 条件也有含义 firstWaiter = first.nextWaiter,firstWaiter指向下一个节点,也是关键操作
// 如果没有后续节点则条件队列的尾指针lastWaiter赋null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 第一个节点先脱离队列,应该是help gc
first.nextWaiter = null;
// 核心方法transferForSignal,返回成功即可跳出循环
} while (!transferForSignal(first) &&
// 上面if判断 firstWaiter已经指向了first的下一个节点,所以此处结合上下文看first已经是下一个节点了
(first = firstWaiter) != null);
}
继续看,主要是transferForSignal(first)方法
final boolean transferForSignal(Node node) {
// 把当前节点的状态更新为0,表明它不再是一个条件队列节点;如果cas失败则当前节点已经被取消了,返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入同步队列尾,只是加入尾部而已。上一篇分析过,不深入了
Node p = enq(node);
int ws = p.waitStatus;
// 原同步队列的尾节点,即node节点的前继节点没取消的情况下,将cas成SIGNAL状态(node节点可安心睡眠)(个人感觉这里不更新也行,node唤醒后也会更新的)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒条件节点第一个线程
LockSupport.unpark(node.thread);
return true;
}
至此,signal操作结束,总的就是将条件队列第一个节点移入到同步队列最后一个节点,同时唤醒该节点。
还有最后一哆嗦,唤醒了条件队列的第一个节点的时候,该线程节点它要做什么呢?也就是我们上面说的await方法的第4步(下面是代码片段,唤醒后发现不在条件队列中则跳出while从此处继续执行):
// 走到这里说明已经被唤醒了;然后竞争资源,失败则继续等待,成功就拿到锁了。
// acquireQueued之前见过:竞争资源,失败则继续等待,成功就拿到锁了。此处不会成功的,因为锁被调用signal的线程持有呢。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// interruptMode判断在等待中有没有被中断
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 调试发现不管是signal之前还是之后中断node节点,signal唤醒之后已经在同步队列了则重新中断下;此案例不会走到此。
reportInterruptAfterWait(interruptMode);
至此,节点正常被唤醒且加入到了同步队列了。
总结几个点
1、条件队列是单向链表,await和signal都需要先获取锁
2、signal唤醒1个节点,会从条件队列的第一个节点进行唤醒。
3、signal唤醒节点,其实就是将其出条件队列,加入同步队列尾部,同时unpark线程。