java并发编程CountDownLatch和CyclicBarrier在内部实现和场景上的区别

编程

  内部实现差异

  前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

  public class {

  //Synchronization control For CountDownLatch.

  //Uses AQS state to represent count.

  private static final class Sync extends AbstractQueuedSynchronizer {

  private static final long serialVersionUID = 4982264981922014374L;

  Sync(int count) {

  setState(count);

  }

  int getCount() {

  return getState();

  }

  protected int tryAcquireShared(int acquires) {

  return (getState() == 0) ? 1 : -1;

  }

  protected boolean tryReleaseShared(int releases) {

  // Decrement count; signal when transition to zero

  for (;;) {

  int c = getState();

  if (c == 0)

  return false;

  int nextc = c-1;

  if (compareAndSetState(c, nextc))

  return nextc == 0;

  }

  }

  }

  private final Sync sync;

  ... ...//

  }

  public class CyclicBarrier {

  /**

  * Each use of the barrier is represented as a generation instance.

  * The generation changes whenever the barrier is tripped, or

  * is reset. There can be many generations associated with threads

  * using the barrier - due to the non-deterministic way the lock

  * may be allocated to waiting threads - but only one of these

  * can be active at a time (the one to which {@code count} applies)

  * and all the rest are either broken or tripped.

  * There need not be an active generation if there has been a break

  * but no subsequent reset.

  */

  private static class Generation {

  boolean broken = false;

  }

  /** The lock for guarding barrier entry */

  private final ReentrantLock lock = new ReentrantLock();

  /** Condition to wait on until tripped */

  private final Condition trip = lock.newCondition();

  /** The number of parties */

  private final int parties;

  /* The command to run when tripped */

  private final Runnable barrierCommand;

  /** The current generation */

  private Generation generation = new Generation();

  /**

  * Number of parties still waiting. Counts down from parties to 0

  * on each generation. It is reset to parties on each new

  * generation or when broken.

  */

  private int count;

  /**

  * Updates state on barrier trip and wakes up everyone.

  * Called only while holding lock.

  */

  private void nextGeneration() {

  // signal completion of last generation

  trip.signalAll();

  // set up next generation

  count = parties;

  generation = new Generation();

  }

  /**

  * Sets current barrier generation as broken and wakes up everyone.

  * Called only while holding lock.

  */

  private void breakBarrier() {

  generation.broken = true;

  count = parties;

  trip.signalAll();

  }

  /**

  * Main barrier code, covering the various policies.

  */

  private int dowait(boolean timed, long nanos)

  throws InterruptedException, BrokenBarrierException,

  TimeoutException {

  final ReentrantLock lock = this.lock;

  lock.lock();

  try {

  final Generation g = generation;

  if (g.broken)

  throw new BrokenBarrierException();

  if (Thread.interrupted()) {

  breakBarrier();

  throw new InterruptedException();

  }

  int index = --count;

  if (index == 0) { // tripped

  boolean ranAction = false;

  try {

  final Runnable command = barrierCommand;

  if (command != null)

  command.run();

  ranAction = true;

  nextGeneration();

  return 0;

  } finally {

  if (!ranAction)

  breakBarrier();

  }

  }

  // loop until tripped, broken, interrupted, or timed out

  for (;;) {

  try {

  if (!timed)

  trip.await();

  else if (nanos > 0L)

  nanos = trip.awaitNanos(nanos);

  } catch (InterruptedException ie) {

  if (g == generation && ! g.broken) {

  breakBarrier();

  throw ie;

  } else {

  // We"re about to finish waiting even if we had not

  // been interrupted, so this interrupt is deemed to

  // "belong" to subsequent execution.

  Thread.currentThread().interrupt();

  }

  }

  if (g.broken)

  throw new BrokenBarrierException();

  if (g != generation)

  return index;

  if (timed && nanos <= 0L) {

  breakBarrier();

  throw new TimeoutException();

  }

  }

  } finally {

  lock.unlock();

  }

  }

  ... ... //

  }

  实战 - 展示各自的使用场景

  /**

  *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行

  */

  public class UseCountDownLatch {

  static CountDownLatch latch = new CountDownLatch(6);

  /*初始化线程*/

  private static class InitThread implements Runnable{

  public void run() {

  System.out.println("Thread_"+Thread.currentThread().getId()

  +" ready init work......");

  latch.countDown();

  for(int i =0;i<2;i++) {

  System.out.println("Thread_"+Thread.currentThread().getId()

  +" ........continue do its work");

  }

  }

  }

  /*业务线程等待latch的计数器为0完成*/

  private static class BusiThread implements Runnable{

  public void run() {

  try {

  latch.await();

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  for(int i =0;i<3;i++) {

  System.out.println("BusiThread_"+Thread.currentThread().getId()

  +" do business-----");

  }

  }

  }

  public static void main(String[] args) throws InterruptedException {

  new Thread(new Runnable() {

  public void run() {

  SleepTools.ms(1);

  System.out.println("Thread_"+Thread.currentThread().getId()

  +" ready init work step 1st......");

  latch.countDown();

  System.out.println("begin step 2nd.......");

  SleepTools.ms(1);

  System.out.println("Thread_"+Thread.currentThread().getId()

  +" ready init work step 2nd......");

  latch.countDown();

  }

  }).start();

  new Thread(new BusiThread()).start();

  for(int i=0;i<=3;i++){

  Thread thread = new Thread(new InitThread());

  thread.start();

  }

  latch.await();

  System.out.println("Main do ites work........");

  }

  }

  /**

  *类说明:共4个子线程,他们全部完成工作后,交出自己结果,

  *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串

  */

  class UseCyclicBarrier {

  private static CyclicBarrier barrier

  = new CyclicBarrier(4,new CollectThread());

  //存放子线程工作结果的容器

  private static ConcurrentHashMap resultMap

  = new ConcurrentHashMap();

  public static void main(String[] args) {

  for(int i=0;i<4;i++){

  Thread thread = new Thread(new SubThread());

  thread.start();

  }

  }

  /*汇总的任务*/

  private static class CollectThread implements Runnable{

  @Override

  public void run() {

  StringBuilder result = new StringBuilder();

  for(Map.Entry workResult:resultMap.entrySet()){

  result.append("["+workResult.getValue()+"]");

  }

  System.out.println(" the result = "+ result);

  System.out.println("do other business........");

  }

  }

  /*相互等待的子线程*/

  private static class SubThread implements Runnable{

  @Override

  public void run() {

  long id = Thread.currentThread().getId();

  resultMap.put(Thread.currentThread().getId()+"",id);

  try {

  Thread.sleep(1000+id);

  System.out.println("Thread_"+id+" ....do something ");

  barrier.await();

  Thread.sleep(1000+id);

  System.out.println("Thread_"+id+" ....do its business ");

  barrier.await();

  } catch (Exception e) {

  e.printStackTrace();

  }   郑州人工授精医院哪家好:http://yyk.39.net/hospital/fc964_detail.html/郑州做人工授精多少钱:http://yyk.39.net/hospital/fc964_detail.html/郑州看人工授精哪里好:http://yyk.39.net/hospital/fc964_detail.html/

 

  }

  }

  }

  两者总结

  1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

  2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

  3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

  4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

  5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

  6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

以上是 java并发编程CountDownLatch和CyclicBarrier在内部实现和场景上的区别 的全部内容, 来源链接: utcz.com/z/516577.html

回到顶部