【Java】Java高并发之CountDownLatch源码分析

Java高并发之CountDownLatch源码分析

入门小站发布于 今天 14:50

概述

【Java】Java高并发之CountDownLatch源码分析

使用方法

public static void main(String[] args) {

Test countDownLatchTest=new Test();

countDownLatchTest.runThread();

}

//计数器为10,代表有10个线程等待创建

CountDownLatch countDownLatch=new CountDownLatch(10);

/**

* 创建一个线程

* @return

*/

private Thread createThread(int i){

Thread thread=new Thread(new Runnable() {

@Override

public void run() {

try {

//在此等待,直到计数器变为0

countDownLatch.await();

System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis());

}catch (InterruptedException e){

e.printStackTrace();

}

}

});

thread.setName("thread-"+i);

return thread;

}

public void runThread(){

ExecutorService executorService= Executors.newFixedThreadPool(10);

try {

for(int i=0;i<10;i++){

Thread.sleep(100);

executorService.submit(createThread(i));

//一个线程创建好了,待创建的线程数减一

countDownLatch.countDown();

}

}catch (InterruptedException e){

e.printStackTrace();

}

}

源码分析

【Java】Java高并发之CountDownLatch源码分析

从锁的分类上来讲,CountDownLatch 其实是一个” 共享锁 “。还有一个需要注意的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间发生了中断,会直接抛出 InterruptedException。

源码分析

计数器的本质是什么?

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

Sync(int count) {

setState(count); //就是修改了AQS中的state值

}

await 方法

  1. await 方法实际上是调用了 sync 的一个方法

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

  1. sync 的void acquireSharedInterruptibly(int arg)的实现如下

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

//如果线程中断了,则抛异常。

//证明了之前所说的CountDownLatch是会响应中断的

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

  1. 如果没有中断,就会调用tryAcquireShared(arg)
    它的实现非常的简单,如果 state 为 0,就返回 1,否则返回 - 1

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

  1. 如果 state 不为 0,就会返回 - 1,if 条件成立,就会调用doAcquireSharedInterruptibly(arg)
    这个方法的实现,稍微复杂一点,但这个方法也不陌生了,它的功能就是把该线程加入等待队列中并阻塞,但是在入队之后,不一定会立即 park 阻塞,它会判断自己是否是第二个节点,如果是就会再次尝试获取。

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor(); //获取当前节点的前驱节点

if (p == head) {//前一个节点是头节点

int r = tryAcquireShared(arg); //去看一看state是否为0,步骤3分析过

if (r >= 0) {

//如果state目前为0,就出队

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

//进入阻塞队列阻塞,如果发生中断,则抛异常

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

countDown 方法

  1. countDown 方法实际上是调用 sync 中的一个方法

public void countDown() {

sync.releaseShared(1);

}

  1. boolean releaseShared(int arg)的具体实现如下:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

  1. tryReleaseShared(arg)方法的具体实现如下:

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {//自旋

int c = getState();

if (c == 0)//计数器已经都是0了,当然会释放失败咯

return false;

int nextc = c-1;//释放后,计数器减一

if (compareAndSetState(c, nextc))//CAS修改计数器

return nextc == 0;

}

}

  1. doReleaseShared();的实现如下,它的作用就是state为0的时候,去唤醒等待队列中的线程。

private void doReleaseShared() {

/*

* 如果head需要通知下一个节点,调用unparkSuccessor

* 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播

* 此外,我们必须通过自旋来CAS以防止操作时有新节点加入

* 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,

* 以便进行重新检查。

*/

for (;;) {

//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了

//其实就是唤醒上面新获取到共享锁的节点的后继节点

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

//表示后继节点需要被唤醒

if (ws == Node.SIGNAL) {

//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue;

//执行唤醒操作

unparkSuccessor(h);

}

//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue;

}

//如果头结点没有发生变化,表示设置完成,退出循环

//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试

if (h == head)

break;

}

}

关注微信公众号:【入门小站】,解锁更多知识点

【Java】Java高并发之CountDownLatch源码分析

java多线程

阅读 39更新于 今天 15:11

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

入门小站

rumenz.com

47 声望

3 粉丝

0 条评论

得票时间

avatar

入门小站

rumenz.com

47 声望

3 粉丝

宣传栏

概述

【Java】Java高并发之CountDownLatch源码分析

使用方法

public static void main(String[] args) {

Test countDownLatchTest=new Test();

countDownLatchTest.runThread();

}

//计数器为10,代表有10个线程等待创建

CountDownLatch countDownLatch=new CountDownLatch(10);

/**

* 创建一个线程

* @return

*/

private Thread createThread(int i){

Thread thread=new Thread(new Runnable() {

@Override

public void run() {

try {

//在此等待,直到计数器变为0

countDownLatch.await();

System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis());

}catch (InterruptedException e){

e.printStackTrace();

}

}

});

thread.setName("thread-"+i);

return thread;

}

public void runThread(){

ExecutorService executorService= Executors.newFixedThreadPool(10);

try {

for(int i=0;i<10;i++){

Thread.sleep(100);

executorService.submit(createThread(i));

//一个线程创建好了,待创建的线程数减一

countDownLatch.countDown();

}

}catch (InterruptedException e){

e.printStackTrace();

}

}

源码分析

【Java】Java高并发之CountDownLatch源码分析

从锁的分类上来讲,CountDownLatch 其实是一个” 共享锁 “。还有一个需要注意的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间发生了中断,会直接抛出 InterruptedException。

源码分析

计数器的本质是什么?

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

Sync(int count) {

setState(count); //就是修改了AQS中的state值

}

await 方法

  1. await 方法实际上是调用了 sync 的一个方法

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

  1. sync 的void acquireSharedInterruptibly(int arg)的实现如下

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

//如果线程中断了,则抛异常。

//证明了之前所说的CountDownLatch是会响应中断的

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

  1. 如果没有中断,就会调用tryAcquireShared(arg)
    它的实现非常的简单,如果 state 为 0,就返回 1,否则返回 - 1

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

  1. 如果 state 不为 0,就会返回 - 1,if 条件成立,就会调用doAcquireSharedInterruptibly(arg)
    这个方法的实现,稍微复杂一点,但这个方法也不陌生了,它的功能就是把该线程加入等待队列中并阻塞,但是在入队之后,不一定会立即 park 阻塞,它会判断自己是否是第二个节点,如果是就会再次尝试获取。

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor(); //获取当前节点的前驱节点

if (p == head) {//前一个节点是头节点

int r = tryAcquireShared(arg); //去看一看state是否为0,步骤3分析过

if (r >= 0) {

//如果state目前为0,就出队

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

//进入阻塞队列阻塞,如果发生中断,则抛异常

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

countDown 方法

  1. countDown 方法实际上是调用 sync 中的一个方法

public void countDown() {

sync.releaseShared(1);

}

  1. boolean releaseShared(int arg)的具体实现如下:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

  1. tryReleaseShared(arg)方法的具体实现如下:

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {//自旋

int c = getState();

if (c == 0)//计数器已经都是0了,当然会释放失败咯

return false;

int nextc = c-1;//释放后,计数器减一

if (compareAndSetState(c, nextc))//CAS修改计数器

return nextc == 0;

}

}

  1. doReleaseShared();的实现如下,它的作用就是state为0的时候,去唤醒等待队列中的线程。

private void doReleaseShared() {

/*

* 如果head需要通知下一个节点,调用unparkSuccessor

* 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播

* 此外,我们必须通过自旋来CAS以防止操作时有新节点加入

* 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,

* 以便进行重新检查。

*/

for (;;) {

//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了

//其实就是唤醒上面新获取到共享锁的节点的后继节点

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

//表示后继节点需要被唤醒

if (ws == Node.SIGNAL) {

//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue;

//执行唤醒操作

unparkSuccessor(h);

}

//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue;

}

//如果头结点没有发生变化,表示设置完成,退出循环

//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试

if (h == head)

break;

}

}

关注微信公众号:【入门小站】,解锁更多知识点

【Java】Java高并发之CountDownLatch源码分析

以上是 【Java】Java高并发之CountDownLatch源码分析 的全部内容, 来源链接: utcz.com/a/110887.html

回到顶部