JUC并发工具类详解

image-20200724075805195

1.为了并发安全:互斥同步、非互斥同步、无同步方案

2.管理线程、提高效率

3.线程协作

image-20200724075805195

问题1:线程池

为什么要使用线程池

问题一:反复创建线程开销大

问题二:过多的线程会占用太多内存

用少量的线程-避免内存占用过多

让这部分线程都保持工作,且可以反复执行任务-避免生命周期的损耗

  • 加快响应速度

  • 合理利用CPU和内存

  • 统一管理

线程池的创建与销毁

image-20200724075805195

corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线 程数有一个上限,这就是最大量maxPoolSize

image-20200724075805195

线程池应该手动创建还是自动创建

手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。另一方面,自动创建的线程池可能和业务不够契合。

自动创建:

image-20200724075805195

线程池里的线程数量设定为多少比较合适?

CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。

耗时IO型(读写数据库、文件、网络读写等):最佳线程数-般会大于cpu核心数很多倍,以VM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:

线程数=CPU核心数*(1+平均等待时间/平均工作时间)

线程池的停止

shutdown 开始停止

isShutdown 是否开始停止

isTerminated 是否完全停止

waitTermination 等待一段时间,看是否停止

shutdownNow 立即停止,可以有返回值存放未完成的任务

线程池的任务拒绝

当Executor关闭时,提交新任务会被拒绝。

以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 (默认)

    这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。

  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

    使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。

  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

    此拒绝策略,是一种喜新厌旧的拒绝策略。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。

  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

    一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

线程池的状态

image-20200724075805195

问题2:ThreadLocal

使用场景

典型场景1:每个线程需要一个独享的对象(通常是工具类,典型需要使用的类有SimpleDateFormat和Random)

典型场景2:每个线程内需要保存全局变量(例如在拦截器中获取用户信息),可以让不同方法直接使用,避免参数传递的麻烦

根据共享对象的生成时机不同,选择initialValue或set来保存对象

两个作用

让某个需要用到的对象在线程间隔离(每个线程都有自己的独立的对象)

在任何方法中都可以轻松获取到该对象--》相当于一个Map的作用

好处

达到线程安全

不需要加锁,提高执行效率

更高效地利用内存、节省开销

免去传参的繁琐

源码解读

image-20200724075805195

注意点

内存泄漏

ThreadLocalMap的每个Entry都是一个对key的弱引用,同时,每个Entry都包含了一个对value的强引用

image-20200724075805195

JDK已经考虑到了这个问题,所以在set,remove,rehash方法中会扫描key为null的Entry,并把对应的value设置为null,这样value对象就可以被回收,使用完ThreadLocal之后,应该调用remove方法

空指针异常

注意get方法前如果没有进行set或者初始化,get的为null,在自动装箱时或火车调用时会出现npe

不要强行使用

如果可以不使用ThreadLocal就解决问题,那么不要强行使用例如在任务数很少的时候,在局部变量中可以新建对象就可以解决问题,那么就不需要使用到ThreadLocal

问题3:锁

为啥需要Lock?synchronized不够用吗?

1)效率低:锁的释放情况少、试图获得锁时不能设定超时、不能中断一个正在试图获得锁的线程

2)不够灵活(读写锁更灵活):加锁和释放的时机单一,每个锁仅有单一的条件(某个对象),可能是不够的

3)无法知道是否成功获取到锁

Lock的四个方法

编程思想:Lock不会像synchronized一样在异常时自动释放锁,所以在finally释放

  • lock()就是最普通的获取锁。如果锁已被其他线程获取,则进行等待,lock()方法不能被中断,这会带来很大的隐患:一旦陷入死锁,lock()就会陷入永久等待。

  • tryLock()用来尝试获取锁,如果当前锁没有被其他线程占用,则获取成功,则返回true,否则返回false。tryLock(long time,TimeUnit unit):超时就放弃

  • lockInterruptibly():相当于tryLock(long time,TimeUnit unit)把超时时间设置为无限。在等待锁的过程中,线程可以被中断

  • unlock():解锁

锁的分类

image-20200724075805195

乐观锁--悲观锁

悲观锁--互斥同步锁

坏处

阻塞和唤醒带来的性能劣势

永久阻塞:如果持有锁的线程被永久阻塞,比如遇到了无很循环、死锁等活跃性问题,那么等待该线程释放锁的那几个悲催的线程,将永远也得不到执行

优先级反转

实例

Java中悲观锁的实现就是synchronized和Lock相关类,axure?

使用场景

悲观锁:适合并发写入多的情况,适用于临界区持锁时间比较长的情况,悲观锁可以避免大量的无用自旋等消耗,典型情况:

  • 1临界区有IO操作
  • 2临界区代码复杂或者循环量大
  • 3临界区竞争非常激烈

乐观锁--非互斥同步锁

乐观锁的实现一般都是利用CAS算法来实现的

乐观锁的典型例子就是原子类、并发容器等

Git就是乐观锁的典型例子,当我们往远端仓库push的时候,git会检查远端仓库的版本是不是领先于我们现在的版本,如果远程仓库的版本号和本地的不一样,就表示有其他人修改了远端代码了,我们的这次提交就失败;如果远端和本地版本号一致,我们就可以顺利提交版本到远端仓库

使用场景

适合并发写入少,大部分是读取的场景,不加锁的能让读取性能大幅提高。

可重入锁--非可重入锁

ReentrantLock,synchronized

getHoldCount():锁被拿到几次了

image-20200724075805195

isHeldByCurrentThread可以看出锁是否被当前线程持有

getQueuelLength可以返回当前正在等待这把锁的队列有多长,一般这两个方法是开发和调试时候使用,上线后用到的不多

在一定程度上预防了死锁

公平锁--非公平锁

定义与设计目的

公平指的是按照线程请求的顺序,来分配锁;

非公平指的是,不完全按照请求的顺序,在一定情况下,可以插队。

Java设计者这样设计的目的,是为了提高效率,避免唤醒带来的空档期

//非公平

private Lock queueLock = new ReentrantLock(false);

image-20200724075805195

关于tryLock的说明

tryLock()方法它不遵守设定的公平的规则,当有线程执行tryLock的时候,一旦有线程释放了锁,那么这个正在tryLock的线程就能获取到锁,即使在它之前已经有其他现在在等待队列里了

优劣对比

image-20200724075805195

源码分析

image-20200724075805195

共享锁--排它锁

定义

排他锁,又称为独占锁、独享锁

共享锁,又称为读锁,获得共享锁之后,可以查看但无法修改和删除数据,其他线程此时也可以获取到共享锁,也可以查看但无法修改和删除数据

共享锁和排它锁的典型是读写锁ReentrantReadWriteLock,其中读锁是共享锁,写锁是独享锁

读写锁的规则

要么多读,要么一写,但是两者不会同时出现

插队策略

公平锁

image-20200724075805195

非公平锁

写锁可以随时插队

读锁仅在等待队列头结点不是想获取写锁的线程的时候可以插队

image-20200724075805195

锁的升降级

支持锁的降级,不支持升级(原因分析,两个读锁都想升级,则会造成死锁。)

自旋锁--阻塞锁

定义

是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。--自旋锁

阻塞锁和自旋锁相反,阻塞锁如果遇到没拿到锁的情况,会直接把线程阻塞,直到被唤醒

缺点

获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。

实例

AtomicInteger的实现:自旋锁的实现原理是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改过程中遇到其他线程竞争导致没修改成功,就在while里死循环,直至修改成功

使用场景

自旋锁一般用于多核的服务器,在并发度不是特别高的情况下,比阻塞锁的效率高

自旋锁适用于临界区比较短小的情况,否则如果临界区很大(线程一旦拿到锁,很久以后才会释放),那也是不合适的

可中断锁

在Java中,synchronized就不是可中断锁,而Lock是可中断锁,因为tryLock(time)和lockInterruptibly都能响应中断。

锁的优化

JVM

自旋锁和自适应

重试多少次转为阻塞锁

锁消除

分析出不需要加锁的部分

锁粗化

相邻代码多次加锁解锁,则和为一次

个人

缩小同步代码块

尽量不要锁住方法

减少请求锁的次数

锁中尽量不要再包含锁

问题4:原子类

优势

粒度更细:原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况了,通常锁的粒度都要大于原子变量的粒度

效率更高:通常,使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况

image-20200724075805195

AtomicInteger常用方法

publicfinalint get()//获取当前的值

publicfinalint getAndSet(int newValue)//获取当前的值,并设置新的值

publicfinalint getAndIncrement()//获取当前的值,并自增

publicfinalint getAndDecrement()//获取当前的值,并自减

publicfinalint getAndAdd(int delta)//获取当前的值,并加上预期的值

Atomic*Reference引用类型原子类

整数保证原子性,而AtomicReference可以让一个对象保证原子性,当然,AtomicReference的功能明显比AtomicInteger强,因为一个对象里可以包含很多属性。用法和AtomicInteger类似。

AtomicIntegerFieldUpdater对普通变量进行升级

使用场景:偶尔需要一个原子get-set操作

注意点

底层使用了反射,不能对private和static的属性进行升级

Adder累加器

是Java 8引入的,相对是比较新的一个类,高并发下LongAdder比AtomicLong效率高(由于竞争很激烈,每一次加法,都要flush和refresh,JMM进行同步,导致很耗费资源。),不过本质是空间换时间,竞争激烈的时候,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的理念,提高了并发性。

LongAdder的改进

LongAdder,每个线程会有自己的一个计数器,仅用来在自己线程内计数,这样一来就不会和其他线程的计数器干扰

LongAdder引入了分段累加的概念,内部有一个base变量和一个Cell数组共同参与计数

base变量:竞争不激烈,直接累加到该变量上

Cell数组:竞争激烈,各个线程分散累加到自己的槽Cell[i]中(hash算法)。

LongAdder有两个重要的方法:addsum,add是线程安全的加,sum是返回结果,多线程并发更新时被散列到不同的变量上执行,减少冲突,所以最后获取返回值是将这些变量求和。通过这点也能看出sum获取的结果是不准确的,所以它只适用于统计场景,如果要获取精确的返回值,还是得用AtomicLong,性能和准确不可兼得。

image-20200724075805195

Accumulator累加器

适用于并行计算

CAS

定义

CAS有三个操作数:内存值V、预期值A、要修改的值B,当且仅当预期值A和内存值V相同时,才将内存值修改为B,否则什么都不做。最后返回现在的V值

应用场景

乐观锁

并发容器

原子类:AtomicInteger加载Unsafe工具,用来直接操作内存数据,Unsafe类中的compareAndSwapInt方法,方法中先想办法拿到变量value在内存中的地址。通过Atomic::cmpxchg实现原子性的比较和替换,其中参数x是即将更新的值,参数e是原内存的值。至此,最终完成了CAS的全过程。

缺点

ABA问题

自旋时间过长

final

如果对象在被创建后,状态就不能被修改,那么它就是不可变的,不可变对象是线程安全的。

类防止被继承、方法防止被重写、变量防止被修改

final修饰变量

含义:被final修饰的变量,意味着值不能被修改。如果变量是对象,那么对象的引用不能变,但是对象自身的内容依然可以变化

  • 类中的final属性:1.等号右边直接赋值,2.构造函数赋值,3.初始代码块赋值
  • 类中的static final属性:1.等号右边直接赋值,2.static初始代码块赋值
  • 方法中的final变量:不规定赋值时机,但是规定使用前要赋值。

final修饰方法

构造方法不允许final修饰

不可被重写,也就是不能被override,即便是子类有同样名字的方法,那也不是override,这个和static方法是一个道理(static也不能被重写,但是可以写两个相同的函数,即在父子类中独有。)

final修饰类

不可被继承--String

不变性

对于基本数据类型,确实被final修饰后就具有不变性,但是对于对象类型,需要该对象保证状态永远不会变才可以(属性全final,且对象属性的属性也是全final,或者是private)

  • 对象创建后,其状态就不能修改
  • 所有属性都是final修饰的
  • 对象创建过程中没有发生逸出

栈封闭

在方法里新建的局部变量,实际上是存储在每个线程私有的栈空间,而每个栈的栈空间是不能被其他线程所访问到的,所以不会有线程安全问题。这就是著名的"栈封闭"技术,是"线程封闭"技术的一种情况。

tips

image-20200724075805195

问题5:并发容器

Concurrent的特点是大部分通过CAS实现并发,

CopyOnWrite则是通过复制一份原数据来实现的

Blocking通过AQS实现的

概览

ConcurrentHashMap:线程安全的HashMap

CopyOnWriteArrayList:线程安全的List ,适合读多写少的情况

BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道

ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList

ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查找

image-20200724075805195

ConcurrentHashMap

为什么HashMap是线程不安全的?

同时put碰撞导致数据丢失

同时put扩容导致数据丢失

死循环造成的CPU100% :在多线程同时扩容的时候,可能会造成循环链表,导致CPU100%

ConcurrentHashMap1.7

image-20200724075805195

image-20200724075805195

ConcurrentHashMap1.8

image-20200724075805195

区别总结

1.数据结构:

从并发度16--》每个node都独立

2.hash冲突的解决方法

从拉链---》拉链+红黑

3.保证并发安全

从分段锁--》CAS+synchronized

4.查询复杂度

两个方法putVal+get

image-20200724075805195

image-20200724075805195

错误使用ConcurrentHashMap

组合操作并不保证线程安全 ---》使用replace方法,或者putIfAbsent

image-20200724075805195

CopyOnWriteArrayList

读多写少:黑名单,每日更新;监听器:迭代操作远多余修改操作

读写规则

读取是完全不用加锁的,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待

在迭代器中修改list

  • ArrayList会报错
  • CopyOnWriteArrayList不会报错,但是会输出之前没修改的(过期的)

缺点

数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器。

内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。

并发队列

image-20200724075805195

阻塞队列

主要的方法

  • put, take

    take方法:获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据

    put方法:插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间

  • add, remove, element

    add:往里放,满了抛异常

    remove:删除,空了抛异常

    element:返回头元素,空了抛异常

  • offer, poll, peek

    offer:往里放,满了返回false

    poll:取出并删除,空了返回null

    peek:取出,空了返回null

ArrayBlockingQueue

  • 有界,创建需要指定容量
  • 可以指定是否公平

LinkedBlockingQueue

  • 无界
  • take与put方法使用两把锁

PriorityBlockingQueue

  • 支持优先级
  • 可以扩容--》无界队列
  • PriorityQueue的线程优先的版本

SynchronousQueue

  • 容量为0,是一个极好的用来直接传递的并发数据结构

DelayQueue

  • 延迟队列,根据延迟时间排序
  • 元素需要实现Delayed接口,规定排序规 则

非阻塞队列

ConcurrentLinkedQueue

并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对比较少一些

选择

  • 边界--》你有多少的东西需要放进来
  • 吞吐量-->一般来说,LinkedBlockingQueue>ArrayBlockingQueue

问题6:控制并发流程

概述

  • CountDownLatch倒计时门闩
  • Semaphore信号量
  • Condition接口(又称条件对象)
  • CyclicBarrier循环栅栏

image-20200724075805195

CountDownLatch

CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例。

使用await()等待,使用countDown()减去1次。

Semaphore

Semaphore可以用来限制或管理数量有限的资源的使用情况。当信号童所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证。在初始化Semaphore的时候可以设置公平性,一般设置为true会更合理。

image-20200724075805195

Condition接口(又称条件对象)

实际上,如果说Lock用来代替synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎都一样。await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁。调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样。

一个lock锁可以生成多个锁对象,则可以使用await()与signal()更加精细的控制流程。

private Lock lock = new ReentrantLock();

private Condition notFull = lock.newCondition();

private Condition notEmpty = lock.newCondition();

CyclicBarrier循环栅栏

//五个都到了,则开始做什么?

CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {

@Override

publicvoidrun(){

System.out.println("所有人都到场了, 大家统一出发!");

}

});

作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的。

可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用。

问题7:AQS

为什么要使用AQS

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便得被写出来

因为上面的那些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的“业务逻辑"就可以了

内部有一个Sync类,Sync类继承了AQS

image-20200724075805195

原理解释

AQS最核心的就是三大部分:

  • state状态
  • 控制线程抢锁和配合的FIFO队列
  • 期望协作工具去实现的获取/释放等重要方法

应用

image-20200724075805195

image-20200724075805195

image-20200724075805195

问题8:Future和Callable

Runnable的缺陷

不能返回一个返回值

也不能抛出checked Exception

Callable接口

image-20200724075805195

Future类

一个方法可能比较耗时,可以使用子线程执行,

  • get方法

我们可以用Future.get来获取Callable接口返回的执行结果,get方法的行为取决于Callable任务的状态,只有以下这5种情况:

1,任务正常完成:get方法会立刻返回结果

2,任务尚未完成(任务还没开始或进行中):get将阻塞并直到任务完成。

3,任务执行过程中抛出Exception:get方法会抛出ExecutionException:这里的抛出异常,是call()执行时产生的那个异常,看到这个异常类型是java.util.concurrent.ExecutionException,不论call()执行时抛出的异常类型是什么,最后get方法抛出的异常类型都是ExecutionException,并不是遇到异常就抛出,而是当调用get方法的时候才会出现异常

4,任务被取消:get方法会抛出CancellationException

5,任务超时:get方法有一个重载方法,是传入一个延迟时间的,如果时间到了还没有获得结果,get方法就会抛出TimeoutException.

  • cancel方法

取消任务的执行,传入true与false的区别,代表是否中断正在执行的任务。

Future.cancel(true)适用于:

1.任务能够处理interrupt

Future.cancel(false)仅用于避免启动尚未启动的任务,适用于:

1.未能处理interrupt的任务

2.不清楚任务是否支持取消

3·需要等待已经开始的任务执行完成

  • isDone()

判断任务是否已经执行完了,即使任务不是正常结束的,一旦结束,也会True

  • isCancelled()

判断是否取消了这个任务,限时获取任务的结果等

使用的方法

  • 用法1:线程池的submit方法返回Future对象

首先,我们要给线程池提交我们的任务,提交时线程池会立刻返回给我们一个空的Future容器。当线程的任务一旦执行完毕也就是当我们可以获取结果的时候,线程池便会把该结果填入到之前给我们的那个Future中去(而不是创建一个新的Future),我们此时便可以从该Future中获得任务执行的结果

  • 用法2:用FutureTask来创建Future

FutureTask是一种包装器,可以把Callable转化成Future和Runnable,它同时实现二者的接口

        Task task = new Task();//Callable

FutureTask<Integer> integerFutureTask = new FutureTask<>(task);

new Thread(integerFutureTask).start();

Task task = new Task();

FutureTask<Integer> integerFutureTask = new FutureTask<>(task);

ExecutorService service = Executors.newCachedThreadPool();

service.submit(integerFutureTask);

Future的注意点

  • 当for循环批量获取future的结果时,容易发生一部分线程很慢的情况,get方法调用时应使用timeout限制,或者使用CompletableFuture工具类
  • Future的生命周期是不能后退的。

问题9:迭代一个缓存工具

  1. hashMap
  2. synchronized锁住缓存方法
  3. 装饰器优化,实现缓存与业务的解耦
  4. 减小粒度-->synchronized锁代码块
  5. ConcurrentHashMap
  6. 出现重复计算,使用Future,因为get方法会看看有没有提前放进去的任务。
    image-20200724075805195

  7. 同时调用get()都为空,也会出现重复计算,使用putIfAbsent()方法控制
    image-20200724075805195

  8. 计算类中可能出现错误,对get方法可能出现的三种异常进行不同的处理

  9. 考虑缓存污染,考虑错误的回滚。可以把任务进行清空。
  10. 缓存的过期功能,可以使用ScheduledExecutorService的线程池进行设置定时的延迟任务
  11. 如果设置的是同时进行过期,那么同时都拿不到缓存,导致缓存雪崩,缓存击穿等高并发下的缓存问题。--->缓存时间随机
  12. 高并发测试--》使用CountDownLatch进行统一控制,使用ThreadLocal进行每一个线程的时间输出

以上是 JUC并发工具类详解 的全部内容, 来源链接: utcz.com/a/36160.html

回到顶部