自定义线程池的阻塞策略
自定义线程池的阻塞策略
ThreadPoolExecutor,常见构造如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
正式使用中一般都会设置一个最大缓冲队列容量,如果线程池满它会对继续添加的任务线程执行指定的拒绝策略,ThreadPoolExcetor 的最后一个参数指定了拒绝策略,JDK提供了四种拒绝策略:
AbortPolicy 策略、CallerRunsPolicy策略、 DiscardOledestPolicy策略、DiscardPolicy策略。
AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
可以看到默认提供的四种策略似乎都不太友好,要么放弃要么抛异常,而直接在调用者线程中执行或许也不是你想要的,因为它破坏了线程的执行顺序。
有时候我们需要保证任务添加不会失败,并且只要被添加的任务能依次顺序执行就好了,而不需要这个添加动作立即响应,即让线程池等待池中的任务完成后再继续添加新任务,此时JDK提供的四种策略无法满足需求。
现在的场景我需要使队列阻塞,如果队列满了就一直阻塞。
自定义ThreadPoolExecutor 类
package com.montnets.task;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义阻塞型线程池 当池满时会阻塞任务提交
*
* @ClassName: BlockThreadPool
* @Description: TODO
* @author: wangs
* @date: 2018-1-24 下午5:24:54
*/
public class BlockThreadPool {
private ThreadPoolExecutor pool = null;
public BlockThreadPool(int poolSize) {
pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if (pool != null) {
pool.shutdownNow();
}
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void execute(Runnable runnable) {
this.pool.execute(runnable);
}
// 测试构造的线程池
public static void main(String[] args) {
BlockThreadPool pool = new BlockThreadPool(3);
for (int i = 1; i < 100; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getId() + "=====开始");
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getId() + "=====【结束】");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("【提交第" + i + "个任务成功!】");
}
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
默认的策略源码:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
所有的策略都是实现 RejectedExecutionHandler 类;
例如: 在此处继续将线程放入队列,put方法会阻塞,不了解的的可以看一下队列的核心方法。
public class MyRejectHasnler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
以上是 自定义线程池的阻塞策略 的全部内容, 来源链接: utcz.com/z/519176.html