自定义线程池的阻塞策略

编程

自定义线程池的阻塞策略
ThreadPoolExecutor,常见构造如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
正式使用中一般都会设置一个最大缓冲队列容量,如果线程池满它会对继续添加的任务线程执行指定的拒绝策略,ThreadPoolExcetor 的最后一个参数指定了拒绝策略,JDK提供了四种拒绝策略:
AbortPolicy 策略、CallerRunsPolicy策略、 DiscardOledestPolicy策略、DiscardPolicy策略。

AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。

  可以看到默认提供的四种策略似乎都不太友好,要么放弃要么抛异常,而直接在调用者线程中执行或许也不是你想要的,因为它破坏了线程的执行顺序。

有时候我们需要保证任务添加不会失败,并且只要被添加的任务能依次顺序执行就好了,而不需要这个添加动作立即响应,即让线程池等待池中的任务完成后再继续添加新任务,此时JDK提供的四种策略无法满足需求。

现在的场景我需要使队列阻塞,如果队列满了就一直阻塞。
自定义ThreadPoolExecutor 类

package com.montnets.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义阻塞型线程池 当池满时会阻塞任务提交
 * 
 * @ClassName: BlockThreadPool
 * @Description: TODO
 * @author: wangs
 * @date: 2018-1-24 下午5:24:54
 */
public class BlockThreadPool {

    private ThreadPoolExecutor pool = null;
    
    public BlockThreadPool(int poolSize) {
        pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }
    
    public void destory() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    private class CustomThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1);
            t.setName(threadName);
            return t;
        }
    }

    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造点,由blockingqueue的offer改成put阻塞方法
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void execute(Runnable runnable) {
        this.pool.execute(runnable);
    }

    // 测试构造的线程池
    public static void main(String[] args) {
        BlockThreadPool pool = new BlockThreadPool(3);
        for (int i = 1; i < 100; i++) {
            System.out.println("提交第" + i + "个任务!");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getId() + "=====开始");
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getId() + "=====【结束】");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("【提交第" + i + "个任务成功!】");
        }

        // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
        // exec.destory();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


默认的策略源码:
public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

所有的策略都是实现    RejectedExecutionHandler 类;
例如: 在此处继续将线程放入队列,put方法会阻塞,不了解的的可以看一下队列的核心方法。
public class MyRejectHasnler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // 核心改造点,由blockingqueue的offer改成put阻塞方法
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    


 

以上是 自定义线程池的阻塞策略 的全部内容, 来源链接: utcz.com/z/519176.html

回到顶部