前言

上一篇介绍了ReentrantLock竞争锁及释放锁的过程,从源码的角度分析了过程。

此篇会继续基于ReentrantLock来讲讲Condition条件变量,condition是和lock绑定的,一个lock可以绑定多个condition条件,可以使多个线程基于条件顺序执行;相比于Object的wait、notify、notifyAll,控制线程更加灵活


系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier


下面从案例入手分析源码来看Condition如何实现的

Condition

ReentrantLock仅提供了获取条件对象的方法

public Condition newCondition() {
        return sync.newCondition();
}


实际返回的是AQS类的内部类ConditionObject,作为一个条件队列
public class ConditionObject implements Condition{
	// 条件队列第一个节点
   	private transient Node firstWaiter;
	// 条件队列最后一个节点
        private transient Node lastWaiter;
	//注意:上文提到Node结构中有一个nextWaiter节点,一个使用场景便是条件队列的下一个节点(单链表结构)。
	
	// 当前线程等待,进入条件队列;类似于Object的wait(), 都需要获取到锁后执行
	public final void await(){}
	public final long awaitNanos(long nanosTimeout){}
	// 注意:唤醒基于当前条件等待的一个线程,加入到同步队列中,等待获取锁资源;类似于Object的notify(), 都需要获取到锁后执行
	public final void signal() {}
	// 唤醒所有条件等待线程,加入到同步队列中。
	public final void signalAll() {}
}

案例

public static void testReentrantCondition() throws Exception {
        ReentrantLock reentrantLock = new ReentrantLock();
        // 一个条件
        Condition condition1 = reentrantLock.newCondition();

        new Thread(() -> {
            System.out.println("我是线程1开始竞争锁.");
            // 排它锁
            reentrantLock.lock();
            try {
                System.out.println("我是线程1获取锁成功了, 开始执行任务..");
                Thread.sleep(5 * 1000);

                System.out.println("我是线程1进入等待");
                // await会释放锁 见代码块1
                condition1.await();
                System.out.println("我是线程1退出等待");

            } catch (InterruptedException e) { e.printStackTrace(); }
            // 执行任务结束,释放锁,工作时间变短,可以看释放锁步骤
            System.out.println("我是线程1完成了要释放锁...");
            reentrantLock.unlock();
        }, "a").start();

        new Thread(() -> {
            try {
		//如果等待时间超过线程1的任务时间,则不会竞争锁,否则下面的lock会先竞争锁
                Thread.sleep(1 * 1000); 
            } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("我是线程2开始竞争锁.");
            // 排它锁
            reentrantLock.lock();
            try {
                System.out.println("我是线程2获取锁成功了, 开始执行任务..");
                Thread.sleep(5 * 1000);

                System.out.println("我是线程2进入等待");
                // await会释放锁
                condition1.await();
                System.out.println("我是线程2退出等待");

            } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("我是线程2完成了要释放锁...");
            reentrantLock.unlock();
        }, "b").start();

        // 主线程睡眠等待,时间调短,看signal唤醒过程
        Thread.sleep(60 * 1000);

        reentrantLock.lock();
        System.out.println("释放锁");
	// 见代码块2 唤醒条件队列第一个节点,将其加入同步队列
        condition1.signal();
        // 虽然调用signal,但是不会立马释放锁,需要释放了锁之后,条件线程才能再次获取锁
        condition1.signal();
        // 释放锁
        reentrantLock.unlock();

        Thread.sleep(3 * 1000);
}

输出结果:

我是线程2开始竞争锁.
我是线程1进入等待
我是线程2获取锁成功了, 开始执行任务..
我是线程2进入等待
释放锁
我是线程1退出等待
我是线程1完成了要释放锁...
我是线程2退出等待
我是线程2完成了要释放锁...


案例源码解读

同样,先通过两张流程图看下大致过程
1、await过程
await过程

2、signal过程
signal过程

具体步骤:
代码块1
线程1已经获取锁的情况下,进行条件变量condition.await()调用

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
	    // 第1步、创建一个条件等待节点
            Node node = addConditionWaiter();
   	    // 第2步、释放当前锁占用的全部资源,注意此处:如果同步队列后继有等待节点,会进行唤醒
            int savedState = fullyRelease(node);
            int interruptMode = 0;
	    // 第3步、如果是条件队列节点,进行park等待,此案例是的
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
	     // 4、先不用管此处,只要知道,等待是要被唤醒的,走到这里说明已经被唤醒了;然后竞争资源,失败则继续等待,成功就拿到锁了。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

继续看,第1步,如何创建条件队列节点
private Node addConditionWaiter() {
	    // 第一个调用await的 此时无条件队列,t为null
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
	    // 创建节点,传递线程信息,同时waitStatus赋值为-2
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
	    // 加入队列尾部,并返回该节点;如果只有一个节点,firstWaiter和lastWaiter都指向该节点
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
}

继续看,创建完节点,调用 第2步 fullyRelease(node)进行释放资源
final int fullyRelease(Node node) {
        boolean failed = true;
        try {
	    // 获取所有资源
            int savedState = getState();
	    // 释放资源 成功则返回,直接往下看
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
}

看release操作,在前一篇ReentrantLock.unlock时已经分析过了,是同一个方法
public final boolean release(int arg) {
        // tryRelease会调用子类ReentrantLock的实现,更新state及锁持有者操作
	if (tryRelease(arg)) {
            Node h = head; // 判断如果还有节点且需要被唤醒
            if (h != null && h.waitStatus != 0)
		// 唤醒head后面的节点,此案例中的线程2(因为在线程1执行任务的时候,线程2调用了lock进行竞争资源,没竞争到锁而进入了同步队列等待),上一篇分析过,不深入了
                unparkSuccessor(h); 
            return true;
        }
        return false;
}

继续看 第3步 isOnSyncQueue() 判断是不是同步节点,显然不是,会进入下面的LockSupport.park(this) 等待。
final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false; // 此处return了
        if (node.next != null)
            return true;
        // 先忽略该方法
        return findNodeFromTail(node);
    }


至此,线程1进入了条件队列进行等待,同时如果同步队列有后继节点需要唤醒的话,会进行唤醒,唤醒后同步队列head指向下一个节点。

接下来看,线程2被唤醒获取到锁之后,执行了任务后再次调用condition.await方法,和线程1一样,加入条件队列等待,释放资源,唤醒同步队列节点如果有的话。


再往后,会由main线程拿到锁资源,然后 开始进行condition.signal操作唤醒条件上的等待节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject

public final void signal() {	
	// 调用子类ReentrantLock的方法,判断当前线程是否是同步锁的持有者
	// 仅一行判断逻辑:getExclusiveOwnerThread() == Thread.currentThread();)
	if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
	Node first = firstWaiter; // 取第一个条件队列的节点
        if (first != null)
		// 主要逻辑,继续往下看
        	doSignal(first);
}

继续看如何signal的
private void doSignal(Node first) {
	do {
 	   // 条件也有含义 firstWaiter = first.nextWaiter,firstWaiter指向下一个节点,也是关键操作
	   // 如果没有后续节点则条件队列的尾指针lastWaiter赋null
	   if ( (firstWaiter = first.nextWaiter) == null)
                  lastWaiter = null;
	     // 第一个节点先脱离队列,应该是help gc
             first.nextWaiter = null;
	     // 核心方法transferForSignal,返回成功即可跳出循环
          } while (!transferForSignal(first) &&
		  // 上面if判断 firstWaiter已经指向了first的下一个节点,所以此处结合上下文看first已经是下一个节点了
                 (first = firstWaiter) != null);
}

继续看,主要是transferForSignal(first)方法
final boolean transferForSignal(Node node) {
        // 把当前节点的状态更新为0,表明它不再是一个条件队列节点;如果cas失败则当前节点已经被取消了,返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 加入同步队列尾,只是加入尾部而已。上一篇分析过,不深入了
        Node p = enq(node);
        int ws = p.waitStatus;
	// 原同步队列的尾节点,即node节点的前继节点没取消的情况下,将cas成SIGNAL状态(node节点可安心睡眠)(个人感觉这里不更新也行,node唤醒后也会更新的)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
	    // 唤醒条件节点第一个线程
            LockSupport.unpark(node.thread);
        return true;
}

至此,signal操作结束,总的就是将条件队列第一个节点移入到同步队列最后一个节点,同时唤醒该节点。


还有最后一哆嗦,唤醒了条件队列的第一个节点的时候,该线程节点它要做什么呢?
也就是我们上面说的await方法的第4步(下面是代码片段,唤醒后发现不在条件队列中则跳出while从此处继续执行):
// 走到这里说明已经被唤醒了;然后竞争资源,失败则继续等待,成功就拿到锁了。
	    // acquireQueued之前见过:竞争资源,失败则继续等待,成功就拿到锁了。此处不会成功的,因为锁被调用signal的线程持有呢。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		// interruptMode判断在等待中有没有被中断
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0) 
	// 调试发现不管是signal之前还是之后中断node节点,signal唤醒之后已经在同步队列了则重新中断下;此案例不会走到此。
                reportInterruptAfterWait(interruptMode);

至此,节点正常被唤醒且加入到了同步队列了。

总结几个点

1、条件队列是单向链表,await和signal都需要先获取锁
2、signal唤醒1个节点,会从条件队列的第一个节点进行唤醒。
3、signal唤醒节点,其实就是将其出条件队列,加入同步队列尾部,同时unpark线程。