【Java】Java多线程之线程池

线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具。

线程池的作用

降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

总体设计

【Java】Java多线程之线程池
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor

ThreadPoolExecutor继承了类AbstractExecutorService。

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

}

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime 线程池中非核心线程空闲的存活时间大小
  • unit keepAliveTime的时间单位
  • workQueue 线程任务缓冲队列
  • threadFactory 线程工厂用来创建线程
  • handle 当拒绝处理任务时的策略

线程池的运行状态

ThreadPoolExecutor的运行状态有5种,分别为:
【Java】Java多线程之线程池
其生命周期转换如下入所示:
【Java】Java多线程之线程池

任务执行

【Java】Java多线程之线程池

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  3. 如果corePoolSize <= workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常

任务队列

LinkedBlockingQueue 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,如果大量新任务在队列中堆积可能最终导致OOM。
ArrayBlockingQueue 一个数组实现的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。
SynchronousQueue 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。

当任务队列是LinkedBlockingQueue,队列的默认长度为Integer.MAX_VALUE,corePoolSize=2,maximumPoolSize=6,此时启动10个线程任务,前2个线程任务立刻被执行,后续8个任务被加入任务队列中。

public class ThreadPoolTest {

public static void main(String[] args) {

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);

for (int i = 0; i < 10; i++) {

executor.execute(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

int threadSize = queue.size();

System.out.println("线程队列大小为-->"+threadSize);

}

executor.shutdown();

}

}

pool-1-thread-1

线程队列大小为-->0

pool-1-thread-2

线程队列大小为-->0

线程队列大小为-->1

线程队列大小为-->2

线程队列大小为-->3

线程队列大小为-->4

线程队列大小为-->5

线程队列大小为-->6

线程队列大小为-->7

线程队列大小为-->8

pool-1-thread-2

pool-1-thread-1

pool-1-thread-2

pool-1-thread-1

pool-1-thread-2

pool-1-thread-1

pool-1-thread-2

pool-1-thread-1

当任务队列是ArrayBlockingQueue,队列的长度为4,corePoolSize=2,maximumPoolSize=6,此时启动10个线程任务,前2个线程任务立刻被执行,后续4个任务被加入任务队列中,剩下的4个线程任务(2+4=6,没有超过maximumPoolSize),启动新线程执行任务

public class ThreadPoolTest {

public static void main(String[] args) {

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4);

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);

for (int i = 0; i < 10; i++) {

executor.execute(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

int threadSize = queue.size();

System.out.println("线程队列大小为-->"+threadSize);

}

executor.shutdown();

}

}

线程队列大小为-->0

pool-1-thread-1

线程队列大小为-->0

pool-1-thread-2

线程队列大小为-->1

线程队列大小为-->2

线程队列大小为-->3

线程队列大小为-->4

pool-1-thread-3

线程队列大小为-->4

线程队列大小为-->4

pool-1-thread-4

线程队列大小为-->4

pool-1-thread-5

pool-1-thread-6

线程队列大小为-->4

pool-1-thread-1

pool-1-thread-3

pool-1-thread-4

pool-1-thread-2

当任务队列是ArrayBlockingQueue,队列的长度为4,corePoolSize=2,maximumPoolSize=6,此时启动11个线程任务,前2个线程任务立刻被执行,后续4个任务被加入任务队列中,剩下的5个线程任务(2+5>6,超过maximumPoolSize),最后一个任务被拒绝执行

public class ThreadPoolTest {

public static void main(String[] args) {

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4);

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);

for (int i = 0; i < 11; i++) {

executor.execute(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

int threadSize = queue.size();

System.out.println("线程队列大小为-->"+threadSize);

}

executor.shutdown();

}

}

pool-1-thread-1

线程队列大小为-->0

pool-1-thread-2

线程队列大小为-->0

线程队列大小为-->1

线程队列大小为-->2

线程队列大小为-->3

线程队列大小为-->4

线程队列大小为-->4

pool-1-thread-3

pool-1-thread-4

线程队列大小为-->4

pool-1-thread-5

线程队列大小为-->4

pool-1-thread-6

线程队列大小为-->4

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 6, active threads = 6, queued tasks = 4, completed tasks = 0]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)

at ThreadPoolTest.main(ThreadPoolTest.java:9)

pool-1-thread-1

pool-1-thread-2

pool-1-thread-3

pool-1-thread-4

当任务队列是SynchronousQueue,corePoolSize=2,maximumPoolSize=6,此时启动10个线程任务,前6个线程任务立即执行,剩下的4个线程任务(从7开始>6,超过maximumPoolSize),被拒绝执行

public class ThreadPoolTest {

public static void main(String[] args) {

BlockingQueue<Runnable> queue = new SynchronousQueue<>();

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);

for (int i = 0; i < 10; i++) {

executor.execute(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

int threadSize = queue.size();

System.out.println("线程队列大小为-->"+threadSize);

}

executor.shutdown();

}

}

线程队列大小为-->0

pool-1-thread-1

线程队列大小为-->0

pool-1-thread-2

线程队列大小为-->0

pool-1-thread-3

线程队列大小为-->0

pool-1-thread-4

线程队列大小为-->0

pool-1-thread-5

线程队列大小为-->0

pool-1-thread-6

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 6, active threads = 6, queued tasks = 0, completed tasks = 0]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)

at ThreadPoolTest.main(ThreadPoolTest.java:9)

拒绝策略

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
【Java】Java多线程之线程池

源码解析

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个 ctl 变量可以包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount). 由于 int 型的变量是由32位二进制的数构成, 所以用 ctl 的高3位来表示线程池的运行状态, 用低29位来表示线程池内有效线程的数量.
表示线程池运行状态的变量通常命名为 rs, 表示线程池中有效线程数量的变量通常命名为 wc, 另外, ctl 也通常会简写作 c
由于 ctl 变量是由线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)这两个信息组合而成, 所以, 如果知道了这两部分信息各自的数值, 就可以调用下面的 ctlOf() 方法来计算出 ctl 的数值

private static int ctlOf(int rs, int wc) { return rs | wc; }

反过来, 如果知道了 ctl 的值, 那么也可以通过如下的 runStateOf() 和 workerCountOf() 两个方法来分别获取线程池的运行状态和线程池内有效线程的数量.
CAPACITY表示线程池内有效线程的数量上限就是29个二进制1所表示的数值 (约为5亿)

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c) { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }

线程池五种状态的具体数值

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

我们要向线程池提交一个任务, 可以通过调用 execute() 或 submit()方法来实现, 而二者的区别是, execute()方法只能进行任务的提交而不能获取该任务执行的结果, 但 submit()方法则既能进行任务的提交, 又能获取该任务执行的结果. 所以, 如果你需要获取一个任务执行的结果或者需要对一个任务执行的结果进行某种操作, 那么就需要使用 submit()方法来提交任务. 其实 submit()方法就是对 execute()方法的一种封装, 它内部也是调用 execute()方法来实现任务提交的, 只是因为 submit()方法的返回值是一个 Future 对象, 通过返回的 Future对象就能获取该任务最终执行的结果,从线程的提交开始分析, 由于 submit()方法内部也是调用 execute()方法, 所以我们就直接分析 execute()方法, 其源码如下:

    public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

//根据ctl的值, 获取线程池中的有效线程数 workerCount, 如果 workerCount小于核心线程数corePoolSize,调用addWorker()方法创建线程执行任务

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

//如果线程池内的阻塞队列 workQueue未满,则将提交的任务command添加到阻塞队列workQueue中,被添加到阻塞队列中的该任务将会在未来的某个时刻被执行.

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (!isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

//如果线程池内的线程池内的阻塞队列已满,并且有效线程数小于最大线程数maximumPoolSize,调用addWorker()方法创建线程执行任务

//如果线程池内的线程池内的阻塞队列已满,并且有效线程数达到了最大线程数maximumPoolSize,那么就让 RejectedExecutionHandler 根据它的拒绝策略来处理该任务, 默认的处理方式是直接抛异常.

} else if (!addWorker(command, false))

reject(command);

}

四种线程池

    public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>());

}

    public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

    public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

    public static ScheduledExecutorService newScheduledThreadPool(

int corePoolSize, ThreadFactory threadFactory) {

return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

}

public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue(), threadFactory);

}

Alibaba规范的解释:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors 返回的线程池对象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。

2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

参考:
Java线程池实现原理及其在美团业务中的实践
Java 线程池 ThreadPoolExecutor 源码分析

以上是 【Java】Java多线程之线程池 的全部内容, 来源链接: utcz.com/a/98878.html

回到顶部