前言

关于CountDownLatch,在项目中使用的场景还是比较多的,也是基于AQS的一种实现,称做计数器或者闭锁。每个线程可以拿到一个资源,当资源都释放后会有一个总的线程来继续执行,起到线程间通信的作用。


系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier


一个例子:
生活中的考试,考试前,老师给每个人发了一份试卷,老师开始等考试结束,快结束时,每个人都把做完的试卷交了上去,等所有的试卷收完后,老师宣布考试结束。
例子中:
学生比喻成一个个线程
老师比喻成总线程,也是等待线程
试卷就是给每个线程的一个资源,老师等待所有的试卷收齐


插播

进入正题前,先了解下线程的interrupt机制
对于一个线程调用,interrupt()时:
1、如果该线程是阻塞状态,例如处于await、sleep时,线程会立即退出阻塞状态,并抛出一个InterruptedException异常;
2、如果该线程是正常状态,只会把该线程的中断标识位置成true,仅仅如此,如果后面有需要判断线程是否中断时,调用Thread.interrupted()判断即可。

如果线程中断了,调用Thread.interrupted()返回true,并重置中断位为false。

CountDownLatch

先了解下CountDownLatch

CountDownLatch的内部类如下

// 和ReentrantLock一个套路,有个静态内部类Sync继承了AQS,不过不是抽象的,说明没有类似公平或非公平的区分,来实现核心的逻辑
private static final class Sync extends AbstractQueuedSynchronizer {
	// 构造函数,指定资源总数
        Sync(int count) {
            setState(count);
        }
	// 获取count,即当前资源数
        int getCount() {
            return getState();
        }
	// 关键:尝试获取共享锁,该方法实现了AQS的抽象方法;逻辑比较简单,资源为0返回1否则返回-1
	// 具体如何用,往下看
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
	// 关键:尝试释放共享锁,该方法实现了AQS的抽象方法;
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) { // 循环将state减1,cas不成功则无限循环到成功为止,除非state被减到0了
                int c = getState();
                if (c == 0) // 无数可减,返回false
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
		    // 如果是减1之后便为0,则返回true,否则返回false
                    return nextc == 0; 
            }
        }
}
// 此处可以看出,与ReentrantLock的区别是,CountDownLatch实现的是以Shared结尾的方法,字面理解为共享,
// 所以CountDownLatch应该是通过共享锁来实现的。

CountDownLatch的变量和方法如下

// 以内部类定义的成员变量
private final Sync sync;

// 构造函数,指定锁的总数
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
	// 内部类的一个实例化过程
        this.sync = new Sync(count);
}

// 关键方法,等待方法,该方法捕获中断异常并抛出
public void await() throws InterruptedException {
        // 调用内部类sync继承至AQS的方法,请求可中断共享锁1个
	sync.acquireSharedInterruptibly(1);
}
// 同上,只是说该方法支持超时等待
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 关键方法,释放当前持有的一个资源
public void countDown() {
	// 调用内部类sync继承至AQS的方法,释放共享锁1个
        sync.releaseShared(1);
}

//// 其它方法一个是获取count的方法,还有一个toString

案例

public static void testCountDownLatch() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        System.out.println(DateUtils.getCurrentDateTime() + " 开始测试...");
        new Thread(() -> {
            System.out.println(DateUtils.getCurrentDateTime() + " 我是线程1开始执行");
            try {
                // 模拟任务执行
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(DateUtils.getCurrentDateTime() + " 我是线程1执行结束 countDown()");
            countDownLatch.countDown();
        }).start();

        new Thread(() -> {
            System.out.println(DateUtils.getCurrentDateTime() + " 我是线程2开始执行");
            try {
                // 模拟任务执行
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(DateUtils.getCurrentDateTime() + " 我是线程2执行结束 countDown()");
            countDownLatch.countDown();
        }).start();

        try {
            Thread.sleep(1 * 1000);
        } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(DateUtils.getCurrentDateTime() + " 我是主线程,子线程应该正在执行吧,我await等等吧");
        /*
         * 1、当都2个子线程都超时很大的时候,可以debug await方法,进入同步队列
         * 2、当第1个线程设置5000ms睡眠时,第2个线程设置10000ms时,可以debug第2个线程的countDown()
         *  来debug 释放锁的过程;debug 第1个线程的countDown(),比较简单,就是state-1 return。
         **/
        countDownLatch.await();
        System.out.println(DateUtils.getCurrentDateTime() + " 子线程都执行完了,结束下班");
}

输出结果:

2021-11-14 21:42:42 开始测试...
2021-11-14 21:42:42 我是线程1开始执行
2021-11-14 21:42:42 我是线程2开始执行
2021-11-14 21:42:43 我是主线程,子线程应该正在执行吧,我await等等吧
2021-11-14 21:42:47 我是线程1执行结束 countDown()
2021-11-14 21:42:52 我是线程2执行结束 countDown()
2021-11-14 21:42:52 子线程都执行完了,结束下班


案例源码解读

案例解读前,先来通过流程图看下大概的过程
等待及唤醒过程


首先看main线程,初始化CountDownLatch并传入参数2,表示一共2个资源。

// 初始化
CountDownLatch countDownLatch = new CountDownLatch(2);

先跳过线程1和线程2,假设他们都在执行任务中,还没调用countDown(),主线程调用了countDownLatch.await();

java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}

继续看acquireSharedInterruptibly方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
	// 如果调用await的线程被之前中断了,则抛出异常,要知道它是不会影响子线程的执行
        if (Thread.interrupted())
            throw new InterruptedException();
	// 尝试获取共享锁,由子类CountDownLatch实现,往下看
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

java.util.concurrent.CountDownLatch
protected int tryAcquireShared(int acquires) {
	// 如果当前资源为0,则返回1,否则返回-1,从调用此方法的if判断可知
	// 如果state为0,说明之前分的共享资源都释放了,可以直接返回了;否则会执行下面的逻辑进入同步队列等待
	return (getState() == 0) ? 1 : -1;
}

继续看,main线程先调用的await方法,state不为0,所以继续执行 doAcquireSharedInterruptibly(arg)方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
	// 似曾相识,创建节点,增加到同步队列的后面,Node.SHARED标识是一个共享节点(用Node的nextWaiter存储记录),同时节点记录当前线程信息
	// addWaiter内部,将新节点加入到同步队列的尾部,如果原队列为空,会创建一个空节点作为头节点,然后新节点加入其后。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) { // 无限循环
		// 第一次调用await方法,所以前继节点为头节点
                final Node p = node.predecessor();
                if (p == head) {
		    // 此时再次调用子类CountDownLatch实现的tryAcquireShared方法(见上面),如果资源全被释放返回1,否则返回-1
		    //(就是在真正线程等待之前再不甘心的尝试一下,哈哈)
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
			// 如果此时资源都释放了,则传播,此处先不分析,后面再详细说
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
		// 又似曾相识,此时资源没有全部释放,当前线程需要park等待了,往下看
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed) // 后面分析,如果线程中断了,抛出异常,会执行这里,等线程被唤起时,再分析,毕竟线程已经进入等待了,无法执行到这里
                cancelAcquire(node);
        }
}

继续看shouldParkAfterFailedAcquire方法(ReentrantLock一样的)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前继节点的等待状态,AQS那章已经说过,默认是0,会经过下面的cas改成SIGNAL(-1)
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果是-1,表示后继节点可以安心睡觉了,当前节点锁释放后会唤醒它的
            return true;
        if (ws > 0) {
	    // ws大于1表示节点状态已经取消了,可以跳过该节点了
            do {
		// 比较难看懂,从后往前看,pred = pred.prev表示前节点指到再前一个节点;node.prev = pred当前node节点的前节点指向刚刚的pred。
		// 加上后面那句pred.next = node; 其实就是删除中间的取消节点。
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将waitStatue cas成signal状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

继续看,shouldParkAfterFailedAcquire最终会返回true,进而调用parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 线程进入等待,开始睡眠了
        return Thread.interrupted(); // 返回线程是否有被中断
}

至此,主线程进入同步列队进行等待,注意的是,它的标识是一个共享的,即共享锁。


下面我们看线程进行countDown操作

线程1执行完任务了,线程1进行countDown操作

调用 java.util.concurrent.CountDownLatch#countDown
public void countDown() {
	// 继续往下看
        sync.releaseShared(1);
}

进入AQS的方法,释放共享锁
java.util.concurrent.locks.AbstractQueuedSynchronizer
 public final boolean releaseShared(int arg) {
	// 调用子类CountDownLatch实现的tryReleaseShared,往下看
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
	// 因为是第一个线程1先countDownLatch,此时state还不为0,返回false
        return false;
}

看下tryReleaseShared方法,前面分析过,由CountDownLatch实现
protected boolean tryReleaseShared(int releases) {
           // 循环cas将资源数-1,如果是从1减到0的则返回true,否则返回false
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
}

线程2执行完任务了,线程2进行countDown操作
和线程1调用是一样的逻辑,唯一的不同在于state此时被cas成0了
java.util.concurrent.locks.AbstractQueuedSynchronizer
 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
	    // 此时进入到这里,state从1cas到0,tryReleaseShared返回true了,调用tryReleaseShared逻辑,继续往下看
            doReleaseShared();
            return true;
        }
        return false;
    }

继续看doReleaseShared方法,此方法比较关键
private void doReleaseShared() {
        for (;;) {
	    // 获取头节点
            Node h = head;
	    // 进入此处,头节点不为空,且不等于尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
		// Node.SIGNAL表示后继节点需要被唤醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
			// 如果cas更新失败了,则循环
                        continue;	// loop to recheck cases
		    // 唤醒同步队列head的后继节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
		    // cas从0到-3,也可能没更新,唤醒节点后,队列的节点还没执行setHead更新头指针,当前线程就(h == head)离开了
                    continue;    
            }
	    // 非常关键,唤醒head后续节点后,继续传播唤醒,直到head位置没发生改变
            if (h == head)	// loop if head changed
                break;
        }
}

至此,所有线程countDownLatch了,资源释放为0,同时唤醒了同步队列等待的共享线程(而且如果多个的话则传播唤醒


最后一哆嗦,等待的线程要被唤醒了
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) { // 无限循环
		// 第一次调用await方法,p节点为头节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
			// 线程被唤起,再次进入循环会到此,往下看
                        setHeadAndPropagate(node, r);
                        p.next = null; // help head节点 GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

继续看,setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将头指针指向node,同时设置线程为null,前序指针指向null
	setHead(node);
        // 此时propagate是为1的
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
	    // 如果当前节点的后继节点为null或后续节点为共享节点,则继续传播唤醒,具体传播唤醒见doReleaseShared,上面分析过了。
            if (s == null || s.isShared())
                doReleaseShared();
        }
 }

至此,等待的节点被唤醒后拿到,可以继续执行任务,结束。

高级的地方

我们再来看看它的牛皮逻辑,搞懂它

我们假设有3个线程,线程1(先)和线程2(后)进入等待,线程3进行唤醒:

当线程3唤醒线程1时,线程3会执行到doReleaseShare方法的if(h == head) break;判断:
1、如果此时线程3唤醒的线程1还没有来得及调用setHeadAndPropagate(node, r);方法更新头指针,那么线程3就会break,线程1继续执行setHeadAndPropagate,更新头指针,重新进入doReleaseShare进行唤醒线程2,以此反复...
2、如果此时线程3唤醒的线程1调用了setHeadAndPropagate(node, r);更新了头指针,同时进入doReleaseShare加入唤醒其它节点,而线程3此时判断if(h == head)会跳过重新进入for循环进行后继节点的唤醒,此时就会有两个线程 线程3和线程1同时唤醒后继节点,除非碰到(h == head)为true(唤醒的节点来没更新head)则推出。

可以看出:
1、共享节点的唤醒是传播的
2、唤醒的共享节点会加入唤醒的“池子”里进行唤醒后续节点,很强大啊!
3、如果被唤醒的节点来不及更新头节点,导致唤醒线程break;也没关系,被唤醒的线程会传播的。


最后的一个假如:

如果线程调用await之前就被设置中断,那么doAcquireSharedInterruptibly会抛出InterruptedException异常,注意再声明下,当前线程抛中断异常,不影响其它线程的执行。
在抛异常时有个finally处理,往下看

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
	// 节点的线程先置null,不需要再被唤醒的
        node.thread = null;

        // 获取前续节点,如果是取消节点的话,则一直往前找,是node的前继节点指向它。
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 前继节点的下一个节点,
        Node predNext = pred.next;
	// 当前节点标记为取消
        node.waitStatus = Node.CANCELLED;
	// 下面的操作判断是否是尾节点,如果是的话移出,如果不是,判断是不是头节点且后续节点是否需要唤醒,移出当前节点,唤醒后续节点
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
}

至此,案例源码解读完成。

总结几个点

1、CountDownLatch维护了一个同步队列,调用await方法的线程会被加入到同步队列中,并进入等待睡眠。
2、进入睡眠前,都会cas更新前继节点的waitStatus为-1(AQS的机制)
3、doReleaseShared用的很高级,唤醒不仅是传播的,被唤醒的节点线程会加入唤醒的“池子”。