【Java】Java同步组件之CountDownLatch,Semaphore
Java同步组件概况
CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞
Semaphore: 控制同一时间,并发线程数量
- CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
- ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
- Condition:配合
ReentrantLock
,实现等待/通知模型 - FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。
CountDownLatch 同步辅助类
CountDownLatch代码案例
package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch=new CountDownLatch(2);
executorService.execute(()->{
try{
Thread.sleep(3000);
System.out.println("任务一完成");
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
executorService.execute(()->{
try{
Thread.sleep(5000);
System.out.println("任务二完成");
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
countDownLatch.await();
//所有子任务执行完后才会执行
System.out.println("主线程开始工作.....");
executorService.shutdown();
}
}
任务一完成
任务二完成
主线程开始工作.....
CountDownlatch指定时间完成任务,如果在规定时间内完成,则等待之前的等待线程(countDownLatch.await()
)继续执行
package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch=new CountDownLatch(2);
executorService.execute(()->{
try{
Thread.sleep(3000);
System.out.println("任务一完成");
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
executorService.execute(()->{
try{
Thread.sleep(5000);
System.out.println("任务二完成");
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
//这里只等3秒
countDownLatch.await(3, TimeUnit.SECONDS);
//所有子任务执行完后才会执行
System.out.println("主线程开始工作.....");
executorService.shutdown();
}
}
//任务一完成
//主线程开始工作.....
//任务二完成
Semaphore
控制线程数量
Semaohore
应用场景
package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreExample1 {
private static Integer clientTotal=30;
private static Integer threadTotal=3;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore=new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
final Integer j=i;
executorService.execute(()->{
try{
semaphore.acquire(); // 获取一个许可
update(j);
semaphore.release(); // 释放一个许可
}catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void update(Integer j) throws Exception {
System.out.println(j);
Thread.sleep(2000);
}
}
package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreExample1 {
private static Integer clientTotal=30;
private static Integer threadTotal=3;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore=new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
final Integer j=i;
executorService.execute(()->{
try{
semaphore.acquire(3); // 获取多个许可
update(j);
semaphore.release(3); // 释放多个许可
}catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void update(Integer j) throws Exception {
System.out.println(j);
Thread.sleep(2000);
}
}
tryAcquire
- tryAcquire() : boolean
- tryAcquire(int permits) : boolean 尝试获取指定数量的许可
- tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
- tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候可以等待一段时间,在指定时间内未获取到许可则放弃
Semaphore
源码分析
// 非公平模式public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// fair=true为公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public class Semaphore implements java.io.Serializable {/*
* 只指定许可量,构造不公平模式
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/*
* 指定许可量,并指定模式
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。
abstract static class Sync extends AbstractQueuedSynchronizer {}
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
// 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
关注微信公众号:【入门小站】,解锁更多知识点
以上是 【Java】Java同步组件之CountDownLatch,Semaphore 的全部内容, 来源链接: utcz.com/a/99829.html