【Java】Java同步组件之CountDownLatch,Semaphore

Java同步组件概况

  • CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞

  • Semaphore: 控制同一时间,并发线程数量

  • CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
  • ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
  • Condition:配合ReentrantLock,实现等待/通知模型
  • FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。

CountDownLatch 同步辅助类

【Java】Java同步组件之CountDownLatch,Semaphore

CountDownLatch代码案例

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CountDownLatchTest {

public static void main(String[] args) throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

CountDownLatch countDownLatch=new CountDownLatch(2);

executorService.execute(()->{

try{

Thread.sleep(3000);

System.out.println("任务一完成");

}catch (Exception e){

e.printStackTrace();

}

countDownLatch.countDown();

});

executorService.execute(()->{

try{

Thread.sleep(5000);

System.out.println("任务二完成");

}catch (Exception e){

e.printStackTrace();

}

countDownLatch.countDown();

});

countDownLatch.await();

//所有子任务执行完后才会执行

System.out.println("主线程开始工作.....");

executorService.shutdown();

}

}

任务一完成

任务二完成

主线程开始工作.....

CountDownlatch指定时间完成任务,如果在规定时间内完成,则等待之前的等待线程(countDownLatch.await())继续执行

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {

public static void main(String[] args) throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

CountDownLatch countDownLatch=new CountDownLatch(2);

executorService.execute(()->{

try{

Thread.sleep(3000);

System.out.println("任务一完成");

}catch (Exception e){

e.printStackTrace();

}

countDownLatch.countDown();

});

executorService.execute(()->{

try{

Thread.sleep(5000);

System.out.println("任务二完成");

}catch (Exception e){

e.printStackTrace();

}

countDownLatch.countDown();

});

//这里只等3秒

countDownLatch.await(3, TimeUnit.SECONDS);

//所有子任务执行完后才会执行

System.out.println("主线程开始工作.....");

executorService.shutdown();

}

}

//任务一完成

//主线程开始工作.....

//任务二完成

Semaphore控制线程数量

【Java】Java同步组件之CountDownLatch,Semaphore

Semaohore应用场景

package com.rumenz.task;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

public class SemaphoreExample1 {

private static Integer clientTotal=30;

private static Integer threadTotal=3;

public static void main(String[] args) throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

Semaphore semaphore=new Semaphore(threadTotal);

for (int i = 0; i < clientTotal; i++) {

final Integer j=i;

executorService.execute(()->{

try{

semaphore.acquire(); // 获取一个许可

update(j);

semaphore.release(); // 释放一个许可

}catch (Exception e) {

e.printStackTrace();

}

});

}

executorService.shutdown();

}

private static void update(Integer j) throws Exception {

System.out.println(j);

Thread.sleep(2000);

}

}

package com.rumenz.task;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

public class SemaphoreExample1 {

private static Integer clientTotal=30;

private static Integer threadTotal=3;

public static void main(String[] args) throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

Semaphore semaphore=new Semaphore(threadTotal);

for (int i = 0; i < clientTotal; i++) {

final Integer j=i;

executorService.execute(()->{

try{

semaphore.acquire(3); // 获取多个许可

update(j);

semaphore.release(3); // 释放多个许可

}catch (Exception e) {

e.printStackTrace();

}

});

}

executorService.shutdown();

}

private static void update(Integer j) throws Exception {

System.out.println(j);

Thread.sleep(2000);

}

}

tryAcquire

  • tryAcquire() : boolean
  • tryAcquire(int permits) : boolean 尝试获取指定数量的许可
  • tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
  • tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候可以等待一段时间,在指定时间内未获取到许可则放弃

Semaphore源码分析

// 非公平模式

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

// fair=true为公平模式,false=非公平模式

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}


public class Semaphore implements java.io.Serializable {

/*

* 只指定许可量,构造不公平模式

*/

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

/*

* 指定许可量,并指定模式

*/

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

//Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。

abstract static class Sync extends AbstractQueuedSynchronizer {}

/**

* NonFair version

*/

static final class NonfairSync extends Sync {

private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {

// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。

super(permits);

}

protected int tryAcquireShared(int acquires) {

return nonfairTryAcquireShared(acquires);

}

}

/**

* Fair version

*/

static final class FairSync extends Sync {

private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {

// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。

super(permits);

}

protected int tryAcquireShared(int acquires) {

for (;;) {

if (hasQueuedPredecessors())

return -1;

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

}

}

关注微信公众号:【入门小站】,解锁更多知识点

【Java】Java同步组件之CountDownLatch,Semaphore

以上是 【Java】Java同步组件之CountDownLatch,Semaphore 的全部内容, 来源链接: utcz.com/a/99829.html

回到顶部