Android开发中线程池源码解析

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科

我们在Android或者Java开发中,日常所使用的就是ThreadPoolExecutor了,我们先来看下如何使用一个线程池来代替多线程开发。

使用线程池

// 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象

val threadPoolExecutor = ThreadPoolExecutor(

5, 10, 60,

TimeUnit.MINUTES,

ArrayBlockingQueue<Runnable>(100),

RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }

)

// 测试

for (i in 1..10) {

threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }

}

// 结果

// execute thread is:pool-1-thread-1

// execute thread is:pool-1-thread-1

// execute thread is:pool-1-thread-1

// execute thread is:pool-1-thread-1

// execute thread is:pool-1-thread-5

// execute thread is:pool-1-thread-5

// execute thread is:pool-1-thread-4

// execute thread is:pool-1-thread-3

// execute thread is:pool-1-thread-2

// execute thread is:pool-1-thread-1

从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。

知道如何使用一个线程池还不够,我们需要看看ThreadPoolExecutor是如何创建、复用这些线程的。下面我们看看创建ThreadPoolExecutor对象的几个参数:

构造方法

/**

* 创建一个ThreadPoolExecutor对象

*

* @param corePoolSize 核心线程数,这些线程会一直在线程池中,除非设置了 allowCoreThreadTimeOut

* @param maximumPoolSize 最大线程数,运行线程创建的最大值

* @param keepAliveTime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间

* @param unit keepAliveTime的单位

* @param workQueue 保存task的队列,直到执行execute()方法执行

* @param threadFactory ThreadFactory是一个接口,里面只有Thread newThread(Runnable r)方法,用来创建线程,

* 默认采用Executors.defaultThreadFactory()

* @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态,

* 通过RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来处理传进来的Runnable

* 系统提供了四种:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()

* 默认采用new AbortPolicy()

*/

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler){

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.acc = System.getSecurityManager() == null ?

null :

AccessController.getContext();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:

  • 核心线程数不能小于0;
  • 最大线程数不能小于0;
  • 最大线程数不能小于核心线程数;
  • 空闲线程的存活时间不能小于0;

通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。

ThreadFactory:线程工厂

ThreadFactory 是一个接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 Executors 默认的 DefaultThreadFactory 类:

// 静态内部类

static class DefaultThreadFactory implements ThreadFactory {

// 线程池的标识,从1开始没创建一个线程池+1

private static final AtomicInteger poolNumber = new AtomicInteger(1);

// 线程组

private final ThreadGroup group;

// 线程名中的结尾标识,从1开始每创建一个线程+1

private final AtomicInteger threadNumber = new AtomicInteger(1);

// 线程名

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-";

}

public Thread newThread(Runnable r) {

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

RejectedExecutionHandler:拒绝处理任务的策略

RejectedExecutionHandler 也是一个接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:

CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {

/**

* Creates a {@code CallerRunsPolicy}.

*/

public CallerRunsPolicy() {

}

/**

* 如果线程池还没关闭,那么就再次执行这个Runnable

*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

r.run();

}

}

}

AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {

/**

* Creates an {@code AbortPolicy}.

*/

public AbortPolicy() {

}

/**

* 这个策略就是抛出异常,不做其他处理

*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException("Task " + r.toString() +

" rejected from " +

e.toString());

}

}

DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {

/**

* Creates a {@code DiscardPolicy}.

*/

public DiscardPolicy() {

}

/**

* 什么也不做,也就是抛弃了这个Runnable

*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}

}

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

/**

* Creates a {@code DiscardOldestPolicy} for the given executor.

*/

public DiscardOldestPolicy() {

}

/**

* 1. 线程池未关闭

* 2. 获取队列中的下一个Runnable

* 3. 获取到了,但是不对它进行处理,也就是抛弃它

* 4. 执行我们传过来的这个Runnable

*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

e.getQueue().poll();

e.execute(r);

}

}

}

重要的参数

除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

// ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 29

private static final int COUNT_BITS = Integer.SIZE - 3;

// (2^29) -1

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 运行状态:接受新任务并处理排队的任务

private static final int RUNNING = -1 << COUNT_BITS;

// 关闭状态:不接受新任务,但处理排队的任务

private static final int SHUTDOWN = 0 << COUNT_BITS;

// 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务

private static final int STOP = 1 << COUNT_BITS;

// 整理状态:整理状态,所有任务已终止,workerCount为零,线程将运行terminate()方法

private static final int TIDYING = 2 << COUNT_BITS;

// 终止状态:terminate()方法执行完成

private static final int TERMINATED = 3 << COUNT_BITS;

// 表示线程是否允许或停止

private static int runStateOf(int c) { return c & ~CAPACITY; }

// 线程的有效数量

private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

......后面的源码暂时省略

}

execute:执行

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

// 如果运行中的线程数小于核心线程数,执行addWorker(command, true)创建新的核心Thread执行任务

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

// 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数

// 2. 需要满足:线程池在运行状态

// 3. 需要满足:添加到工作队列中成功

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

// 如果线程不在运行状态,就从工作队列中移除command

// 并且执行拒绝策略

if (!isRunning(recheck) && remove(command))

reject(command);

// 线程池处于运行状态,但是没有线程,则addWorker(null, false)

// 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将Runnable添加到工作队列中了

// 而且在runWorker()源码中也可以得到答案,如果传入的Runnable为空,就会去工作队列中取task。

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 执行addWorker()创建新的非核心线程Thread执行任务

// addWorker() 失败,执行拒绝策略

else if (!addWorker(command, false))

reject(command);

}

从上面源码中可以看出,execute()一个新的任务,主要有以下这几种情况:

1、核心线程未满,直接新建核心线程并执行任务;

2、核心线程满了,工作队列未满,将任务添加到工作队列中;

3、核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;

4、上面条件都不满足,那么就执行拒绝策略。

更形象的可以看下方流程图:

添加任务的流程图

addWorker(Runnable , boolean):添加Worker

private boolean addWorker(Runnable firstTask, boolean core) {

// 标记外循环,比如在内循环中break retry就直接跳出外循环

retry:

for (; ; ) {

int c = ctl.get();

int rs = runStateOf(c);

// 直接返回false有以下3种情况:

// 1. 线程池状态为STOP、TIDYING、TERMINATED

// 2. 线程池状态不是running状态,并且firstTask不为空

// 3. 线程池状态不是running状态,并且工作队列为空

if (rs >= SHUTDOWN &&

!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))

return false;

for (; ; ) {

int wc = workerCountOf(c);

// 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回

// 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

// 增加workerCount的值 +1

if (compareAndIncrementWorkerCount(c))

// 跳出外循环

break retry;

c = ctl.get(); // 重新检查线程池状态

if (runStateOf(c) != rs)

continue retry;

// 重新检查的状态和之前不合,再次从外循环进入

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

// 线程池重入锁

final ReentrantLock mainLock = this.mainLock;

// 获得锁

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

// 线程池在运行状态或者是线程池关闭同时Runnable也为空

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

// 想Worker中添加新的Worker

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

// 释放锁

mainLock.unlock();

}

// 如果添加成功,启动线程

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (!workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

addWorker() 主要就是在满足种种条件(上述源码中解释了)后,新建一个Worker对象,并添加到HashSet<Worker> workers中去,最后调用新建Worker对象的Thread变量的start()方法。

Worker类

Worker是一个继承了AQS并实现了Runnable的内部类,我们重点看看它的run()方法,因为上面addWorker()中,t.start()触发的就是它的run()方法:

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable {

/**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/**

* Thread this worker is running in. Null if factory fails.

*/

final Thread thread;

/**

* Initial task to run. Possibly null.

*/

Runnable firstTask;

/**

* Per-thread task counter

*/

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

*

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

// 这边是把Runnable传给了Thread,也就是说Thread.run()就是执行了下面的run()方法

this.thread = getThreadFactory().newThread(this);

}

/**

* Delegates main run loop to outer runWorker

*/

public void run() {

runWorker(this);

}

}

run()方法实际调用了runWorker(Worker)方法

runWorker(Worker)方法:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // 释放锁,允许中断

boolean completedAbruptly = true;

try {

// 1. worker中的task不为空

// 2. 如果worker的task为空,那么取WorkerQueue的task

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

// 这是一个空方法,可由子类实现

beforeExecute(wt, task);

Throwable thrown = null;

try {

// 执行task

task.run();

}

.... 省略

// 这是一个空方法,可由子类实现

finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

getTask():

```java

private Runnable getTask() {

// 进入死循环

for (; ; ) {

try {

// 为true的条件:

// allowCoreThreadTimeOut=true: 核心线程需根据keepAliveTime超时等待

// 核心线程数已满

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果timed为true,执行BlockQueue.poll(),这个操作在取不到task的时候会等待keepAliveTime,然后返回null

// 如果timed为false,执行BlockQueue.take(),这个操作在队列为空的时候一直阻塞

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

}

}

}

```

线程池的源码按照上述的几个方法(execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask())的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

以上是 Android开发中线程池源码解析 的全部内容, 来源链接: utcz.com/p/244125.html

回到顶部