多线程进阶——JUC并发编程之CountDownLatch源码一探究竟

编程

百度翻译如下:

一种同步辅助程序,允许一个或多个线程等待在其它线程中执行的一组操作完成。使用给定的计数初始化CountDownLatch。由于调用了countDown()方法,await方法阻塞直到当前计数为零,之后释放所有等待线程,并立即返回await的任何后续调用。这是一个一次性现象——计数不能重置。如果需要重置计数的版本,请考虑使用CyclicBarrier。倒计时锁存器是一种通用的同步工具,可用于多种目的。使用计数1初始化的倒计时锁存器用作简单的开/关锁存器或门:调用倒计时()的线程打开它之前,调用它的所有线程都在门处等待。初始化为N的倒计时锁存器可用于使一个线程等待N个线程完成某个操作或某个操作已完成N次。倒计时锁存器的一个有用特性是,它不要求调用倒计时的线程在继续之前等待计数达到零,它只是防止任何线程在所有线程都可以通过之前继续通过等待。

先来个demo

// 计数器

public class CountDownLatchDemo {

public static void main(String[] args) throws InterruptedException {

// 总数是6,必须要执行任务的时候,再使用!

CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 1; i <=6 ; i++) {

new Thread(()->{

System.out.println(Thread.currentThread().getName()+" Go out");

countDownLatch.countDown(); // 数量-1

},String.valueOf(i)).start();

}

countDownLatch.await(); // 等待计数器归零,然后再向下执行

System.out.println("Close Door");

}

}

结果如下:CountDownLatch可以理解为减法计数器

废话不多说,下面我们开始对CountDownLatch源码进行分析

首先我们打开CountDownLatch源码类(我把多余的注释都去掉了...):

package java.util.concurrent;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

//设置同步状态

Sync(int count) {

setState(count);

}

//获取同步状态的值

int getCount() {

return getState();

}

//获取共享锁,1、getState>1返回1:表示获取到共享锁,-1:表示没有获取到共享锁

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

//释放共享锁

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

//通过CAS设置同步状态值,如果设置失败则说明同一时刻有其它线程在设置,但是会通过自旋的方式最终设置成功

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

找到它的构造函数,来瞄一眼!

    private final Sync sync;

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

首先从构造函数出发,初始化变量,其中Sync是一个AQS的子类,构造函数如下:

public class CountDownLatch {

/**

* Synchronization control For CountDownLatch.

* Uses AQS state to represent count.

*/

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

setState(count);

}

int getCount() {

return getState();

}

    private volatile int state;

protected final void setState(int newState) {

state = newState;

}

设置状态变量state,其中state是一个volatile关键字,可用来保证可见性,不懂volatile可以看看这篇博客:多线程进阶——狂神说java之JUC并发编程,里面详细介绍了volatile的作用!

由上面可知实际上是把计数器的值赋值给了AQS的state,也就是这里AQS的状态值来表示计数器值。

接下来主要看一下CountDownLatch中几个重要的方法内部是如何调用AQS来实现功能的。

await()方法:调用await()方法的线程会被挂起,它会等待直到count=0才继续执行,一般由主线程调用。

    public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

//AQS获取共享资源时,该方法是响应中断的

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

//如果线程中断则抛出异常

if (Thread.interrupted())

throw new InterruptedException();

//tryAcquireShared(arg)是AQS提供的模板方法

//尝试看当前计数器值是否为0,为0则直接返回,否则进入AQS的等待队列

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

    public static boolean interrupted() {//判断是否中断

return currentThread().isInterrupted(true);

}

、、最终调用本地Native方法,C,C++

private native boolean isInterrupted(boolean ClearInterrupted);

接下来看看第二个判断:

 //尝试看当前计数器值是否为0,为0则直接返回1,否则返回-1       

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

分析上面代码我们不难发现:await()方法调用 acquireSharedInterruptibly(int arg)的时候传递的是1,就说明要获取一个资源,而这里计数器的值 getState()获得的是资源总数,acquireSharedInterruptibly内部首先判断当前线程是否被中断了,是则抛出异常,否则调用sync实现的tryAcquireShared方法,查看当前状态值state(计数器)值是否为0,为0 则当前线程的await()方法直接返回1,否则调用AQS的 doAcquireSharedInterruptibly让当前线程阻塞。

接下来我们瞅瞅源码 doAcquireSharedInterruptibly(int arg)方法如何实现阻塞当前线程

1、将当前线程构造成共享模式节点,通过自旋的方式尝试获取同步状态

2、如果获取同步状态成功,则唤醒后续处于共享模式下的节点;如果没有获取到同步状态,则对调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法挂起当前线程,这样可以避免该线程无线循环而获取不到共享锁,从而造成资源的浪费。

需要注意的点:

当有多个线程调用 await()方法时,这些线程都会通过addWaiter(Node.SHARED)方法被构造成节点加入到等待队列中。

当最后一个调用 countDown()方法的线程执行了countDown()后(这里有点拗口),会唤醒处于等待队列中距离头节点最近的一个节点, 也就是说该线程被唤醒之后会继续自旋尝试获取同步状态,此时执行到 tryAcquireShared(int)方法时,发现r大于0(因为state已经被置为0了) 该线程就会调用setHeadAndPropagate(Node, int)方法将唤醒传递下去,并且退出当前循环,开始执行awat()方法之后的代码。

//这里将当前线程构造成Node节点加入到等待队列中,并通过自旋的方式尝试获取共享锁,并且该方法是响应中断的

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

//将当前线程构建成共享模式的节点加入到等待队列中

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

//获取同步状态,这里count值如果不为0,则r的值一直为-1.这个时候循环会继续,除非线程被中断

int r = tryAcquireShared(arg);

//如果r>=0说明此时同步状态的值为0,获取到共享锁

if (r >= 0) {

//处理后续节点

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

//挂起等待被唤醒

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

//目的是由于响应中断或者其它异常情况会导致执行这个函数:主要用于唤醒后继节点和取消某个节点

cancelAcquire(node);

}

}

我们先对addWriter进行具体的剖析,瞅瞅线程如何变成节点被添加进去:实现如下

表明是第一个创建节点,或者是已经被其他线程修改过了会进入到这来,这里无限循环设置Node头尾节点,只有设置成功才会退出,所以该节点一定会被添加

好了,到这里我们的addWaiter方法也就分析完了,下面我们继续跟进,添加完节点之后,检查并更新无法获取的节点的状态。如果线程应该阻塞,则返回true。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

//只有当当前节点状态为Singal才返回true

if (ws == Node.SIGNAL)

return true;

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

我们对shouldParkAfterFailedAcquire来进行一个整体的概述,首先应该明白节点的状态,节点的状态是为了表明当前线程的良好度,如果当前线程被打断了,在唤醒的过程中是不是应该忽略该线程,这个状态标志就是用来做这个的具体有如下几种 :

线程已经被取消

static final int CANCELLED = 1;

线程需要去被唤醒

static final int SIGNAL = -1;

线程正在唤醒等待条件

static final int CONDITION = -2;

//线程的共享锁应该被无条件传播

static final int PROPAGATE = -3;

shouldParkAfterFailedAcquire是位于无限for循环内的,这一点需要注意一般每个节点都会经历两次循环后然后被阻塞。建议读者试着走一遍,以加深理解 ,当该函数返货true时 线程调用parkAndCheckInterrupt这个阻塞自身。到这里基本每个调用await函数都阻塞在这里 (很关键哦,应为下次唤醒,从这里开始执行哦)
接着让我们来看看countDown这个函数的玄机吧,因为线程就是通过这个来函数来触发唤醒条件的

//调用countDown()释放同步状态,每次调用同步状态值-1

public void countDown() {

sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

//tryReleaseShared方法必须保证同步状态线程安全释放,一般是通过CAS和循环来实现

if (tryReleaseShared(arg)) {

//唤醒后续处于等待的节点

doReleaseShared();

return true;

}

return false;

}

看一下判断条件tryReleaseShared函数

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))//自旋减一

return nextc == 0;//next为0返回true

}

}

}

也就是说当state减1后为0时才会返回为真 执行后面的唤醒条件,否则都一概忽略,假设达到唤醒条件 具体来看如何唤醒 ,函数如下

//释放共享锁,通知后续节点,主要是唤醒调用了await方法的线程(一般为主线程)

private void doReleaseShared() {

for (;;) {

Node h = head;//获取头节点

if (h != null && h != tail) {

int ws = h.waitStatus;

//SIGNAL为-1,后继节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点

if (ws == Node.SIGNAL) { //头结点的状态为Node.SIGNAL

//将头结点的状态值设置为0

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);//这里唤醒后继节点

}

//WaitStatus为0的时候表示为初始状态,设置当前节点为-3,表示线程的共享锁应该被无条件传播

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

//如果h还是指向头结点,说明没有其他节点对头结点进行修改

if (h == head) // loop if head changed

break;

}

}

只有当在最后一个执行 countDown()方法的线程时,才会进入在doReleaseShared()方法中,其大致的逻辑如下:

1、判断head节点不为null,且不为tail节点,说明等待队列中有等待唤醒的线程,在等待队列中,头结点中并没有保存正在等待的线程,其只是一个空的Node节点,真正等待的线程是从头结点的下一个节点开始排队等待的。

2、在判断等待队列中有正在等待的线程之后,将头结点的状态信息置为初始状态,并且调用 unparkSuccessor(Node)方法唤醒后继节点,使后续节点可以尝试去获取共享锁。

3、如果头结点的的 waitStatus为0此时为初始状态 ,则将头结点的 waitStatus设置为为-3,表示下一次同步状态的获取将会无条件的传播下去。

4、头结点没有被其他线程修改,则跳出循环。

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);//唤醒线程

}

对于这个唤醒操作很好理解的,首先取该节点的后节点进行唤醒,如果后节点已被取消,则从最后一个开始往前找,找一个满足添加的节点进行唤醒

有人肯能会有疑问,要是如果有多个节点只在这进行一次唤醒工作吗?难道只唤醒一个线程就可以了?哈哈别急还记得线程是在哪阻塞的吗 让我们回来前面去看线程被阻塞的地方 (忘记了可以往前看看)

 private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())//我们知道线程是在这里被阻塞了

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

关键来了,线程在这里被阻塞,唤醒后继续执行,由于满足条件,state的状态值为0,函数返回值为1 ,大于0会进入其中我们继续往下看 这一小段

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head; // Record old head for check below

setHead(node);//设置Head为头结点

//如果走到了setHeadAndPropagate方法,那么propagate的值一定大于1,if条件成立

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

//获取当前节点的下一个节点

Node s = node.next;

//如果下一个节点为null或者shared节点释放共享锁

if (s == null || s.isShared())

doReleaseShared();//进入这里

}

}

这个函数相信你不陌生吧,就是第一个释放锁所调用的,在这里,被唤醒的线程在调用一次,依赖唤醒后续线程

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

//明白这里为什么要加一次判断了吧!!!,被唤醒的线程会在执行该函数

if (h == head) // loop if head changed

break;

}

}

现在明白其唤醒机制了吧 先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去

最终所有线程都会被唤醒, 其实这也是AQS共享锁的唤醒原理,自此完成了对countDownLatch阻塞和唤醒原理的基本分析。

总结:

CountDownLatch还提供了超时等待机制,在特定时间后就不会再阻塞当前线程;不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

CountDownLatch 底层实现依赖于AQS共享锁的实现机制,首先初始化计数器count,调用countDown()方法时,计数器count-1,当计数器count=0时,会唤醒处于AQS等待队列中的线程。调用await()方法,线程会被挂起,他会等待直到count=0才会继续执行,否则会加入到等待队列中,等待被唤醒。

以上是 多线程进阶——JUC并发编程之CountDownLatch源码一探究竟 的全部内容, 来源链接: utcz.com/z/513844.html

回到顶部