并发编程系列-同步器实现三 CountDownLatch
前言
关于CountDownLatch,在项目中使用的场景还是比较多的,也是基于AQS的一种实现,称做计数器或者闭锁。每个线程可以拿到一个资源,当资源都释放后会有一个总的线程来继续执行,起到线程间通信的作用。
系列文章
一、并发编程系列-同步器 AQS
二、并发编程系列-同步器实现一 ReentrantLock
三、并发编程系列-同步器实现二 ReentrantLock Condition
四、并发编程系列-同步器实现三 CountDownLatch
五、并发编程系列-同步器实现四 Semaphore
六、并发编程系列-同步器实现五 CyclicBarrier
一个例子:
生活中的考试,考试前,老师给每个人发了一份试卷,老师开始等考试结束,快结束时,每个人都把做完的试卷交了上去,等所有的试卷收完后,老师宣布考试结束。
例子中:
学生比喻成一个个线程
老师比喻成总线程,也是等待线程
试卷就是给每个线程的一个资源,老师等待所有的试卷收齐
插播
进入正题前,先了解下线程的interrupt机制
对于一个线程调用,interrupt()时:
1、如果该线程是阻塞状态,例如处于await、sleep时,线程会立即退出阻塞状态,并抛出一个InterruptedException异常;
2、如果该线程是正常状态,只会把该线程的中断标识位置成true,仅仅如此,如果后面有需要判断线程是否中断时,调用Thread.interrupted()判断即可。
如果线程中断了,调用Thread.interrupted()返回true,并重置中断位为false。
CountDownLatch
先了解下CountDownLatch
CountDownLatch的内部类如下
// 和ReentrantLock一个套路,有个静态内部类Sync继承了AQS,不过不是抽象的,说明没有类似公平或非公平的区分,来实现核心的逻辑
private static final class Sync extends AbstractQueuedSynchronizer {
// 构造函数,指定资源总数
Sync(int count) {
setState(count);
}
// 获取count,即当前资源数
int getCount() {
return getState();
}
// 关键:尝试获取共享锁,该方法实现了AQS的抽象方法;逻辑比较简单,资源为0返回1否则返回-1
// 具体如何用,往下看
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 关键:尝试释放共享锁,该方法实现了AQS的抽象方法;
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) { // 循环将state减1,cas不成功则无限循环到成功为止,除非state被减到0了
int c = getState();
if (c == 0) // 无数可减,返回false
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果是减1之后便为0,则返回true,否则返回false
return nextc == 0;
}
}
}
// 此处可以看出,与ReentrantLock的区别是,CountDownLatch实现的是以Shared结尾的方法,字面理解为共享,
// 所以CountDownLatch应该是通过共享锁来实现的。
CountDownLatch的变量和方法如下
// 以内部类定义的成员变量
private final Sync sync;
// 构造函数,指定锁的总数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 内部类的一个实例化过程
this.sync = new Sync(count);
}
// 关键方法,等待方法,该方法捕获中断异常并抛出
public void await() throws InterruptedException {
// 调用内部类sync继承至AQS的方法,请求可中断共享锁1个
sync.acquireSharedInterruptibly(1);
}
// 同上,只是说该方法支持超时等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 关键方法,释放当前持有的一个资源
public void countDown() {
// 调用内部类sync继承至AQS的方法,释放共享锁1个
sync.releaseShared(1);
}
//// 其它方法一个是获取count的方法,还有一个toString
案例
public static void testCountDownLatch() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
System.out.println(DateUtils.getCurrentDateTime() + " 开始测试...");
new Thread(() -> {
System.out.println(DateUtils.getCurrentDateTime() + " 我是线程1开始执行");
try {
// 模拟任务执行
Thread.sleep(5 * 1000);
} catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(DateUtils.getCurrentDateTime() + " 我是线程1执行结束 countDown()");
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println(DateUtils.getCurrentDateTime() + " 我是线程2开始执行");
try {
// 模拟任务执行
Thread.sleep(10 * 1000);
} catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(DateUtils.getCurrentDateTime() + " 我是线程2执行结束 countDown()");
countDownLatch.countDown();
}).start();
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(DateUtils.getCurrentDateTime() + " 我是主线程,子线程应该正在执行吧,我await等等吧");
/*
* 1、当都2个子线程都超时很大的时候,可以debug await方法,进入同步队列
* 2、当第1个线程设置5000ms睡眠时,第2个线程设置10000ms时,可以debug第2个线程的countDown()
* 来debug 释放锁的过程;debug 第1个线程的countDown(),比较简单,就是state-1 return。
**/
countDownLatch.await();
System.out.println(DateUtils.getCurrentDateTime() + " 子线程都执行完了,结束下班");
}
输出结果:
2021-11-14 21:42:42 开始测试...
2021-11-14 21:42:42 我是线程1开始执行
2021-11-14 21:42:42 我是线程2开始执行
2021-11-14 21:42:43 我是主线程,子线程应该正在执行吧,我await等等吧
2021-11-14 21:42:47 我是线程1执行结束 countDown()
2021-11-14 21:42:52 我是线程2执行结束 countDown()
2021-11-14 21:42:52 子线程都执行完了,结束下班
案例源码解读
案例解读前,先来通过流程图看下大概的过程
首先看main线程,初始化CountDownLatch并传入参数2,表示一共2个资源。
// 初始化
CountDownLatch countDownLatch = new CountDownLatch(2);
先跳过线程1和线程2,假设他们都在执行任务中,还没调用countDown(),主线程调用了countDownLatch.await();
java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
继续看acquireSharedInterruptibly方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果调用await的线程被之前中断了,则抛出异常,要知道它是不会影响子线程的执行
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享锁,由子类CountDownLatch实现,往下看
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
java.util.concurrent.CountDownLatch
protected int tryAcquireShared(int acquires) {
// 如果当前资源为0,则返回1,否则返回-1,从调用此方法的if判断可知
// 如果state为0,说明之前分的共享资源都释放了,可以直接返回了;否则会执行下面的逻辑进入同步队列等待
return (getState() == 0) ? 1 : -1;
}
继续看,main线程先调用的await方法,state不为0,所以继续执行 doAcquireSharedInterruptibly(arg)方法
java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 似曾相识,创建节点,增加到同步队列的后面,Node.SHARED标识是一个共享节点(用Node的nextWaiter存储记录),同时节点记录当前线程信息
// addWaiter内部,将新节点加入到同步队列的尾部,如果原队列为空,会创建一个空节点作为头节点,然后新节点加入其后。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 无限循环
// 第一次调用await方法,所以前继节点为头节点
final Node p = node.predecessor();
if (p == head) {
// 此时再次调用子类CountDownLatch实现的tryAcquireShared方法(见上面),如果资源全被释放返回1,否则返回-1
//(就是在真正线程等待之前再不甘心的尝试一下,哈哈)
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果此时资源都释放了,则传播,此处先不分析,后面再详细说
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 又似曾相识,此时资源没有全部释放,当前线程需要park等待了,往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed) // 后面分析,如果线程中断了,抛出异常,会执行这里,等线程被唤起时,再分析,毕竟线程已经进入等待了,无法执行到这里
cancelAcquire(node);
}
}
继续看shouldParkAfterFailedAcquire方法(ReentrantLock一样的)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前继节点的等待状态,AQS那章已经说过,默认是0,会经过下面的cas改成SIGNAL(-1)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果是-1,表示后继节点可以安心睡觉了,当前节点锁释放后会唤醒它的
return true;
if (ws > 0) {
// ws大于1表示节点状态已经取消了,可以跳过该节点了
do {
// 比较难看懂,从后往前看,pred = pred.prev表示前节点指到再前一个节点;node.prev = pred当前node节点的前节点指向刚刚的pred。
// 加上后面那句pred.next = node; 其实就是删除中间的取消节点。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将waitStatue cas成signal状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
继续看,shouldParkAfterFailedAcquire最终会返回true,进而调用parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 线程进入等待,开始睡眠了
return Thread.interrupted(); // 返回线程是否有被中断
}
至此,主线程进入同步列队进行等待,注意的是,它的标识是一个共享的,即共享锁。
下面我们看线程进行countDown操作
线程1执行完任务了,线程1进行countDown操作
调用 java.util.concurrent.CountDownLatch#countDown
public void countDown() {
// 继续往下看
sync.releaseShared(1);
}
进入AQS的方法,释放共享锁
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// 调用子类CountDownLatch实现的tryReleaseShared,往下看
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
// 因为是第一个线程1先countDownLatch,此时state还不为0,返回false
return false;
}
看下tryReleaseShared方法,前面分析过,由CountDownLatch实现
protected boolean tryReleaseShared(int releases) {
// 循环cas将资源数-1,如果是从1减到0的则返回true,否则返回false
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
线程2执行完任务了,线程2进行countDown操作
和线程1调用是一样的逻辑,唯一的不同在于state此时被cas成0了
java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 此时进入到这里,state从1cas到0,tryReleaseShared返回true了,调用tryReleaseShared逻辑,继续往下看
doReleaseShared();
return true;
}
return false;
}
继续看doReleaseShared方法,此方法比较关键
private void doReleaseShared() {
for (;;) {
// 获取头节点
Node h = head;
// 进入此处,头节点不为空,且不等于尾节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// Node.SIGNAL表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果cas更新失败了,则循环
continue; // loop to recheck cases
// 唤醒同步队列head的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// cas从0到-3,也可能没更新,唤醒节点后,队列的节点还没执行setHead更新头指针,当前线程就(h == head)离开了
continue;
}
// 非常关键,唤醒head后续节点后,继续传播唤醒,直到head位置没发生改变
if (h == head) // loop if head changed
break;
}
}
至此,所有线程countDownLatch了,资源释放为0,同时唤醒了同步队列等待的共享线程(而且如果多个的话则传播唤醒)
最后一哆嗦,等待的线程要被唤醒了private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 无限循环
// 第一次调用await方法,p节点为头节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 线程被唤起,再次进入循环会到此,往下看
setHeadAndPropagate(node, r);
p.next = null; // help head节点 GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
继续看,setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将头指针指向node,同时设置线程为null,前序指针指向null
setHead(node);
// 此时propagate是为1的
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果当前节点的后继节点为null或后续节点为共享节点,则继续传播唤醒,具体传播唤醒见doReleaseShared,上面分析过了。
if (s == null || s.isShared())
doReleaseShared();
}
}
至此,等待的节点被唤醒后拿到,可以继续执行任务,结束。
高级的地方
我们再来看看它的牛皮逻辑,搞懂它
我们假设有3个线程,线程1(先)和线程2(后)进入等待,线程3进行唤醒:
当线程3唤醒线程1时,线程3会执行到doReleaseShare方法的if(h == head) break;判断:
1、如果此时线程3唤醒的线程1还没有来得及调用setHeadAndPropagate(node, r);方法更新头指针,那么线程3就会break,线程1继续执行setHeadAndPropagate,更新头指针,重新进入doReleaseShare进行唤醒线程2,以此反复...
2、如果此时线程3唤醒的线程1调用了setHeadAndPropagate(node, r);更新了头指针,同时进入doReleaseShare加入唤醒其它节点,而线程3此时判断if(h == head)会跳过重新进入for循环进行后继节点的唤醒,此时就会有两个线程 线程3和线程1同时唤醒后继节点,除非碰到(h == head)为true(唤醒的节点来没更新head)则推出。
可以看出:
1、共享节点的唤醒是传播的
2、唤醒的共享节点会加入唤醒的“池子”里进行唤醒后续节点,很强大啊!
3、如果被唤醒的节点来不及更新头节点,导致唤醒线程break;也没关系,被唤醒的线程会传播的。
最后的一个假如:
如果线程调用await之前就被设置中断,那么doAcquireSharedInterruptibly会抛出InterruptedException异常,注意再声明下,当前线程抛中断异常,不影响其它线程的执行。
在抛异常时有个finally处理,往下看
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 节点的线程先置null,不需要再被唤醒的
node.thread = null;
// 获取前续节点,如果是取消节点的话,则一直往前找,是node的前继节点指向它。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 前继节点的下一个节点,
Node predNext = pred.next;
// 当前节点标记为取消
node.waitStatus = Node.CANCELLED;
// 下面的操作判断是否是尾节点,如果是的话移出,如果不是,判断是不是头节点且后续节点是否需要唤醒,移出当前节点,唤醒后续节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
至此,案例源码解读完成。
总结几个点
1、CountDownLatch维护了一个同步队列,调用await方法的线程会被加入到同步队列中,并进入等待睡眠。
2、进入睡眠前,都会cas更新前继节点的waitStatus为-1(AQS的机制)
3、doReleaseShared用的很高级,唤醒不仅是传播的,被唤醒的节点线程会加入唤醒的“池子”。