JAVA(JDK8)线程池实现分析

编程

Worker:执行用户任务的线程主任务,它继承了AbstractQueuedSynchronizer,并实现了Runnable接口,所有用户提交的任务都是在Worker的run方法中被调用run()方法后得到执行的。Worker定义如下:

    /**

* 类 Worker主要维护线程运行任务的中断控制状态,以及其他次要簿记。 此类适时地扩展了AbstractQueuedSynchronizer以简化获取和释放围绕每个任务执行的锁。

* 这样可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。 我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,

* 因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。 此外,为了抑制直到线程真正开始运行任务之前的中断,

* 我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。

*/

private final class Worker extends AbstractQueuedSynchronizer implements Runnable;

workers:存放worker实例的集合,也就是线程池中存放工作线程的容器。它的定义如下 :

    /**

* 存放线程池中的所有工作线程的集合。 访问时需要获得mainLock锁。

*/

private final HashSet<Worker> workers = new HashSet<Worker>();

workQueue:用于保存待移交给工作线程执行的任务队列,它的定义如下:

    /**

* 用于保留任务并移交给工作线程的队列。 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),

* 因此仅依靠isEmpty来查看队列是否为空(例如,在确定是否从SHUTDOWN过渡到TIDYING时必须这样做)。 。

* 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,即使它在延迟到期后稍后可能返回non-null。

*/

private final BlockingQueue<Runnable> workQueue;

线程池中的几个主要主要参数:

  • corePoolSize:核心线程数,当没有待执行任务时,核心线程池继续驻留线程池,不销毁。当然,也可以设置核心线程一个存活时间,不一直驻留。
  • maximumPoolSize:最大线程数,即线程池开启的最大线程数量。
  • keepAliveTime:保活时间,即当任务线程没有任务执行后,存活多长时间,过了这个时间将会被销毁。
  • unit:保活时间参数的单位,如毫秒、秒等。
  • workQueue:任务队列,也就是上面说的“用于保存待移交给工作线程执行的任务队列”。
  • threadFactory:创建线程(Thread)的工厂类,工厂的主要工作是设置线程分组、线程名称、是否使用守护线程、线程优先级等于线程相关的内容,默认使用Executors工具类中的DefaultThreadFactory工厂类。
  • handler:拒绝策略处理器,即当任务队列已满,无线程可以处理时,处理后续提交的任务的处理器。默认使用ThreadPoolExecutor类中的AbortPolicy处理器,它的处理策略是直接抛出异常。

线程池的执行过程:

①、用户通过execute()方法或submit()方法向线程池提交任务,execute()方法会执行几个判断操作:

  • 1)、如果当前工作线程数小于核心线程数,则调用addWorker()方法,创建一个worker实例,worker实例是一个Runnable实例,它的内部会存放一个Thread,然后将worker自己提交给这个Thread去执行,并将用户提交的任务作为第一个任务执行。从这里可以看出,worker实例是一个工作线程执行的主任务,这个woker实例的run方法中执行了一个循环处理用户任务的方法,该方法会执行第一个方法并循环地从WorkQueue中获取用户任务并执行。获取任务的getTask()中判断根据是否设置“允许核心线程超时”或工作线程数大于核心线程数,调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法 或 workQueue.take()方法从队列中获取任务,若没有获取到任务则会调用await()方法 或 awaitNanos(nanos)方法阻塞等待,实际也就是调用的AQS中的相关方法,最终使用的是LockSupport中的park、unpark等方法实现的阻塞等待,超时后将会将worker实例销毁。
  • 2)、如果当前工作线程数大于核心线程数,则将任务放入到任务队列(WorkQueue)队尾,若放入队尾操作成功,则执行完成;若放入队尾操作不成功,如队列已满等情况,则尝试再创建一个worker实例,也就是创建一个工作线程。若创建新的worker成功,则将任务作为第一个任务交给线程执行,执行完后并继续从任务队列中获取任务继续执行。
  • 3)、如果放入任务队列失败,并尝试创建新的工作线程失败,则调用设置的拒绝策略的RejectedExecutionHandler实例的rejectedExecution()方法执行拒绝处理。

注:ThreadPoolExecutor 有两个生命周期的切入点,即 beforeExecute(Thread t, Runnable r) 和 afterExecute(Runnable r, Throwable t),即用户任务被执行前,和用户任务被执行完后,可以通过继承ThreadPoolExecutor覆写这两个方法来执行需要切入的操作,比如在afterExecute()方法中执行重置ThreadLocal的数据重置等操作。

线程池实现的主要源码:

execute()方法源码

    public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) { // 判断当前工作线程数量是否小于核心线程数

if (addWorker(command, true)) // 工作线程数未达到核心线程数,直接创建新的工作线程,并将当前任务作为第一个任务进行执行

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) { // 判断线程池状态并将任务放到任务队列队尾

int recheck = ctl.get();

if (!isRunning(recheck) && remove(command)) // 再次进行状态判断和对应操作

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false); // 新创建工作线程

} else if (!addWorker(command, false)) // 尝试创建新工作线程

reject(command); // 尝试创建新工作线程失败,执行拒绝策略

}

addWorker()方法源码

    private boolean addWorker(Runnable firstTask, boolean core) {

// 省略部分代码...

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

// 省略部分代码...

workers.add(w); // 将worker实例放入到workers这个HashSet容器中

// 省略部分代码...

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start(); // 启动线程

workerStarted = true;

}

}

} finally {

if (!workerStarted)

addWorkerFailed(w); // 添加工作线程失败

}

return workerStarted;

}

Worker对象的源码

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

/**

* 此工作程序正在其中运行的线程。如果工厂失败,则为null。

*/

final Thread thread;

/**

* 要运行的初始任务。 可能为null。

*/

Runnable firstTask;

/**

* 每线程任务计数器

*/

volatile long completedTasks;

/**

* worker的构造方法

*/

Worker(Runnable firstTask) {

setState(-1); // 禁止中断,直到runWorker,这个调用的是AQS的方法,设置AQS的状态

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); // 通过工厂创建Thread,并将当前Worker(Runnable)实例传入Thread中

}

/**

* worker实现Runnable接口的run方法,它会调用ThreadPoolExecutor的runWorker()方法

*/

public void run() {

runWorker(this);

}

// 省略部分代码...

}

runWorker()方法源码

    final void runWorker(com.gelicheng.thread.ThreadPoolExecutor.Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) { // 从任务队列中获取任务

w.lock();

// 判断并执行中断相关操作

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task); // 用户任务执行完成前的一个切入点

Throwable thrown = null;

try {

task.run(); // 执行用户任务

} catch (RuntimeException x) {

thrown = x;

throw x;

} catch (Error x) {

thrown = x;

throw x;

} catch (Throwable x) {

thrown = x;

throw new Error(x);

} finally {

afterExecute(task, thrown); // 用户任务执行完成后的一个切入点

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly); // 销毁worker实例

}

}

getTask()方法源码

    private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (; ; ) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

// Are workers subject to culling?

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) // 获取任务,带超时时间,会等待一段时间,若过了超时时间后返回空,然后runWorker中的循环将会结束,worker将被销毁

:

workQueue.take(); // 核心线程从任务队列获取任务方法,不会有超时时间,若获取不到会一直等待

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

综上所述,可以看出来,线程池相关参数的作用和具体使用的地方;同时还能看出线程池的执行过程;也知道了线程池是如何实现的线程复用的,也知道了线程是如何实现无任务时的阻塞等待的和在哪里实现的。

以上仅个人分析和看法,如有不对之处还望看到的各位不吝赐教,谢谢!

二、再来看看API

(JDK 13)

模块 java.base

软件包 java.util.concurrent

ThreadPoolExecutor类

java.lang.Object

java.util.concurrent.AbstractExecutorService

java.util.concurrent.ThreadPoolExecutor

所有已实现的接口:

Executor, ExecutorService

直接已知子类:

ScheduledThreadPoolExecutor

公共类ThreadPoolExecutor 扩展AbstractExecutorService

一个ExecutorService执行使用可能的几个池线程之一,通常用配置的每个提交的任务Executors工厂方法。

线程池解决了两个不同的问题:由于减少了每个任务的调用开销,它们通常在执行大量异步任务时可提供改进的性能,并且它们提供了一种绑定和管理资源(包括线程)的方法,该资源在执行集合的执行时消耗任务。每个数据库ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

为了在广泛的上下文中有用,此类提供了许多可调整的参数和可扩展性挂钩。但是,敦促程序员使用更方便的 Executors工厂方法Executors.newCachedThreadPool()(无限制线程池,具有自动线程回收),Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们可以针对最常见的使用情况预先配置设置。否则,在手动配置和调整此类时,请使用以下指南:

核心和最大池大小

ThreadPoolExecutorgetPoolSize()根据corePoolSize(参见getCorePoolSize())和maximumPoolSize(参见getMaximumPoolSize())设置的界限自动调整池大小(参见)。在method中提交新任务时execute(Runnable),如果正在运行的内核线程数少于corePoolSize,则即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。否则,如果正在运行的线程少于maximumPoolSize,则将仅在队列已满时创建一个新线程来处理请求。通过将corePoolSize和maximumPoolSize设置为相同,可以创建固定大小的线程池。通过将maximumPoolSize设置为一个本质上不受限制的值,例如Integer.MAX_VALUE,则允许池容纳任意数量的并发任务。最典型的是,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize(int)和进行动态更改 setMaximumPoolSize(int)

按需提前创建核心线程

默认情况下,甚至只有在有新任务到达时才创建和启动核心线程,但是可以使用方法prestartCoreThread()或来动态覆盖它prestartAllCoreThreads()。如果使用非空队列构造池,则可能要预启动线程。

创建新线程

使用创建新线程ThreadFactory。如果没有另外指定,Executors.defaultThreadFactory()则使用a,它将创建所有线程,并且所有线程都ThreadGroup具有相同的NORM_PRIORITY优先级和非守护程序状态。通过提供不同的ThreadFactory,可以更改线程的名称,线程组,优先级,守护程序状态等。如果在ThreadFactory从返回null时要求创建失败的线程newThread,执行器将继续执行,但可能无法执行任何操作。任务。线程应具有“ modifyThread” RuntimePermission。如果使用该池的工作线程或其他线程不具有此权限,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能保持在可能终止但未完成的状态。

保活时间

如果当前池中的线程数超过corePoolSize,则多余的线程将在空闲时间超过keepAliveTime时终止(请参阅参考资料getKeepAliveTime(TimeUnit))。当未积极使用池时,这提供了减少资源消耗的方法。如果池稍后变得更加活动,则将构建新线程。也可以使用method动态更改此参数setKeepAliveTime(long, TimeUnit)。使用Long.MAX_VALUE TimeUnit.NANOSECONDS有效的值可以使空闲线程永远不会在关闭之前终止。默认情况下,仅当存在多个corePoolSize线程时,allowCoreThreadTimeOut(boolean)才会使用keep-alive策略,但是只要keepAliveTime值非零,也可以使用方法将超时策略应用于核心线程。

排队

任何BlockingQueue都可以用于传输和保留提交的任务。此队列的使用与池大小交互:

  • 如果正在运行的线程少于corePoolSize线程,则执行程序总是喜欢添加新线程,而不是排队。
  • 如果正在运行corePoolSize或更多线程,则执行程序总是更喜欢对请求进行排队,而不是添加新线程。
  • 如果无法将请求排队,则将创建一个新线程,除非该线程超过了maximumPoolSize,在这种情况下,该任务将被拒绝。

有三种一般的排队策略:

  1. 直接交接。对于工作队列,一个很好的默认选择是将SynchronousQueue任务移交给线程,而不用保留它们。在这里,如果没有立即可用的线程来运行任务,则尝试将任务排队的尝试将失败,因此将构造一个新线程。在处理可能具有内部依赖性的请求集时,此策略避免了锁定。直接切换通常需要无限制的maximumPoolSizes,以避免拒绝新提交的任务。反过来,当平均而言,命令继续以比其处理速度更快的速度到达时,这可能会带来无限线程增长的可能性。
  2. 无限队列。LinkedBlockingQueue当所有corePoolSize线程都忙时,使用无界队列(例如,没有预定义容量的队列)将导致新任务在队列中等待。因此,将只创建corePoolSize线程。(因此,maximumPoolSize的值没有任何作用。)当每个任务完全独立于其他任务时,这可能是适当的,因此任务不会影响彼此的执行。例如,在网页服务器中。尽管这种排队方式对于消除短暂的请求突发很有用,但是它承认当命令平均到达的速度比处理命令的速度快时,工作队列就会无限增长。
  3. 有界队列。ArrayBlockingQueue当与有限的maximumPoolSizes一起使用时,有界队列(例如 )有助于防止资源耗尽,但调优和控制可能会更加困难。队列大小和最大池大小可能会相互折衷:使用大队列和小池可最大程度地减少CPU使用率,OS资源和上下文切换开销,但可能导致人为降低吞吐量。如果任务频繁阻塞(例如,如果它们受I / O限制),则系统可能能够安排比您原先允许的线程更多的时间。使用小队列通常需要更大的池大小,这会使CPU繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。

被拒绝的任务

当执行器关闭时,以及在执行器对最大线程和工作队列容量使用有限范围且已饱和时,execute(Runnable)将 拒绝在method中提交的新任务。无论哪种情况,该execute方法都会调用的RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 方法RejectedExecutionHandler。提供了四个预定义的处理程序策略:

  1. 默认情况下ThreadPoolExecutor.AbortPolicy,处理程序RejectedExecutionException在拒绝时抛出运行时。
  2. 在中ThreadPoolExecutor.CallerRunsPolicy,调用execute自身的线程运行任务。这提供了一种简单的反馈控制机制,将降低新任务提交的速度。
  3. 在中ThreadPoolExecutor.DiscardPolicy,简单地删除了无法执行的任务。
  4. 在中ThreadPoolExecutor.DiscardOldestPolicy,如果未关闭执行程序,则将丢弃工作队列开头的任务,然后重试执行(这可能再次失败,从而导致重复执行此操作)。

可以定义和使用其他种类的RejectedExecutionHandler类。这样做需要格外小心,尤其是在设计策略仅在特定容量或排队策略下工作时。

挂钩方法

这个类提供protected重写 beforeExecute(Thread, Runnable)和 afterExecute(Runnable, Throwable)之前和每个任务的执行之后被调用的方法。这些可以用来操纵执行环境。例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。另外,terminated()一旦执行程序完全终止,可以重写方法来执行需要执行的任何特殊处理。

如果钩子,回调或BlockingQueue方法抛出异常,则内部工作线程可能进而失败,突然终止并可能被替换。

队列维护

方法getQueue()允许访问工作队列,以进行监视和调试。强烈建议不要将此方法用于任何其他目的。提供的两个方法, remove(Runnable)并且purge()可在存储回收协助时排队等候的任务大批成为取消。

核心线程驻留设置

这不再是一个程序所引用的线程池 无剩余线程可能会被回收(垃圾回收)没有被明确地关闭。您可以使用零核心线程和/或setting的下限来设置适当的保持活动时间,从而将池配置为允许所有未使用的线程最终死亡allowCoreThreadTimeOut(boolean)

 

扩展示例。此类的大多数扩展都覆盖一个或多个受保护的hook方法。例如,以下是一个子类,它添加了一个简单的暂停/继续功能:

 class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }

 

自:JDK 1.5

  • 嵌套类摘要

    嵌套类

    修饰符和类型

    描述

    static class 

    ThreadPoolExecutor.AbortPolicy

    抛出的被拒绝任务的处理程序 RejectedExecutionException

    static class 

    ThreadPoolExecutor.CallerRunsPolicy

    拒绝任务的处理程序,直接在方法的调用线程中运行拒绝任务execute,除非执行器已关闭,在这种情况下,该任务将被丢弃。

    static class 

    ThreadPoolExecutor.DiscardOldestPolicy

    拒绝任务的处理程序,它丢弃最旧的未处理请求,然后重试execute,除非执行器被关闭,在这种情况下,该任务将被丢弃。

    static class 

    ThreadPoolExecutor.DiscardPolicy

    拒绝任务的处理程序,静默丢弃被拒绝的任务。

  • 构造器摘要

    构造方法

    构造方法

    描述

    ThreadPoolExecutor​(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

    ThreadPoolExecutor使用给定的初始参数,默认的线程工厂和默认的拒绝执行处理程序创建一个新的。

    ThreadPoolExecutor​(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

    ThreadPoolExecutor使用给定的初始参数和 默认线程工厂创建一个新对象。

    ThreadPoolExecutor​(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

    ThreadPoolExecutor使用给定的初始参数和默认的拒绝执行处理程序创建一个新的。

    ThreadPoolExecutor​(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    ThreadPoolExecutor用给定的初始参数创建一个新的。

  • 方法总结

    所有方法

    修饰符和类型

    方法

    描述

    protected void

    afterExecute​(Runnable r, Throwable t)

    给定Runnable执行完成时调用的方法。

    void

    allowCoreThreadTimeOut​(boolean value)

    设置策略,以控制在保持活动时间内没有任务到达时核心线程是否可能超时并终止,并在新任务到达时根据需要替换。

    boolean

    allowsCoreThreadTimeOut()

    如果此池允许核心线程超时并在keepAlive时间内没有任务到达时终止,则返回true,如果有新任务到达,则在需要时替换该线程。

    protected void

    beforeExecute​(Thread t, Runnable r)

    在给定线程中执行给定Runnable之前调用的方法。

    void

    execute​(Runnable command)

    在将来的某个时间执行给定的任务。

    protected void

    finalize()

    不推荐使用。

    int

    getActiveCount()

    返回正在主动执行任务的线程的大概数量。

    long

    getCompletedTaskCount()

    返回已完成执行的任务的总数。

    int

    getCorePoolSize()

    返回线程的核心数量。

    long

    getKeepAliveTime​(TimeUnit unit)

    返回线程保持活动时间,该时间是线程在终止之前可能保持空闲的时间。

    int

    getLargestPoolSize()

    返回池中曾经同时存在的最大线程数。

    int

    getMaximumPoolSize()

    返回允许的最大线程数。

    int

    getPoolSize()

    返回池中的当前线程数。

    BlockingQueue<Runnable>

    getQueue()

    返回此执行程序使用的任务队列。

    RejectedExecutionHandler

    getRejectedExecutionHandler()

    返回无法执行任务的当前处理程序。

    long

    getTaskCount()

    返回计划执行的任务总数。

    ThreadFactory

    getThreadFactory()

    返回用于创建新线程的线程工厂。

    boolean

    isTerminating()

    如果此执行程序在终止之后shutdown()shutdownNow()尚未完全终止,则返回true 。

    int

    prestartAllCoreThreads()

    启动所有核心线程,使它们空闲地等待工作。

    boolean

    prestartCoreThread()

    启动一个核心线程,使其闲置地等待工作。

    void

    purge()

    尝试从工作队列中删除所有Future 已取消的任务。

    boolean

    remove​(Runnable task)

    如果执行程序的内部队列中存在此任务,则将其删除,如果尚未启动,则导致该任务无法运行。

    void

    setCorePoolSize​(int corePoolSize)

    设置核心线程数。

    void

    setKeepAliveTime​(long time, TimeUnit unit)

    设置线程保持活动时间,该时间是线程在终止之前可以保持空闲状态的时间。

    void

    setMaximumPoolSize​(int maximumPoolSize)

    设置允许的最大线程数。

    void

    setRejectedExecutionHandler​(RejectedExecutionHandler handler)

    为无法执行的任务设置新的处理程序。

    void

    setThreadFactory​(ThreadFactory threadFactory)

    设置用于创建新线程的线程工厂。

    void

    shutdown()

    启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。

    List<Runnable>

    shutdownNow()

    尝试停止所有正在执行的任务,中止正在等待的任务的处理,并返回正在等待执行的任务的列表。

    protected void

    terminated()

    执行程序终止时调用的方法。

    String

    toString()

    返回标识此池及其状态的字符串,包括运行状态的指示以及估计的工作人员和任务计数。

    在类java.util.concurrent.AbstractExecutorService 中声明的方法。

     newTaskFor, newTaskFor, submit, submit, submit

    在类java.lang.Object中声明的方法。

     clone, equals, getClass, hashCode, notify, notifyAll, wait, wait, wait

    在接口java.util.concurrent.ExecutorService中声明的方法。

     awaitTermination, invokeAll, invokeAll, invokeAny, invokeAny, isShutdown, isTerminated

  • 构造函数详细信息

    • ThreadPoolExecutor

      public  ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit  unit, BlockingQueue < Runnable > workQueue)

      ThreadPoolExecutor使用给定的初始参数,默认的线程工厂和默认的拒绝执行处理程序创建一个新的。

      使用Executors 工厂方法之一代替此通用构造函数可能会更方便。

      参数:

      corePoolSize-保留在池中的线​​程数(即使它们处于空闲状态),除非allowCoreThreadTimeOut已设置

      maximumPoolSize -池中允许的最大线程数

      keepAliveTime -当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。

      unitkeepAliveTime参数的时间单位

      workQueue-在执行任务之前用于保留任务的队列。该队列将仅保存Runnable 该execute方法提交的任务。

      抛出:

      IllegalArgumentException -如果下列条件之一成立:
      corePoolSize < 0
      keepAliveTime < 0
      maximumPoolSize <= 0
      maximumPoolSize < corePoolSize

      NullPointerException-如果workQueue为null

    • ThreadPoolExecutor

      public  ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit  unit, BlockingQueue < Runnable > workQueue, ThreadFactory  threadFactory)

      ThreadPoolExecutor使用给定的初始参数和默认的拒绝执行处理程序创建一个新的。

      参数:

      corePoolSize-保留在池中的线​​程数(即使它们处于空闲状态),除非allowCoreThreadTimeOut已设置

      maximumPoolSize -池中允许的最大线程数

      keepAliveTime -当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。

      unitkeepAliveTime参数的时间单位

      workQueue-在执行任务之前用于保留任务的队列。该队列将仅保存Runnable 该execute方法提交的任务。

      threadFactory -执行程序创建新线程时要使用的工厂

      抛出:

      IllegalArgumentException -如果下列条件之一成立:
      corePoolSize < 0
      keepAliveTime < 0
      maximumPoolSize <= 0
      maximumPoolSize < corePoolSize

      NullPointerException-如果workQueue 或threadFactory为null

    • ThreadPoolExecutor

      public  ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit  unit, BlockingQueue < Runnable > workQueue, RejectedExecutionHandler  handler)

      ThreadPoolExecutor使用给定的初始参数和 默认线程工厂创建一个新对象。

      参数:

      corePoolSize-保留在池中的线​​程数(即使它们处于空闲状态),除非allowCoreThreadTimeOut已设置

      maximumPoolSize -池中允许的最大线程数

      keepAliveTime -当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。

      unitkeepAliveTime参数的时间单位

      workQueue-在执行任务之前用于保留任务的队列。该队列将仅保存Runnable 该execute方法提交的任务。

      handler -因为达到了线程界限和队列容量而在执行被阻止时使用的处理程序

      抛出:

      IllegalArgumentException -如果下列条件之一成立:
      corePoolSize < 0
      keepAliveTime < 0
      maximumPoolSize <= 0
      maximumPoolSize < corePoolSize

      NullPointerException-如果workQueue 或handler为null

    • ThreadPoolExecutor

      public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit  unit, BlockingQueue < Runnable > workQueue, ThreadFactory  threadFactory, RejectedExecutionHandler  handler)

      ThreadPoolExecutor用给定的初始参数创建一个新的。

      参数:

      corePoolSize-保留在池中的线​​程数(即使它们处于空闲状态),除非allowCoreThreadTimeOut已设置

      maximumPoolSize -池中允许的最大线程数

      keepAliveTime -当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。

      unitkeepAliveTime参数的时间单位

      workQueue-在执行任务之前用于保留任务的队列。该队列将仅保存Runnable 该execute方法提交的任务。

      threadFactory -执行程序创建新线程时要使用的工厂

      handler -因为达到了线程界限和队列容量而在执行被阻止时使用的处理程序

      抛出:

      IllegalArgumentException -如果下列条件之一成立:
      corePoolSize < 0
      keepAliveTime < 0
      maximumPoolSize <= 0
      maximumPoolSize < corePoolSize

      NullPointerException-如果workQueue or threadFactoryhandlernull为空

  • 方法细节

    • execute

      public  void  execute(Runnable  command)

      在将来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果由于该执行器已关闭或已达到其容量而无法提交执行任务,则该任务将由current处理RejectedExecutionHandler

      参数:

      command -要执行的任务

      抛出:

      RejectedExecutionExceptionRejectedExecutionHandler如果不能接受执行任务,则由决定

      NullPointerException-如果command为null

    • shutdown

      public void shutdown()

      启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。如果已关闭,则调用不会产生任何其他影响。

      此方法不等待先前提交的任务完成执行。使用awaitTermination 做到这一点。

      抛出:

      SecurityException-如果存在安全管理器并关闭此ExecutorService,则它可能会操纵不允许调用者修改的线程,因为该调用器不持有,或者安全管理器的方法拒绝访问。RuntimePermission("modifyThread")checkAccess

    • shutdownNow

      public  List < Runnable>  shutdownNow()

      尝试停止所有正在执行的任务,中止正在等待的任务的处理,并返回正在等待执行的任务的列表。从此方法返回后,将从任务队列中清空(删除)这些任务。

      此方法不等待主动执行的任务终止。使用awaitTermination做到这一点。

      除了尽最大努力阻止停止处理正在执行的任务之外,没有任何保证。此实现通过中断任务Thread.interrupt();任何无法响应中断的任务都永远不会终止。

      返回值:

      从未开始执行的任务列表

      抛出:

      SecurityException-如果存在安全管理器并关闭此ExecutorService,则它可能会操纵不允许调用者修改的线程,因为该调用器不持有,或者安全管理器的方法拒绝访问。RuntimePermission("modifyThread")checkAccess

    • isTerminating

      public boolean isTerminating()

      如果此执行程序在终止之后shutdown()shutdownNow()尚未完全终止,则返回true 。此方法可能对调试有用。true在关闭后返回足够长的报告时间可能表明已提交的任务已被忽略或抑制了中断,从而导致该执行程序无法正确终止。

      返回值:

      true 如果终止但尚未终止

    • finalize

      @Deprecated(since="9")


      protected void finalize()

      不推荐使用。

      从类复制的说明: Object

      当垃圾回收确定不再有对该对象的引用时,由垃圾回收器在对象上调用。子类重写finalize用于处置系统资源或执行其他清除的方法。

      的一般约定finalize是,当Java™虚拟机确定不再有任何手段可以使尚未死亡的任何线程访问该对象时,将调用该协议,除非是由于采取了措施通过终结一些准备完成的对象或类。该finalize方法可以采取任何措施,包括使该对象可再次用于其他线程。finalize但是,通常的目的是在清除对象之前将其清除。例如,代表输入/输出连接的对象的finalize方法可能会执行显式I / O事务,以在永久丢弃该对象之前中断连接。

      finalize类 的方法不Object执行任何特殊操作;它只是正常返回。的子类 Object可能会覆盖此定义。

      Java编程语言不能保证哪个线程将为finalize任何给定对象调用该方法。但是,可以确保,在调用finalize时,调用finalize的线程将不持有任何用户可见的同步锁。如果finalize方法引发了未捕获的异常,则将忽略该异常,并终止该对象的终止。

      finalize为对象调用该方法之后,在Java虚拟机再次确定不再有任何方法可以由尚未死亡的任何线程访问该对象之前,不采取进一步的措施,包括可能采取的措施。准备完成的其他对象或类,此时可以将其丢弃。

      finalizeJava虚拟机永远不会为任何给定对象多次调用 该方法。

      finalize方法引发的任何异常都会导致该对象的终结被终止,但否则将被忽略。

      覆写:

      finalize in  class Object

      实施说明:

      该类的先前版本具有finalize方法,该方法可以关闭该执行程序,但是在此版本中,finalize不执行任何操作。

      也可以看看:

      WeakReference, PhantomReference

    • setThreadFactory

      public

       

      void

       

      setThreadFactory

      ​(

      ThreadFactory threadFactory)

      设置用于创建新线程的线程工厂。

      参数:

      threadFactory -新线程工厂

      抛出:

      NullPointerException -如果threadFactory为null

      也可以看看:

      getThreadFactory()

    • getThreadFactory

      public

       

      ThreadFactory

       

      getThreadFactory

      ()

      返回用于创建新线程的线程工厂。

      返回值:

      当前的线程工厂

      也可以看看:

      setThreadFactory(ThreadFactory)

    • setRejectedExecutionHandler

      public

       

      void

       

      setRejectedExecutionHandler

      ​(

      RejectedExecutionHandler handler)

      为无法执行的任务设置新的处理程序。

      参数:

      handler -新的处理程序

      抛出:

      NullPointerException -如果处理程序为null

      也可以看看:

      getRejectedExecutionHandler()

    • getRejectedExecutionHandler

      public

       

      RejectedExecutionHandler

       

      getRejectedExecutionHandler

      ()

      返回无法执行任务的当前处理程序。

      返回值:

      当前的处理程序

      也可以看看:

      setRejectedExecutionHandler(RejectedExecutionHandler)

    • setCorePoolSize

      public

       

      void

       

      setCorePoolSize

      ​(

      int corePoolSize)

      设置核心线程数。这将覆盖构造函数中设置的任何值。如果新值小于当前值,则多余的现有线程将在下次空闲时终止。如果更大,将在需要时启动新线程来执行任何排队的任务。

      参数:

      corePoolSize -新的核心尺寸

      抛出:

      IllegalArgumentException-如果corePoolSize < 0 还是corePoolSize比大于最大池大小

      也可以看看:

      getCorePoolSize()

    • getCorePoolSize

      public  int  getCorePoolSize()

      返回线程的核心数量。

      返回值:

      核心线程数

      也可以看看:

      setCorePoolSize(int)

    • prestartCoreThread

      public

       

      boolean

       

      prestartCoreThread

      ()

      启动一个核心线程,使其闲置地等待工作。这将覆盖仅在执行新任务时启动核心线程的默认策略。false 如果所有核心线程已经启动,则此方法将返回。

      返回值:

      true 如果线程已启动

    • prestartAllCoreThreads

      public  int  prestartAllCoreThreads()

      启动所有核心线程,使它们空闲地等待工作。这将覆盖仅在执行新任务时启动核心线程的默认策略。

      返回值:

      启动的线程数

    • allowCoreThreadTimeOut

      public

       

      boolean

       

      allowsCoreThreadTimeOut

      ()

      如果此池允许核心线程超时并在keepAlive时间内没有任务到达时终止,则返回true,如果有新任务到达,则在需要时替换该线程。如果为true,则适用于非核心线程的相同的保持活动策略也适用于核心线程。如果为false(默认值),则由于缺少传入任务,核心线程永远不会终止。

      返回值:

      true 如果允许核心线程超时,否则 false

      自:

      JDK 1.6

    • allowCoreThreadTimeOut

      public

       

      void

       

      allowCoreThreadTimeOut

      ​(

      boolean value)

      设置策略,以控制在保持活动时间内没有任务到达时核心线程是否可能超时并终止,并在新任务到达时根据需要替换。如果为false,则由于缺少传入任务,核心线程永远不会终止。如果为true,则适用于非核心线程的相同的保持活动策略也适用于核心线程。为避免连续更换线程,设置时,保持活动时间必须大于零true。通常应在主动使用池之前调用此方法。

      参数:

      valuetrue如果应该超时,否则false

      抛出:

      IllegalArgumentException-如果值为,true 并且当前保持活动时间不大于零

      自:

      JDK 1.6

    • setMaximumPoolSize

      public

       

      void

       

      setMaximumPoolSize

      ​(

      int maximumPoolSize)

      设置允许的最大线程数。这将覆盖构造函数中设置的任何值。如果新值小于当前值,则多余的现有线程将在下次空闲时终止。

      参数:

      maximumPoolSize -新的最大值

      抛出:

      IllegalArgumentException-如果新的最大值小于或等于零,或者小于核心池大小

      也可以看看:

      getMaximumPoolSize()

    • getMaximumPoolSize

      public  int  getMaximumPoolSize()

      返回允许的最大线程数。

      返回值:

      允许的最大线程数

      也可以看看:

      setMaximumPoolSize(int)

    • setKeepAliveTime

      public

       

      void

       

      setKeepAliveTime

      ​(

      long time, TimeUnit unit)

      设置线程保持活动时间,该时间是线程在终止之前可以保持空闲状态的时间。如果当前池中的线程数超过核心数,或者此池允许核心线程超时,那么等待此时间而没有处理任务的线程将被终止 。这将覆盖构造函数中设置的任何值。

      参数:

      time-等待的时间。时间值为零将导致多余的线程在执行任务后立即终止。

      unittime参数的时间单位

      抛出:

      IllegalArgumentException-如果time小于零或time为零,并且allowsCoreThreadTimeOut

      也可以看看:

      getKeepAliveTime(TimeUnit)

    • getKeepAliveTime

      public

       

      long

       

      getKeepAliveTime

      ​(

      TimeUnit unit)

      返回线程保持活动时间,该时间是线程在终止之前可以保持空闲状态的时间。如果当前池中的线程数超过核心数,或者此池允许核心线程超时,那么等待此时间而没有处理任务的线程将被终止 。

      参数:

      unit -结果的所需时间单位

      返回值:

      时间限制

      也可以看看:

      setKeepAliveTime(long, TimeUnit)

    • getQueue

      public  BlockingQueue < Runnable >  getQueue()

      返回此执行程序使用的任务队列。访问任务队列主要用于调试和监视。该队列可能正在使用中。检索任务队列不会阻止排队的任务执行。

      返回值:

      任务队列

    • remove

      public

       

      boolean

       

      remove

      ​(

      Runnable task)

      如果执行程序的内部队列中存在此任务,则将其删除,如果尚未启动,则导致该任务无法运行。

      此方法可能可用作取消方案的一部分。在放入内部队列之前,它可能无法删除已转换为其他形式的任务。例如,使用输入的任务submit可能会转换为维护Future状态的表单。但是,在这种情况下,purge()可以使用方法删除已取消的那些期货。

      参数:

      task -要删除的任务

      返回值:

      true 如果任务已删除

    • purge

      public

       

      void

       

      purge

      ()

      尝试从工作队列中删除所有Future 已取消的任务。此方法可用作存储回收操作,对功能没有其他影响。已取消的任务永远不会执行,但可能会在工作队列中累积,直到工作线程可以主动将其删除为止。而是调用此方法尝试立即将其删除。但是,如果存在其他线程的干扰,此方法可能无法删除任务。

    • getPoolSize

      public  int  getPoolSize()

      返回池中的当前线程数。

      返回值:

      线程数

    • getActiveCount

      public  int  getActiveCount()

      返回正在主动执行任务的线程的大概数量。

      返回值:

      线程数

    • getLargestPoolSize

      public  int  getLargestPoolSize()

      返回池中曾经同时存在的最大线程数。

      返回值:

      线程数

    • getTaskCount

      public

       

      long

       

      getTaskCount

      ()

      返回计划执行的任务总数。由于任务和线程的状态在计算过程中可能会动态更改,因此返回的值只是一个近似值。

      返回值:

      任务数

    • getCompletedTaskCount

      public

       

      long

       

      getCompletedTaskCount

      ()

      返回已完成执行的任务的总数。由于任务和线程的状态在计算过程中可能会动态变化,因此返回的值仅是一个近似值,而在连续调用中不会降低。

      返回值:

      任务数

    • toString

      public

       

      String

       

      toString

      ()

      返回标识此池及其状态的字符串,包括运行状态的指示以及估计的工作人员和任务计数。

      覆写:

      toString in  class Object

      返回值:

      标识此池及其状态的字符串

    • beforeExecute

      protected

       

      void

       

      beforeExecute

      ​(

      Thread t, Runnable r)

      在给定线程中执行给定Runnable之前调用的方法。该方法由t将执行task 的线程调用r,可用于重新初始化ThreadLocals或执行日志记录。

      该实现不执行任何操作,但可以在子类中对其进行自定义。注意:要正确嵌套多个重写,通常应super.beforeExecute在此方法的末尾调用子类。

      参数:

      t -将运行任务的线程 r

      r -将要执行的任务

    • afterExecute

       

      protected

       

      void

       

      afterExecute

      ​(

      Runnable r, Throwable t)

      给定Runnable执行完成时调用的方法。该方法由执行任务的线程调用。如果不为空,则Throwable是未捕获的RuntimeException 或Error导致执行突然终止的代码。

      该实现不执行任何操作,但可以在子类中对其进行自定义。注意:为了正确地嵌套多个重写,通常应super.afterExecute在此方法的开头调用子类。

      注意:如果将动作FutureTask显式地或通过诸如的方法包含在任务(例如 )中 submit,则这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会 传递给此方法。如果您想使用此方法捕获两种类型的失败,则可以进一步探查此类情况,例如在此示例子类中,如果任务被中止,该子类将打印直接原因或潜在异常:

       class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?> && ((Future<?>)r).isDone()) { try { Object result = ((Future<?>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { // ignore/reset Thread.currentThread().interrupt(); } } if (t != null) System.out.println(t); } }

       

      参数:

      r -已完成的可运行

      t -导致终止的异常;如果正常完成执行,则为null

    • terminated

      protected

       

      void

       

      terminated

      ()

      执行程序终止时调用的方法。默认实现不执行任何操作。注意:要正确嵌套多个重写,通常应super.terminated在此方法内调用子类 。

 

以上是 JAVA(JDK8)线程池实现分析 的全部内容, 来源链接: utcz.com/z/513504.html

回到顶部