java并发深入--通信、forkJoin框架、线程安全的学习

java

  线程是计算机的一个稀缺资源,在一个应用的开发下,使用到多线程之间的使用,对项目的效率有着明显的提升;但是并不是线程越多越好,在线程的使用的时候还必须保证线程安全的前提下进行的。先前自己在学校的时候对线程总体的学习,但是始终没有用在项目上,现在打算进一步的对线程的深入源码的去理解、去学习,希望可以得到提升。

线程---并发编程的基础

先前对线程已经学习过一次,现在对此做一个基础的总结:

1.cpu核心数和线程数之间的关系:在传统的模式中,核心数和线程数是1:1,在微软加入超线程后,这两者的比例变成了1:2,并且性能上有一个很大的提升。

2.cpu时间片轮转机制:在cpu的调度下,每个线程都有一个生命周期,当生命周期结束时,无论线程的任务是否已经完成,都会暂停当前的任务,cpu调下一个线程进行运行,这称为上下文切换。在多线程运行,都会涉及上下文的切换,如果线程太多,甚至过量,会导致上下文切换比较频繁,使得上下文切换花费的时间占cpu时间的比例会上升,导致工作时间比例下降,效率反而低下,所以并不是线程越多越好。关于优化上下文切换可以参照博客JAVA并发概要。

3.进程和线程的区别:进程:程序运行资源分配的最小单位;线程:是cpu调度的最小单位(本身没有资源,所有线程共享进程中的资源),线程必须依附于进程,不会单独存在。

4.高并发的优点:①充分利用cpu的资源,避免的cpu资源的浪费;②加快用户响应的时间;③程序模块异步化。但是必须注意的问题就是: 线程安全性问题,资源有限性问题。

5.java里的程序天生的多线程,main函数就是一个线程;创建启动线程的方式:①:继承类Thread;②:实现接口Runable;③:实现接口Callable;④:使用线程池工产类Executors创建或者使用ThreadPoolExecutor自定义创建。

6.怎样才能让java线程安全的停止:

  传统的线程停止的方法:stop(),suspend()函数,但是这两个方式已经被弃用了,因为这两个方法会导致一系列的安全情况;

  stop()被调用后,强制暂停线程,线程资源会得不到释放,数据很有可能被使用在一个不正确的情况下。

  suspend()被调用后,线程不释放占用资源直接进入睡眠,很容易造成死锁(占用且等待)。

造成死锁的四个条件:互斥,占有并且等待,不可剥夺,循环等待。

怎么样才能线程安全的停止呢?

在现在的解决方案里面:interrupt(),interrupted(),isInterrupted()这三个函数来解决的。

interrupt()本质上不会中断线程,只是将线程的中断标志位设置为“true”。

isInterrupted():是static方法的interrupted(),两个都是用来检查中断标记位的。

 1 public class InterruptClass {

2 private static class UseInterrupt implements Runnable{

3 @Override

4 public void run() {

5 //但是实际上并没有中断,需要检测中断标识符

6 while (!Thread.currentThread().isInterrupted()){

7 System.out.println(Thread.currentThread().getName()+" am implements Runnable");

8 }

9 System.out.println(Thread.currentThread().getName()+" interrupt is "+Thread.currentThread().isInterrupted());

10 }

11 }

12 public static void main(String[] args) throws InterruptedException {

13 UseInterrupt useInterrupt=new UseInterrupt();

14 Thread thread=new Thread(useInterrupt,"endThread");

15 thread.start();

16 Thread.sleep(1);

17 thread.interrupt();//通过调用函数实现中断

18 }

19 }

测试代码

 

7.线程优先级:Thread.setPriority();//默认标识是5,但是在有些操作系统中,操作系统会忽略这一个语法,这个函数不可靠,应该使用其他的方式设置线程的优先级。

8.守护线程:Thread.setDaemon(true),在守护线程中,try-finally中的finally的代码不一定执行。

9.线程之间的状态、以及状态的切换:

线程---线程间的协作通信

线程间的通信有:同步,wait/notify机制,管道通信,Condition。

1.线程间的共享:
synchronized内置锁:在并发的条件下保证原子性;

Volatile关键字:只保证内存的可见性,不保证内存操作的原子性。在高可见低原子性,使用volatile效果很好,不止达到了目的,还避免了锁。

ThreadLocal:每一个线程都独自的拥有一个变量,但是线程之间不是共享的,在ThreadLocal中有一个ThreadLocalMap的内部类,实际上就是map结构,key就是线程,value就是自己的那个变量。

执行结果:

 1 public class UserThreadLocal {

2

3 static ThreadLocal<Integer> threadLocal=new ThreadLocal<Integer>(){

4 @Override

5 protected Integer initialValue(){

6 return 1;

7 }

8 };

9 public void startThreadArray(){

10 Thread[] runs=new Thread[3];

11 for(int i=0;i<runs.length;i++){

12 runs[i]=new Thread(new TestThread(i));

13 }

14 for (int i=0;i<runs.length;i++){

15 runs[i].start();

16 }

17 }

18 private static class TestThread implements Runnable{

19 int id;

20

21 public TestThread(int id) {

22 this.id = id;

23 }

24

25 @Override

26 public void run() {

27 System.out.println(Thread.currentThread().getName()+":Start ");

28 Integer s=threadLocal.get();

29 s=s+id;

30 threadLocal.set(s);

31 System.out.println(Thread.currentThread().getName()+" : "+threadLocal.get());

32 }

33 }

34

35 public static void main(String[] args){

36 UserThreadLocal userThreadLocal=new UserThreadLocal();

37 userThreadLocal.startThreadArray();

38 }

39

40 }

测试代码

 

 

2.等待通知:wait(),notify/notifyAll,等待超时模式,

join()方法:控制线程的执行顺序,等待执行join的线程执行完成后,再执行其他的内容。

yield()方法:调用时,当前线程执行锁,不释放锁。

sleep()方法:调用时,不释放锁、占有资源。

wait()方法:调用时,释放锁和占有资源。

notify()/notifyAll,调用前持有锁;包含notify()的方法结束后,锁才会被释放。

关于wait和notify的列子如下:我们现在有一个快递系统,定义一个快递的实体类,里有km里程数和site站点。我们需要当快递到达目的地时要提醒用户,实现如下:waitKm这个锁一直被阻塞了的,当changeKm被修改的时候,唤醒waitKm这个锁。

 

关于notify和notifyAll的使用问题:notify是唤醒其中的一个wait但是不能确定唤醒的是哪一个wait,所以都是使用的是notifyAll。

 

 1 package main.java.Thread.UserThread.wn;

2

3 /**

4 * @author yangxin

5 * @time 2019/2/28 11:55

6 */

7 public class Express {

8 public final static String city="Shanghai";

9 private int km;

10 private String site;

11

12 public Express() {

13 }

14

15 public Express(int km, String site) {

16 this.km = km;

17 this.site = site;

18 }

19

20 public synchronized void changeKm(){

21 this.km=101;

22 notifyAll();

23 }

24

25 public synchronized void changeSite(){

26 this.site="BeiJing";

27 notifyAll();

28 }

29

30 public synchronized void waitKm(){

31 while (this.km<100){

32 try {

33 wait();

34 System.out.println("check km Thream["+Thread.currentThread().getName()

35 +"] is be notify");

36 } catch (InterruptedException e) {

37 e.printStackTrace();

38 }

39 }

40 System.out.println("this km is "+this.km+",I will change db");

41 }

42

43 public synchronized void waitSite(){

44 while (this.site.equals(city)){

45 try {

46 wait();

47 System.out.println("check km Thream["+Thread.currentThread().getName()

48 +"] is be notify");

49 } catch (InterruptedException e) {

50 e.printStackTrace();

51 }

52 }

53

54 System.out.println("this is site"+this.site+",I will change db");

55 }

56 }

测试代码

 

 1 package main.java.Thread.UserThread.wn;

2

3 /**

4 * @author yangxin

5 * @time 2019/2/28 11:55

6 */

7 public class TestWn {

8 private static Express express=new Express(0,Express.city);

9

10 private static class chenkKm extends Thread{

11 @Override

12 public void run() {

13 express.waitKm();

14 }

15 }

16

17 private static class checkSite extends Thread{

18 @Override

19 public void run() {

20 express.waitSite();

21 }

22 }

23

24 public static void main(String[] args) throws InterruptedException{

25 for(int i=0;i<1;i++){

26 new checkSite().start();

27 }

28 for(int i=0;i<1;i++){

29 new chenkKm().start();

30 }

31 Thread.sleep(1000);

32 express.changeKm();

33 }

34 }

测试代码二

 

 

3.Fork/join框架

fork/join框架就是在必要的情况下,将一个大任务,进行拆分成一个个小任务,当在不可拆时,再将任务计算的结果一个个的合成,得出最后的结果,显然就是一个分而治之的思路。

Fork/Join相关的类和使用的标准范式:ForkJoinTask,ForkJoinPool。使用ForkJoinTask实现后将代码交给ForkJoinPool里。

在ForkJoinTask的派生子类主要有:RecursiveTask和RecursiveAction;RecursiveTask表示计算的任务有返回值;RecursiveAction表示计算的任务没有返回值;一般的使用方法是自定义一个类来继承这两个子类,覆盖compute方法,然后在这个方法里进行任务的执行和拆分。

①:遍历电脑D盘统计文件个数;②:遍历电脑D盘寻找.txt文件的个数。

遍历电脑D盘统计个数

 

运行结果(部分截图):

 1 public class ForkAndJoinFramework extends RecursiveTask<Integer> {

2 private File path;//文件路径

3 public ForkAndJoinFramework(File path) {

4 this.path = path;

5 }

6 @Override

7 protected Integer compute() {

8 int count=0;//文件个数的计数器

9 int dircount=0;//目录计数器

10 List<ForkAndJoinFramework> listTask=new ArrayList<>();//子目录任务容器

11 File[] files=path.listFiles();

12 if(files!=null){

13 for(File file:files){

14 if(file.isDirectory()){

15 listTask.add(new ForkAndJoinFramework(file));

16 dircount++;

17 }else{

18 count++;

19 }

20 }

21 }

22 System.out.println("目录:"+path.getAbsolutePath()+" 包含的子目录个数"+dircount

23 +" 包含的文件个数"+count);

24 if(!listTask.isEmpty()){

25 for(ForkAndJoinFramework forkAndJoinFramework:invokeAll(listTask)){

26 count+=forkAndJoinFramework.join();

27 }

28 }

29 return count;

30 }

31

32 public static void main(String[] args){

33 ForkJoinPool forkJoinPool=new ForkJoinPool();

34 ForkAndJoinFramework forkAndJoinFramework=new ForkAndJoinFramework(new File("D:/"));

35 forkJoinPool.invoke(forkAndJoinFramework);

36 System.out.println("File Count ="+forkAndJoinFramework.join());

37 }

38 }

遍历电脑D盘统计个数

 

 

 

遍历电脑D盘寻找.txt文件:因为是查找出.txt所以我们优先选择继承RecursiveAction没有返回值的。

运行结果:

 

 1 public class FornJoinFramework extends RecursiveAction {

2 private File path;

3 public FornJoinFramework(File path) {

4 this.path = path;

5 }

6 @Override

7 protected void compute() {

8 List<FornJoinFramework> listTask=new ArrayList<>();

9 File[] files=path.listFiles();

10 if(files!=null){

11 for(File file:files){

12 if(file.isDirectory()){

13 listTask.add(new FornJoinFramework(file));

14 }else{

15 if(file.getAbsolutePath().endsWith("txt")){

16 System.out.println("文件:"+file.getAbsolutePath());

17 }

18 }

19 }

20 }

21 if(!listTask.isEmpty()){

22 for(FornJoinFramework fornJoinFramework: invokeAll(listTask)){

23 fornJoinFramework.join();

24 }

25 }

26 }

27 public static void main(String[] args){

28 ForkJoinPool forkJoinPool=new ForkJoinPool();

29 FornJoinFramework fornJoinFramework=new FornJoinFramework(new File("D:/"));

30 forkJoinPool.execute(fornJoinFramework);

31 try {

32 Thread.sleep(5);

33 } catch (InterruptedException e) {

34 e.printStackTrace();

35 }

36 System.out.println("I am ok ");

37 fornJoinFramework.join();

38 System.out.println("work finish ");

39 }

40 }

个目录下的txt文件

 

 

4.CountDownLatch(闭锁):允许一个或者多个线程等待其他线程完成操作后继续自己的操作,类似加强版的Join。

 1 /**

2 * @author yangxin

3 * @time 2019/2/28 15:30

4 *

5 * 类说明:5个线程执行6个初始化任务,使用CountDownLatch。

6 */

7 public class UseCountDownLatch {

8 static CountDownLatch countDownLatch=new CountDownLatch(6);

9

10 private static class initThread implements Runnable{

11

12 @Override

13 public void run() {

14 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化中....");

15 countDownLatch.countDown();

16 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化完成,执行其他操作中...");

17 }

18 }

19

20 private static class BussThread implements Runnable{

21

22 @Override

23 public void run() {

24 try {

25 countDownLatch.await();

26 } catch (InterruptedException e) {

27 e.printStackTrace();

28 }

29 System.out.println("Thread:"+Thread.currentThread().getName()+"业务逻辑工作++++++");

30 }

31 }

32

33 public static void main(String[] args) throws InterruptedException {

34 new Thread(new Runnable() {

35 @Override

36 public void run() {

37 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化111");

38 countDownLatch.countDown();

39 System.out.println("Thread:"+Thread.currentThread().getName()+"初始化222");

40 countDownLatch.countDown();

41 }

42 }).start();

43 new Thread(new BussThread()).start();

44 for(int i=0;i<=3;i++){

45 Thread thread=new Thread(new initThread());

46 thread.start();

47 }

48

49 countDownLatch.await();

50 System.out.println("Main work finish");

51 }

52

53 }

CountDownLatch

 

 

5.CyclicBarrier(屏障):让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时才被放行,所有被屏障拦截的线程才会继续执行。可以在一组线程到达屏障时,优先执行某一个操作,方便处理更复杂的业务逻辑。关于闭锁和屏障可以参考JAVA并发 闭锁、同步屏障、信号量总结。

 

线程---线程安全

1.怎么能够保证线程安全:让类不可变,volatile保证内存可见性,加锁,CAS类控制线程的执行顺序。

2.死锁:是指两个或者两个以上的进程在执行的过程中,由于资源竞争或者彼此通信而造成的一种拥塞现象,如果没有外力的情况下,将一直保持这种状态。怎么解决死锁?控制线程之间的加锁顺序。

3.活锁:在try-Local模式下造成的情况。在try-Local中,如果线程已经锁定了A锁,在尝试获取B锁的时候,发现已经被占有,于是释放自己占有的A锁;在此同时另一个线程已经锁定了B锁,在尝试获取A锁的时候发现已经被占有,于是释放自己占有的B锁,于是两个线程都在相互等待,这种称为活锁。

4.线程饥饿:在线程优先级的情况下,有可能出现这种情况,优先级高的线程一直持有锁,优先级低的线程一直都会得不到锁,就是线程饥饿。

5.性能:在加锁的情况上,尽量只加锁关键性代码,尽量不要对没必要的代码段加锁。

显示锁Lock接口:

①、尝试非阻塞获取锁,②、可以被中断的获取锁,③、超时获取锁。(synchronized 是隐式锁,固化了加锁和释放锁)。

Lock接口的核心方法:lock()加锁,unlock()释放锁,trylock()有一个返回值,如果获取到锁返回true,否则返回false;

Lock的实现ReentrantLock和标准用法:排它锁:就是在任何情况下同一时刻只允许一个线程允许访问,例子如下:

ReadWriteLock接口和读写锁ReentrantReadWriteLock:因为在排他锁,在任何情况下,同一时刻都只允许一个线程访问,但是在很多情况下,读多写少的情况下就非常的低效,因为读可以不用加锁,所以就出现了读写锁,他读写分离。同时都是读的 时候他们都是并发的执行。

如下来对比Synchronized和ReentrantReadWriteLock的效率对比:

Synchronized加锁:

 

运行效率如下:

 

ReentrantReadWriteLock锁:

 

运行的效率:

两者的对比可知,在读多写少的情况下,ReentrantReadWriteLock锁更高效。

代码有点多:如果要源代码访问github:https://github.com/fireshoot/LearningAndTest/tree/master/src/main/java/Thread/UserThread。在rw包下的就是这部分的代码,整个java包下是本篇博客的代码。

Condition接口

  Condition接口提供方法,signal()/signalAll()和await(),这两个方法的作用和wait-notify一样。

使用如下:

 

 1 public class ConditionClass {

2 Lock kmLock=new ReentrantLock();

3 Condition condition1=kmLock.newCondition();

4

5 Lock siteLock=new ReentrantLock();

6 Condition condition2=siteLock.newCondition();

7

8 private String name;

9 private int km;

10 private String site;

11

12 public void setKm(int km) {

13 kmLock.lock();

14 try{

15 this.km = km;

16 condition1.signal();

17 }finally {

18 kmLock.unlock();

19 }

20 }

21

22 public void setSite(String site) {

23 siteLock.lock();

24 try{

25 this.site = site;

26 condition2.signal();

27 }finally {

28 siteLock.unlock();

29 }

30 }

31

32 public void changeKm() throws InterruptedException {

33 kmLock.lock();

34 try{

35 while (this.km<100){

36 condition1.await();

37 //.....

38 }

39 //......

40 }finally {

41 kmLock.unlock();

42 }

43 }

44

45 public void changeSite() throws InterruptedException {

46 siteLock.lock();

47 try{

48 while (site.equals("BeiJing")){

49 condition2.await();

50 }

51 }finally {

52 siteLock.unlock();

53 }

54 }

55

56 public ConditionClass(String name, int km, String site) {

57 this.name = name;

58 this.km = km;

59 this.site = site;

60 }

61

62 public int getKm() {

63 return km;

64 }

65

66 public String getSite() {

67 return site;

68 }

69 }

Condition的使用

 

 

 CAS(Compare And Swap)

即比较并替换:三个运算符:一个内存地址V,一个期望的旧值A,一个新值B;在内存地址V上,在期望旧值B的基础上,将新值写入地址V上。

是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。它能够保证基本类型的变化具有原子性。比如我们常常见到的 i++;这条语句,在平时的开发中并没有感觉原子性的问题,但是在多线程的情况下i++并不具有原子性,i++首先取值,再写值,在并发的条件下有可能已经被多个线程写值了,导致i++增加的异常。怎么解决比如i++的原子性呢,那么就是用CAS的AtomicInteger来代替int,在i++的时候,首先检测该地址上的期望旧值是不是我们i++之前的值,如果是,那么就执行++操作;如果不是,说明其他线程对此已经做出了修改,报异常。

 

CAS存在的问题:

①:ABA问题:内存计算V本来就是A值,但是在其中变化成了B值,但是最后又变化成A,在运行检查的时候发现期望旧值并没有改变,于是进行了操作。解决方法:版本号。

②:往往只能保证一个共享变量的原子操作。

相关的原子操作类:

  1. 更新基本类型:AtomicBoolearn,AtomicInteger,AtomicLong,AtomicReference。
  2. 更新数组类型:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。
  3. 更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampReference。
  4. 原子更新字段类:AtomicReferenceFieldUpdater,AtumicIntegerFieldUpdater,AtomicLongFieldUpdater。

结语

对线程并发编程的基础知识的复习,还对①加深了对 wait-notify/notifyAll的理解和使用;②Fork-Join框架下ForkJoinTask(RecursiveTask,RecursiveAction)和ForkJoinPool的搭配使用,并且实现搜寻文件数量和指定文件的实现方式;③对CountDownLatch、CyclicBarrier、Semaphore、Exchange的复习;④:对显示锁Lock、ReentrantLock、读写锁ReadWriteLock、ReentrantReadWriteLock的使用,读写锁和Synchronized关键字在读多写少的效率比较;⑤ Condition的使用,和Wait-notify的区别;⑥:CAS。

关于本篇博客的源码见github地址:https://github.com/fireshoot/LearningAndTest。注意:我是使用的IDEA开发的,如果导入失败直接复制包类就行了

 

 

 

转载标明地址:java并发深入学习总结

 

 

以上是 java并发深入--通信、forkJoin框架、线程安全的学习 的全部内容, 来源链接: utcz.com/z/393816.html

回到顶部