java并发深入--通信、forkJoin框架、线程安全的学习
线程是计算机的一个稀缺资源,在一个应用的开发下,使用到多线程之间的使用,对项目的效率有着明显的提升;但是并不是线程越多越好,在线程的使用的时候还必须保证线程安全的前提下进行的。先前自己在学校的时候对线程总体的学习,但是始终没有用在项目上,现在打算进一步的对线程的深入源码的去理解、去学习,希望可以得到提升。
线程---并发编程的基础
先前对线程已经学习过一次,现在对此做一个基础的总结:
1.cpu核心数和线程数之间的关系:在传统的模式中,核心数和线程数是1:1,在微软加入超线程后,这两者的比例变成了1:2,并且性能上有一个很大的提升。
2.cpu时间片轮转机制:在cpu的调度下,每个线程都有一个生命周期,当生命周期结束时,无论线程的任务是否已经完成,都会暂停当前的任务,cpu调下一个线程进行运行,这称为上下文切换。在多线程运行,都会涉及上下文的切换,如果线程太多,甚至过量,会导致上下文切换比较频繁,使得上下文切换花费的时间占cpu时间的比例会上升,导致工作时间比例下降,效率反而低下,所以并不是线程越多越好。关于优化上下文切换可以参照博客JAVA并发概要。
3.进程和线程的区别:进程:程序运行资源分配的最小单位;线程:是cpu调度的最小单位(本身没有资源,所有线程共享进程中的资源),线程必须依附于进程,不会单独存在。
4.高并发的优点:①充分利用cpu的资源,避免的cpu资源的浪费;②加快用户响应的时间;③程序模块异步化。但是必须注意的问题就是: 线程安全性问题,资源有限性问题。
5.java里的程序天生的多线程,main函数就是一个线程;创建启动线程的方式:①:继承类Thread;②:实现接口Runable;③:实现接口Callable;④:使用线程池工产类Executors创建或者使用ThreadPoolExecutor自定义创建。
6.怎样才能让java线程安全的停止:
传统的线程停止的方法:stop(),suspend()函数,但是这两个方式已经被弃用了,因为这两个方法会导致一系列的安全情况;
stop()被调用后,强制暂停线程,线程资源会得不到释放,数据很有可能被使用在一个不正确的情况下。
suspend()被调用后,线程不释放占用资源直接进入睡眠,很容易造成死锁(占用且等待)。
造成死锁的四个条件:互斥,占有并且等待,不可剥夺,循环等待。
怎么样才能线程安全的停止呢?
在现在的解决方案里面:interrupt(),interrupted(),isInterrupted()这三个函数来解决的。
interrupt()本质上不会中断线程,只是将线程的中断标志位设置为“true”。
isInterrupted():是static方法的interrupted(),两个都是用来检查中断标记位的。
1 public class InterruptClass {2 private static class UseInterrupt implements Runnable{
3 @Override
4 public void run() {
5 //但是实际上并没有中断,需要检测中断标识符
6 while (!Thread.currentThread().isInterrupted()){
7 System.out.println(Thread.currentThread().getName()+" am implements Runnable");
8 }
9 System.out.println(Thread.currentThread().getName()+" interrupt is "+Thread.currentThread().isInterrupted());
10 }
11 }
12 public static void main(String[] args) throws InterruptedException {
13 UseInterrupt useInterrupt=new UseInterrupt();
14 Thread thread=new Thread(useInterrupt,"endThread");
15 thread.start();
16 Thread.sleep(1);
17 thread.interrupt();//通过调用函数实现中断
18 }
19 }
测试代码
7.线程优先级:Thread.setPriority();//默认标识是5,但是在有些操作系统中,操作系统会忽略这一个语法,这个函数不可靠,应该使用其他的方式设置线程的优先级。
8.守护线程:Thread.setDaemon(true),在守护线程中,try-finally中的finally的代码不一定执行。
9.线程之间的状态、以及状态的切换:
线程---线程间的协作通信
线程间的通信有:同步,wait/notify机制,管道通信,Condition。
1.线程间的共享:
synchronized内置锁:在并发的条件下保证原子性;
Volatile关键字:只保证内存的可见性,不保证内存操作的原子性。在高可见低原子性,使用volatile效果很好,不止达到了目的,还避免了锁。
ThreadLocal:每一个线程都独自的拥有一个变量,但是线程之间不是共享的,在ThreadLocal中有一个ThreadLocalMap的内部类,实际上就是map结构,key就是线程,value就是自己的那个变量。
执行结果:
1 public class UserThreadLocal {2
3 static ThreadLocal<Integer> threadLocal=new ThreadLocal<Integer>(){
4 @Override
5 protected Integer initialValue(){
6 return 1;
7 }
8 };
9 public void startThreadArray(){
10 Thread[] runs=new Thread[3];
11 for(int i=0;i<runs.length;i++){
12 runs[i]=new Thread(new TestThread(i));
13 }
14 for (int i=0;i<runs.length;i++){
15 runs[i].start();
16 }
17 }
18 private static class TestThread implements Runnable{
19 int id;
20
21 public TestThread(int id) {
22 this.id = id;
23 }
24
25 @Override
26 public void run() {
27 System.out.println(Thread.currentThread().getName()+":Start ");
28 Integer s=threadLocal.get();
29 s=s+id;
30 threadLocal.set(s);
31 System.out.println(Thread.currentThread().getName()+" : "+threadLocal.get());
32 }
33 }
34
35 public static void main(String[] args){
36 UserThreadLocal userThreadLocal=new UserThreadLocal();
37 userThreadLocal.startThreadArray();
38 }
39
40 }
测试代码
2.等待通知:wait(),notify/notifyAll,等待超时模式,
join()方法:控制线程的执行顺序,等待执行join的线程执行完成后,再执行其他的内容。
yield()方法:调用时,当前线程执行锁,不释放锁。
sleep()方法:调用时,不释放锁、占有资源。
wait()方法:调用时,释放锁和占有资源。
notify()/notifyAll,调用前持有锁;包含notify()的方法结束后,锁才会被释放。
关于wait和notify的列子如下:我们现在有一个快递系统,定义一个快递的实体类,里有km里程数和site站点。我们需要当快递到达目的地时要提醒用户,实现如下:waitKm这个锁一直被阻塞了的,当changeKm被修改的时候,唤醒waitKm这个锁。
关于notify和notifyAll的使用问题:notify是唤醒其中的一个wait但是不能确定唤醒的是哪一个wait,所以都是使用的是notifyAll。
1 package main.java.Thread.UserThread.wn;2
3 /**
4 * @author yangxin
5 * @time 2019/2/28 11:55
6 */
7 public class Express {
8 public final static String city="Shanghai";
9 private int km;
10 private String site;
11
12 public Express() {
13 }
14
15 public Express(int km, String site) {
16 this.km = km;
17 this.site = site;
18 }
19
20 public synchronized void changeKm(){
21 this.km=101;
22 notifyAll();
23 }
24
25 public synchronized void changeSite(){
26 this.site="BeiJing";
27 notifyAll();
28 }
29
30 public synchronized void waitKm(){
31 while (this.km<100){
32 try {
33 wait();
34 System.out.println("check km Thream["+Thread.currentThread().getName()
35 +"] is be notify");
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 }
40 System.out.println("this km is "+this.km+",I will change db");
41 }
42
43 public synchronized void waitSite(){
44 while (this.site.equals(city)){
45 try {
46 wait();
47 System.out.println("check km Thream["+Thread.currentThread().getName()
48 +"] is be notify");
49 } catch (InterruptedException e) {
50 e.printStackTrace();
51 }
52 }
53
54 System.out.println("this is site"+this.site+",I will change db");
55 }
56 }
测试代码
1 package main.java.Thread.UserThread.wn;2
3 /**
4 * @author yangxin
5 * @time 2019/2/28 11:55
6 */
7 public class TestWn {
8 private static Express express=new Express(0,Express.city);
9
10 private static class chenkKm extends Thread{
11 @Override
12 public void run() {
13 express.waitKm();
14 }
15 }
16
17 private static class checkSite extends Thread{
18 @Override
19 public void run() {
20 express.waitSite();
21 }
22 }
23
24 public static void main(String[] args) throws InterruptedException{
25 for(int i=0;i<1;i++){
26 new checkSite().start();
27 }
28 for(int i=0;i<1;i++){
29 new chenkKm().start();
30 }
31 Thread.sleep(1000);
32 express.changeKm();
33 }
34 }
测试代码二
3.Fork/join框架
fork/join框架就是在必要的情况下,将一个大任务,进行拆分成一个个小任务,当在不可拆时,再将任务计算的结果一个个的合成,得出最后的结果,显然就是一个分而治之的思路。
Fork/Join相关的类和使用的标准范式:ForkJoinTask,ForkJoinPool。使用ForkJoinTask实现后将代码交给ForkJoinPool里。
在ForkJoinTask的派生子类主要有:RecursiveTask和RecursiveAction;RecursiveTask表示计算的任务有返回值;RecursiveAction表示计算的任务没有返回值;一般的使用方法是自定义一个类来继承这两个子类,覆盖compute方法,然后在这个方法里进行任务的执行和拆分。
①:遍历电脑D盘统计文件个数;②:遍历电脑D盘寻找.txt文件的个数。
遍历电脑D盘统计个数
运行结果(部分截图):
1 public class ForkAndJoinFramework extends RecursiveTask<Integer> {2 private File path;//文件路径
3 public ForkAndJoinFramework(File path) {
4 this.path = path;
5 }
6 @Override
7 protected Integer compute() {
8 int count=0;//文件个数的计数器
9 int dircount=0;//目录计数器
10 List<ForkAndJoinFramework> listTask=new ArrayList<>();//子目录任务容器
11 File[] files=path.listFiles();
12 if(files!=null){
13 for(File file:files){
14 if(file.isDirectory()){
15 listTask.add(new ForkAndJoinFramework(file));
16 dircount++;
17 }else{
18 count++;
19 }
20 }
21 }
22 System.out.println("目录:"+path.getAbsolutePath()+" 包含的子目录个数"+dircount
23 +" 包含的文件个数"+count);
24 if(!listTask.isEmpty()){
25 for(ForkAndJoinFramework forkAndJoinFramework:invokeAll(listTask)){
26 count+=forkAndJoinFramework.join();
27 }
28 }
29 return count;
30 }
31
32 public static void main(String[] args){
33 ForkJoinPool forkJoinPool=new ForkJoinPool();
34 ForkAndJoinFramework forkAndJoinFramework=new ForkAndJoinFramework(new File("D:/"));
35 forkJoinPool.invoke(forkAndJoinFramework);
36 System.out.println("File Count ="+forkAndJoinFramework.join());
37 }
38 }
遍历电脑D盘统计个数
遍历电脑D盘寻找.txt文件:因为是查找出.txt所以我们优先选择继承RecursiveAction没有返回值的。
运行结果:
1 public class FornJoinFramework extends RecursiveAction {2 private File path;
3 public FornJoinFramework(File path) {
4 this.path = path;
5 }
6 @Override
7 protected void compute() {
8 List<FornJoinFramework> listTask=new ArrayList<>();
9 File[] files=path.listFiles();
10 if(files!=null){
11 for(File file:files){
12 if(file.isDirectory()){
13 listTask.add(new FornJoinFramework(file));
14 }else{
15 if(file.getAbsolutePath().endsWith("txt")){
16 System.out.println("文件:"+file.getAbsolutePath());
17 }
18 }
19 }
20 }
21 if(!listTask.isEmpty()){
22 for(FornJoinFramework fornJoinFramework: invokeAll(listTask)){
23 fornJoinFramework.join();
24 }
25 }
26 }
27 public static void main(String[] args){
28 ForkJoinPool forkJoinPool=new ForkJoinPool();
29 FornJoinFramework fornJoinFramework=new FornJoinFramework(new File("D:/"));
30 forkJoinPool.execute(fornJoinFramework);
31 try {
32 Thread.sleep(5);
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 }
36 System.out.println("I am ok ");
37 fornJoinFramework.join();
38 System.out.println("work finish ");
39 }
40 }
个目录下的txt文件
4.CountDownLatch(闭锁):允许一个或者多个线程等待其他线程完成操作后继续自己的操作,类似加强版的Join。
1 /**2 * @author yangxin
3 * @time 2019/2/28 15:30
4 *
5 * 类说明:5个线程执行6个初始化任务,使用CountDownLatch。
6 */
7 public class UseCountDownLatch {
8 static CountDownLatch countDownLatch=new CountDownLatch(6);
9
10 private static class initThread implements Runnable{
11
12 @Override
13 public void run() {
14 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化中....");
15 countDownLatch.countDown();
16 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化完成,执行其他操作中...");
17 }
18 }
19
20 private static class BussThread implements Runnable{
21
22 @Override
23 public void run() {
24 try {
25 countDownLatch.await();
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29 System.out.println("Thread:"+Thread.currentThread().getName()+"业务逻辑工作++++++");
30 }
31 }
32
33 public static void main(String[] args) throws InterruptedException {
34 new Thread(new Runnable() {
35 @Override
36 public void run() {
37 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化111");
38 countDownLatch.countDown();
39 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化222");
40 countDownLatch.countDown();
41 }
42 }).start();
43 new Thread(new BussThread()).start();
44 for(int i=0;i<=3;i++){
45 Thread thread=new Thread(new initThread());
46 thread.start();
47 }
48
49 countDownLatch.await();
50 System.out.println("Main work finish");
51 }
52
53 }
CountDownLatch
5.CyclicBarrier(屏障):让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时才被放行,所有被屏障拦截的线程才会继续执行。可以在一组线程到达屏障时,优先执行某一个操作,方便处理更复杂的业务逻辑。关于闭锁和屏障可以参考JAVA并发 闭锁、同步屏障、信号量总结。
线程---线程安全
1.怎么能够保证线程安全:让类不可变,volatile保证内存可见性,加锁,CAS类控制线程的执行顺序。
2.死锁:是指两个或者两个以上的进程在执行的过程中,由于资源竞争或者彼此通信而造成的一种拥塞现象,如果没有外力的情况下,将一直保持这种状态。怎么解决死锁?控制线程之间的加锁顺序。
3.活锁:在try-Local模式下造成的情况。在try-Local中,如果线程已经锁定了A锁,在尝试获取B锁的时候,发现已经被占有,于是释放自己占有的A锁;在此同时另一个线程已经锁定了B锁,在尝试获取A锁的时候发现已经被占有,于是释放自己占有的B锁,于是两个线程都在相互等待,这种称为活锁。
4.线程饥饿:在线程优先级的情况下,有可能出现这种情况,优先级高的线程一直持有锁,优先级低的线程一直都会得不到锁,就是线程饥饿。
5.性能:在加锁的情况上,尽量只加锁关键性代码,尽量不要对没必要的代码段加锁。
显示锁Lock接口:
①、尝试非阻塞获取锁,②、可以被中断的获取锁,③、超时获取锁。(synchronized 是隐式锁,固化了加锁和释放锁)。
Lock接口的核心方法:lock()加锁,unlock()释放锁,trylock()有一个返回值,如果获取到锁返回true,否则返回false;
Lock的实现ReentrantLock和标准用法:排它锁:就是在任何情况下同一时刻只允许一个线程允许访问,例子如下:
ReadWriteLock接口和读写锁ReentrantReadWriteLock:因为在排他锁,在任何情况下,同一时刻都只允许一个线程访问,但是在很多情况下,读多写少的情况下就非常的低效,因为读可以不用加锁,所以就出现了读写锁,他读写分离。同时都是读的 时候他们都是并发的执行。
如下来对比Synchronized和ReentrantReadWriteLock的效率对比:
Synchronized加锁:
运行效率如下:
ReentrantReadWriteLock锁:
运行的效率:
两者的对比可知,在读多写少的情况下,ReentrantReadWriteLock锁更高效。
代码有点多:如果要源代码访问github:https://github.com/fireshoot/LearningAndTest/tree/master/src/main/java/Thread/UserThread。在rw包下的就是这部分的代码,整个java包下是本篇博客的代码。
Condition接口
Condition接口提供方法,signal()/signalAll()和await(),这两个方法的作用和wait-notify一样。
使用如下:
1 public class ConditionClass {2 Lock kmLock=new ReentrantLock();
3 Condition condition1=kmLock.newCondition();
4
5 Lock siteLock=new ReentrantLock();
6 Condition condition2=siteLock.newCondition();
7
8 private String name;
9 private int km;
10 private String site;
11
12 public void setKm(int km) {
13 kmLock.lock();
14 try{
15 this.km = km;
16 condition1.signal();
17 }finally {
18 kmLock.unlock();
19 }
20 }
21
22 public void setSite(String site) {
23 siteLock.lock();
24 try{
25 this.site = site;
26 condition2.signal();
27 }finally {
28 siteLock.unlock();
29 }
30 }
31
32 public void changeKm() throws InterruptedException {
33 kmLock.lock();
34 try{
35 while (this.km<100){
36 condition1.await();
37 //.....
38 }
39 //......
40 }finally {
41 kmLock.unlock();
42 }
43 }
44
45 public void changeSite() throws InterruptedException {
46 siteLock.lock();
47 try{
48 while (site.equals("BeiJing")){
49 condition2.await();
50 }
51 }finally {
52 siteLock.unlock();
53 }
54 }
55
56 public ConditionClass(String name, int km, String site) {
57 this.name = name;
58 this.km = km;
59 this.site = site;
60 }
61
62 public int getKm() {
63 return km;
64 }
65
66 public String getSite() {
67 return site;
68 }
69 }
Condition的使用
CAS(Compare And Swap)
即比较并替换:三个运算符:一个内存地址V,一个期望的旧值A,一个新值B;在内存地址V上,在期望旧值B的基础上,将新值写入地址V上。
是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。它能够保证基本类型的变化具有原子性。比如我们常常见到的 i++;这条语句,在平时的开发中并没有感觉原子性的问题,但是在多线程的情况下i++并不具有原子性,i++首先取值,再写值,在并发的条件下有可能已经被多个线程写值了,导致i++增加的异常。怎么解决比如i++的原子性呢,那么就是用CAS的AtomicInteger来代替int,在i++的时候,首先检测该地址上的期望旧值是不是我们i++之前的值,如果是,那么就执行++操作;如果不是,说明其他线程对此已经做出了修改,报异常。
CAS存在的问题:
①:ABA问题:内存计算V本来就是A值,但是在其中变化成了B值,但是最后又变化成A,在运行检查的时候发现期望旧值并没有改变,于是进行了操作。解决方法:版本号。
②:往往只能保证一个共享变量的原子操作。
相关的原子操作类:
- 更新基本类型:AtomicBoolearn,AtomicInteger,AtomicLong,AtomicReference。
- 更新数组类型:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。
- 更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampReference。
- 原子更新字段类:AtomicReferenceFieldUpdater,AtumicIntegerFieldUpdater,AtomicLongFieldUpdater。
结语
对线程并发编程的基础知识的复习,还对①加深了对 wait-notify/notifyAll的理解和使用;②Fork-Join框架下ForkJoinTask(RecursiveTask,RecursiveAction)和ForkJoinPool的搭配使用,并且实现搜寻文件数量和指定文件的实现方式;③对CountDownLatch、CyclicBarrier、Semaphore、Exchange的复习;④:对显示锁Lock、ReentrantLock、读写锁ReadWriteLock、ReentrantReadWriteLock的使用,读写锁和Synchronized关键字在读多写少的效率比较;⑤ Condition的使用,和Wait-notify的区别;⑥:CAS。
关于本篇博客的源码见github地址:https://github.com/fireshoot/LearningAndTest。注意:我是使用的IDEA开发的,如果导入失败直接复制包类就行了
转载标明地址:java并发深入学习总结
以上是 java并发深入--通信、forkJoin框架、线程安全的学习 的全部内容, 来源链接: utcz.com/z/393816.html