【Java】Java同步组件之CyclicBarrier,ReentrantLock
[TOC]
Java同步组件概况
- CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞
- Semaphore: 控制同一时间,并发线程数量
CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
- Condition:配合
ReentrantLock
,实现等待/通知模型 - FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。
CyclicBarrier
介绍
与CountDownLatch
比较
相同点
- 都是同步辅助类
- 使用计数器实现
不同点
CyclicBarrier
允许一个或多个线程,等待其它一组线程完成操作,再继续执行。CyclicBarrier
允许一组线程之间相互等待,到达一个共同点,再继续执行。CountDownLatch
不能被复用。CyclicBarrier
适合更复杂的业务场景,如计算发生错误,通过重置计数器,并让线程重新执行。CyclicBarrier
还提供其它有用的方法,比如getNumberWaiting
方法可以获得CyclicBarrier
阻塞的线程数量,isBroken
方法用来知道阻塞的线程是否被中断。
CountDownLatch
和CyclicBarrier
的场景比较
代码演示
package com.rumenz.task.CyclicBarrier;import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier(2,new Runnable(){
@Override
public void run() {
System.out.println("汇总计算----");
}
});
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()->{
try{
System.out.println("计算昨天的数据");
Thread.sleep(5000);
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
});
executorService.execute(()->{
try{
System.out.println("计算今天的数据");
Thread.sleep(3000);
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
});
executorService.shutdown();
}
}
//计算昨天的数据
//计算今天的数据
//汇总计算----
ReentrantLock
可重入锁
ReentrantLock
(可重入锁)和synchronized
的区别
ReentrantLock
与synchronized
的功能区别
synchronized
更加便利,它由编译器保证加锁与释放。ReentrantLock
需要手动声明和释放锁,所以为了避免忘记手动释放锁造成死锁,所以最好在finally
中声明释放锁。ReentrantLock
的锁粒度更细更灵活。
ReentrantLock
特有功能
ReentrantLock
可以指定为公平或者非公平,synchronized
是能是非公平锁。(公平锁的意思就是先等待的锁先获得锁)- 提供一个
Condition
类,它可以分组唤醒需要唤醒的线程。不像synchronized
要么随机唤醒一个,要么全部唤醒。 - 提供能够中断等待锁的线程的机制,通过lock.lockInterruptibly()实现,这种机制ReentrantLock是一种自选锁,通过循环调用CAS操作来实现加锁。性能比较好的原因是避免了进入内核态的阻塞状态。想进办法避免线程进入内核阻塞状态, 是我们分析和理解锁设计的关键
synchronized
的使用场景
- J.U.C包中的锁定类是用于高级情况和高级用户的工具,除非说你对Lock的高级特性有特别清楚的了解以及有明确的需要,或这有明确的证据表明同步已经成为可伸缩性的瓶颈的时候,否则我们还是继续使用synchronized
- 相比较这些高级的锁定类,synchronized还是有一些优势的,比如synchronized不可能忘记释放锁。 在退出synchronized块时,JVM会自动释放锁,会很容易忘记要使用finally释放锁,这对程序非常有害。
- 还有当JVM使用synchronized管理锁定请求和释放时,JVM在生成线程转储时能够包括锁定信息,这些信息对调试非常有价值,它们可以标识死锁以及其他异常行为的来源。 而Lock类知识普通的类,JVM不知道哪个线程具有Lock对象,而且几乎每个开发人员都是比较熟悉synchronized
代码演示
package com.rumenz.task;import com.google.common.net.InternetDomainName;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static Integer clientTotal=5000;
private static Integer threadTotal=200;
public static Integer count=0;
// 声明锁的实例,调用构造方法,默认生成一个不公平的锁
private final static Lock lock=new ReentrantLock();
public static void main(String[] args) throws Exception{
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore=new Semaphore(threadTotal);
final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(()->{
try{
semaphore.acquire();
update();
semaphore.release();
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println("count:"+count);
}
private static void update() {
lock.lock();
try{
count++;
}finally {
lock.unlock();
}
}
}
//count:5000
ReentrantLock
常用方法
tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
lockInterruptbily():如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。
isLocked():查询此锁定是否由任意线程保持
isHeldByCurrentThread:查询当前线程是否保持锁定状态。
isFair:判断是不是公平锁
ReentrantReadWriteLock
读写锁
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {
/** 内部类提供的读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类提供的读锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
}
ReentrantReadWriteLock
代码演示
package com.rumenz.task.reentrant;import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantLockExample {
private static Integer clientTotal=5000;
private static Integer threadTotal=200;
private static LockMap map=new LockMap();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
final Semaphore semaphore=new Semaphore(threadTotal);
for (int i = 0; i < 2500; i++) {
final Integer m=i;
executorService.execute(()->{
try{
semaphore.acquire();
map.put(m+"",m+"");
semaphore.release();
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
}
for (int j = 0; j< 2500; j++) {
final Integer n=j;
executorService.execute(()->{
executorService.execute(()->{
try{
semaphore.acquire();
String s = map.get(n + "");
System.out.println("===="+s);
semaphore.release();
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
});
});
}
countDownLatch.await();
executorService.shutdown();
}
}
//线程安全的一个Map
class LockMap {
private final Map<String,String> map=new TreeMap<>();
//声明读写锁
private final ReentrantReadWriteLock reentrantReadWriteLock=new ReentrantReadWriteLock();
//获得读写锁中的读锁
private final Lock rlock=reentrantReadWriteLock.readLock();
//获得读写锁中的写锁
private final Lock wlock=reentrantReadWriteLock.writeLock();
//读取数据
public String get(String key){
rlock.lock();
try{
return map.get(key);
}finally {
rlock.unlock();
}
}
//写入数据
public String put(String k,String v){
wlock.lock();
try{
return map.put(k,v);
}finally {
wlock.unlock();
}
}
//读取数据
public Set<String> getAllKeys(){
rlock.lock();
try {
return map.keySet();
}finally {
rlock.unlock();
}
}
}
StampedLock
介绍
StampedLock
源码中的一个案例
package com.rumenz.task.stampedLock;import java.util.concurrent.locks.StampedLock;
class Point {
private double x, y;
// 锁实例
private final StampedLock sl = new StampedLock();
// 排它锁-写锁(writeLock)
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//乐观读锁(tryOptimisticRead)
double distanceFromOrigin() {
// 尝试获取乐观读锁(1)
long stamp = sl.tryOptimisticRead();
// 将全部变量拷贝到方法体栈内(2)将两个字段读入本地局部变量
double currentX = x, currentY = y;
// 检查在(1)获取到读锁票据后,锁有没被其他写线程排它性抢占(3)
if (!sl.validate(stamp)) {
// 如果被抢占则获取一个共享读锁(悲观获取)(4)
stamp = sl.readLock();
try {
currentX = x; // 将两个字段读入本地局部变量(5)
currentY = y; // 将两个字段读入本地局部变量(5)
} finally {
// 释放共享读锁(6)
sl.unlockRead(stamp);
}
}
// 返回计算结果(7)
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 使用悲观锁获取读锁,并尝试转换为写锁
void moveIfAtOrigin(double newX, double newY) {
// 这里可以使用乐观读锁替换(1)
long stamp = sl.readLock();
try {
// 如果当前点在原点则移动(2)
while (x == 0.0 && y == 0.0) {
// 尝试将获取的读锁升级为写锁(3)
long ws = sl.tryConvertToWriteLock(stamp);
// 升级成功,则更新票据,并设置坐标值,然后退出循环(4)
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
}
else {
// 读锁升级写锁失败则释放读锁,显示获取独占写锁,然后循环重试(5)
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁(6)
}
}
}
总结
- synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定;
- ReentrantLock、ReentrantReadWriteLock,、StampedLock都是对象层面的锁定,要保证锁定一定会被释放,就必须将unLock()放到finally{}中;
- StampedLock 对吞吐量有巨大的改进,特别是在读线程越来越多的场景下;
- StampedLock有一个复杂的API,对于加锁操作,很容易误用其他方法;
- 当只有少量竞争者的时候,synchronized是一个很好的通用的锁实现;
- 当线程增长能够预估,ReentrantLock是一个很好的通用的锁实现;
关注微信公众号:【入门小站】,解锁更多知识点
以上是 【Java】Java同步组件之CyclicBarrier,ReentrantLock 的全部内容, 来源链接: utcz.com/a/100523.html