多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟

编程

百度翻译大概意思就是:

一种同步辅助程序,允许一组线程相互等待到达一个公共的屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程方有时必须相互等待。这个屏障被称为循环屏障,因为它可以在等待的线程被释放后重新使用。

CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何参与方继续之前更新共享状态。

动图演示:

在上文中我们分析完了 CountDownLatch源码,可以理解为减法计数器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 来说,要简单很多,它类似于加法计数器,在源码中使用 ReentrantLock 和 Condition 的组合来使用。

2、案例演示 CyclicBarrier 

//加法计数器

public class CyclicBarrierDemo {

public static void main(String[] args) {

/**

* 集齐5名队员,开始游戏

*/

// 开始战斗的线程

CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{

System.out.println("欢迎来到王者荣耀,敌军还有五秒到达战场!全军出击!");

});

for (int i = 1; i <=5 ; i++) {

final int temp = i;

// lambda能操作到 i 吗

new Thread(()->{

System.out.println(Thread.currentThread().getName()+"第"+temp+"个进入游戏!");

try {

cyclicBarrier.await(); // 等待

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

}).start();

}

}

}

3、入手构造器

//构造器1

/** 创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并在触发屏障时执行给定的屏障操作,由最后一个进入屏障的线程执行 */

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

//构造器2

/** 创建一个新的CyclicBarrier,当给定数量的参与方(线程)在等待它时,它将跳闸,并且在屏障跳闸时不执行预定义的操作 */

public CyclicBarrier(int parties) {

this(parties, null);

}

其中构造器1为核心构造器,在这里你可以指定 parties 本局游戏的参与者的数量(要拦截的线程数)以及 barrierAction 本局游戏结束时要执行的任务。

3.1、入手成员变量

   /** 同步操作锁 */

private final ReentrantLock lock = new ReentrantLock();

/** 线程拦截器 */

private final Condition trip = lock.newCondition();

/** 每次拦截的线程数 */

private final int parties;

/* 换代前执行的任务 */

private final Runnable barrierCommand;

/** 表示栅栏的当前代 类似代表本局游戏*/

private Generation generation = new Generation();

/** 计数器 */

private int count;

/** 静态内部类Generation */

private static class Generation {

boolean broken = false;

}

3.2、入手核心方法

下面分析这两个方法,分别为【非定时等待】和【定时等待】!

//非定时等待

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

//定时等待

public int await(long timeout, TimeUnit unit)throws InterruptedException,

BrokenBarrierException,

TimeoutException {

return dowait(true, unit.toNanos(timeout));

}

可以看到,最终两个方法都走【dowait】 方法,只不过参数不同。下面我们重点看看这个方法到底做了哪些事情。

//核心等待方法

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();//加锁操作

try {

final Generation g = generation;

//检查当前栅栏是否被打翻

if (g.broken)

throw new BrokenBarrierException();

//检查当前线程是否被中断

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

//每次都将计数器的值-1

int index = --count;

//计数器的值减为0,则需要唤醒所有线程并转换到下一代

if (index == 0) { // tripped

boolean ranAction = false;

try {

//唤醒所有线程前先执行指定的任务

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

//唤醒所有线程并转换到下一代

nextGeneration();

return 0;

} finally {

//确保在任务未成功执行时能将所有线程唤醒

if (!ranAction)

breakBarrier();

}

}

//如果计数器不为0 则执行此循环

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

//根据传入的参数来觉得是定时等待还是非定时等待

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

//若当前线程在等待期间被中断则打翻栅栏唤醒其它线程

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// 若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作

Thread.currentThread().interrupt();

}

}

//如果线程因为打翻栅栏操作而被唤醒则抛出异常

if (g.broken)

throw new BrokenBarrierException();

//如果线程因为换代操作而被唤醒则返回计数器的值

if (g != generation)

return index;

//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();//最终解锁

}

}

分两步分析,首先计数器的值减为0的情况,和计数器不为0的情况,首先第一种情况下:

第二种情况,计数器不为0,则进入自旋for(;;):

最后,我们来看看怎么重置一个栅栏:

将屏障重置为初始状态。如果任何一方目前在隔离墙等候,他们将带着BrokenBarrierException返回。

请注意,由于其他原因发生中断后的重置可能很复杂;线程需要以其他方式重新同步,并选择一种方式执行重置。

最好是创建一个新的屏障供以后使用

    public void reset() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

breakBarrier(); // break the current generation

nextGeneration(); // start a new generation

} finally {

lock.unlock();

}

}

测试reset代码:

首先,打破栅栏,那意味着所有等待的线程(5个等待的线程)会唤醒,【await 】方法会通过抛出【BrokenBarrierException】异常返回。然后开启新一代,重置了 count 和 generation,相当于一切归0了。

4、CyclicBarrier 与 CountDownLatch 的区别

相同点:

1、都可以实现一组线程在到达某个条件之前进行等待

2、它们内部都有一个计数器,当计数器的值不断减为0的时候,所有阻塞的线程都会被唤醒!

不同点:

1、CyclicBarrier 的计数器是由它自己来控制,而CountDownLatch 的计数器则是由使用则来控制

2、在CyclicBarrier 中线程调用 await方法不仅会将自己阻塞,还会将计数器减1,而在CountDownLatch中线程调用 await方法只是将自己阻塞而不会减少计数器的值。

3、另外,CountDownLatch 只能拦截一轮,而CyclicBarrier 可以实现循环拦截。一般来说CyclicBarrier 可以实现 CountDownLatch的功能,而反之不能。

 

以上是 多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟 的全部内容, 来源链接: utcz.com/z/513887.html

回到顶部