JDK中线程池满后再放入队列

编程

 

    JDK中ThreadPoolExecutor有coreSize、maxSize,只有当线程数到coreSize且队列满后才会增加线程数到maxSize.

    想要达到的效果是线程数到maxSize后再放入队列。

方案一

    覆写ThreadPoolExecutor的execute()

    List-1

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

//workerCountOf()获取线程数,线程worker数只是int类型二进制位的前几位

int c = ctl.get();

//1

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

//2

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

//3

else if (!addWorker(command, false))

reject(command);

}

线程数的表示,是用int类型的二进制的左边几位标示的,所以才需要调用workCountOf()方法来获取

  1. 如果线程数小于coreSize,那么增加worker线程
  2. 将任务放入到队列中,如果成功表示队列没有满
  3. 队列满了,此时尝试去增加worker线程数,让worker线程数达到maxSize

    如果我们自己定义一个CustomThreadPoolExecutor,然后覆写execute(),那么你会发现ctl、workerCountOf、addWorker都是private的,子类上根本访问不了,所以这个方案是不行的

方案二

    自己定义一个CustomThreadPoolExecutor,之后将JDK中ThreadPoolExecutor中的内容全部拷贝过来,之后再改写execute()的实现,但是这个成本很大。

    最重要的,连拒绝策略也不能使用JDK里面的了,因为如下List-2所示第二个参数是ThreadPoolExecutor

    List-2

public interface RejectedExecutionHandler {

void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);

}

    此方案不行

方案三

  1. 自定义queue,改写offer逻辑
  2. 覆写ThreadPoolExecutor的execute()

    List-3

public class TaskQueue<R extends Runnable> extends ArrayBlockingQueue<Runnable> {

private EagerThreadPoolExecutor executor;

public TaskQueue(int size, boolean fair) {

super(size, fair);

}

@Override

public boolean offer(Runnable runnable) {

if (executor == null) {

throw new RejectedExecutionException("The task queue does not have executor!");

}

int currentPoolThreadSize = executor.getPoolSize();

// have free worker. put task into queue to let the worker deal with task.

//1

if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {

return super.offer(runnable);

}

// return false to let executor create new worker.

//2

if (currentPoolThreadSize < executor.getMaximumPoolSize()) {

return false;

}

// currentPoolThreadSize >= max

//3

return super.offer(runnable);

}

/**

* retry offer task

*

* @param o task

* @return offer success or not

* @throws RejectedExecutionException if executor is terminated.

*/

public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {

if (executor.isShutdown()) {

throw new RejectedExecutionException("Executor is shutdown!");

}

return super.offer(o, timeout, unit);

}

public void setExecutor(EagerThreadPoolExecutor executor){

this.executor = executor;

}

}

TaskQueue的offer方法是,放入队列时被调用

  1. 如果当前提交的task数少于线程池中线程是数量,那么直接调用父类的offer,将task放入队列,不新建线程,因此此时肯定有空闲的线程
  2. 此时线程池中没有空闲的线程,而且线程数量少于设置的maxSize,此时返回false,让线程池去创建新的线程
  3. 此时线程数量大于等于maxSize,将task放入任务队列中

EagerThreadPoolExecutor继承ThreadPoolExecutor,改写execute()的实现:

    List-4

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

public EagerThreadPoolExecutor(int coreSize, int maxSize, long l, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

super(coreSize, maxSize, l, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler);

}

@Override

protected void beforeExecute(Thread thread, Runnable runnable) {

//可以结合afterExecute统计执行耗时

}

@Override

public void execute(Runnable command) {

if (command == null) {

throw new NullPointerException();

}

// do not increment in method beforeExecute!

//1

submittedTaskCount.incrementAndGet();

try {

super.execute(command);

} catch (RejectedExecutionException rx) {

// retry to offer the task into queue.

final TaskQueue queue = (TaskQueue) super.getQueue();

try {

//2

if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {

submittedTaskCount.decrementAndGet();

throw new RejectedExecutionException("Queue capacity is full.", rx);

}

} catch (InterruptedException x) {

submittedTaskCount.decrementAndGet();

throw new RejectedExecutionException(x);

}

} catch (Throwable t) {

// decrease any way

//3

submittedTaskCount.decrementAndGet();

throw t;

}

}

@Override

protected void afterExecute(Runnable runnable, Throwable throwable) {

//ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1

//afterExecute和beforeExecute是在runWorker中调用,即使有异常,也不会抛出RejectedExecutionException异常

submittedTaskCount.decrementAndGet();

if (throwable!=null) {

LOG.error("线程池中线程执行出错", throwable);

}

}

public int getSubmittedTaskCount() {

return submittedTaskCount.get();

}

}

  1. execute()开始时,将提交的任务数加1,之后调用父类的execute()方法,如List-1中,当线程数达到coreSize后,就会调用queue.offer(),即List-3中的offer(),我们会判断线程数是否少于maxSize,如果少于那么返回false,之后ThreadPoolExecutor.execute()方法会去新增线程
  2. 2处如果被拒绝了,说明队列满了而且线程数达到了maxSize,此时我们再重试一次,将task放入队列中,为什么还要重试一次?List-3中offer中做了一些操作,有可能这期间队列就有空了,所以要重试下。
  3. 3处捕获到任何的异常,之后将task数减去1,3处的Throwable是捕获不到2处抛出的RejectedExecutionException的

    为什么afterExecute()方法中还要将task数减去1呢?

ThreadPoolExecutor中,beforeExecute()和afterExecute()是在runWorker的run()中被调用的,分别在Runnable.run()的前后被调用,而且线程池中抛出异常,在线程池外面是捕获不到的,所以外面需要的afterExecute()中将task数减去1

    改进:我们可以将List-4中使用的AtomicInteger改为JDK8的LongAddr以提高性能

Reference

  1. https://github.com/apache/dubbo/blob/fb5761683729f63c715e39c78a8b7b75278050c9/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java

以上是 JDK中线程池满后再放入队列 的全部内容, 来源链接: utcz.com/z/518468.html

回到顶部