【Java多线程系列七】ExecutorService

java

java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

Method说明
shutdown拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
shutdownNow停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
awaitTermination当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
submit提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

NameType说明
corePoolSizeint线程池中最小的线程数
maximumPoolSizeint线程池中最大的线程数
keepAliveTimelong线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
unitTimeUnitkeepAliveTime的时间单位
workQueueBlockingQueue<Runnable>已提交但未执行的任务队列
threadFactoryThreadFactory创建新线程的工厂
handlerRejectedExecutionHandler当线程池或队列达到上限拒绝新任务抛出异常时的处理类

同时,java.util.concurrent.Executors类提供的常用方法有

Method说明基类
newFixedThreadPool线程池中含固定数量的线程基于java.util.concurrent.ThreadPoolExecutor类
newSingleThreadExecutor线程池中仅含一个工作线程
newCachedThreadPool按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)
newWorkStealingPool按照可用CPU数创建线程池基于java.util.concurrent.ForkJoinPool类

java.util.concurrent.ForkJoinPool类是Fork/Join框架的实现类,Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,该类在有递归实现的场景有更优异的表现。

package com.concurrent.test;

import java.util.Arrays;

import java.util.Date;

import java.util.List;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

import org.junit.Assert;

import org.junit.Test;

/**

* 测试ExecutorService

*/

public class ThreadExecutorServiceTest {

private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";

private static final int RESULT = 111;

private static boolean submitRunable() throws InterruptedException, ExecutionException {

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<?> future = executorService.submit(new Runnable() {

@Override

public void run() {

System.out.println("This is submitRunnable");

}

});

return future.get() == null;

}

private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<Integer> future = executorService.submit(new Runnable() {

@Override

public void run() {

System.out.println("This is submitRunnableWithResult");

}

}, RESULT);

return future.get();

}

private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<Integer> future = executorService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

System.out.println("This is submitBlockCallable");

return RESULT;

}

});

return future.get(); //阻塞

}

private static boolean submitNonBlockCallable() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<Integer> future = executorService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

System.out.println("This is submitNonBlockCallable");

return RESULT;

}

});

while (!future.isDone()) {// 非阻塞

System.out.println(new Date());

}

return future.isDone();

}

private static String shutdown() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

final StringBuilder sb = new StringBuilder();

executorService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

Thread.sleep(10000);

sb.append("This is shutdown");

return RESULT;

}

});

executorService.shutdown();

return sb.toString();

}

private static String shutdownWithAwaitTermination() throws InterruptedException {

ExecutorService executorService = Executors.newSingleThreadExecutor();

final StringBuilder sb = new StringBuilder();

executorService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

Thread.sleep(10000);

sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);

return RESULT;

}

});

executorService.shutdown();

executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);

return sb.toString();

}

private static int testForkJoinPool(List<Integer> list) throws InterruptedException, ExecutionException {

ForkJoinPool forkJoinPool = new ForkJoinPool(8);

Future<Integer> future = forkJoinPool.submit(new SumTask(list));

return future.get();

}

@Test

public void test() throws InterruptedException, ExecutionException {

Assert.assertTrue(submitRunable());

Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());

Assert.assertEquals(RESULT, submitBlockCallable().intValue());

Assert.assertTrue(submitNonBlockCallable());

Assert.assertTrue(shutdown().isEmpty());

Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());

Assert.assertEquals(10, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4 })));

Assert.assertEquals(49, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })));

Assert.assertEquals(60, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 })));

}

}

 SumTask类如下:

package com.concurrent.test;

import java.util.List;

import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Integer> {

/**

*

*/

private static final long serialVersionUID = -5468389855825594053L;

private List<Integer> list;

public SumTask(List<Integer> list) {

this.list = list;

}

/*

* Ensure it is necessary to divide the job to parts and finish them separately

*/

@Override

protected Integer compute() {

int rtn , size = list.size();

if (size < 10) {

rtn = sum(list);

}

else{

SumTask subTask1 = new SumTask(list.subList(0, size /2));

SumTask subTask2 = new SumTask(list.subList(size /2 + 1, size));

subTask1.fork();

subTask2.fork();

rtn = subTask1.join() + subTask2.join();

}

return rtn;

}

private int sum(List<Integer> list) {

int sum = 0;

for (Integer integer : list) {

sum += integer;

}

return sum;

}

}

以上是 【Java多线程系列七】ExecutorService 的全部内容, 来源链接: utcz.com/z/392775.html

回到顶部