多线程--精通ThreadPoolExecutor

前言

在多线程开发中,应该避免显式创建线程,而是采用线程池里面的线程。使用线程池可以减少手动创建线程,减少线程创建和回收的损耗等。那么使用线程池就需要了解它的原理。这里我们ThreadPoolExecutor.execute()方法内部的具体实现逻辑

流程图


源码分析

    public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

// 获取状态变量

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

// 如果worker个数小于核心线程数量

if (addWorker(command, true))

return;

c = ctl.get();

}

// 尝试将任务丢入工作队列中

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

elseif (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 工作队列满了的话再次尝试增加worker

elseif (!addWorker(command, false))

reject(command);

}

  private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

returnfalse;

for (;;) {

// 获取worker数量

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

// 判断是否大于核心线程数量或者超过线程池总大小

returnfalse;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 创建新的worker

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// 单线程处理工作者入队

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // 这时候的worker不能已经是在运行中的

throw new IllegalThreadStateException();

// 将worker加入到工作者的集合中

workers.add(w);

int s = workers.size();

// 更新线程池的当前最大size

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

// 如果加入到工作者集合中成功,那么就启动工作者

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

// 这个方法是工作者的实际工作内容,看下工作者是怎么处理任务的

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // 允许中断

boolean completedAbruptly = true;

try {

// 如果工作者携带的任务或任务队列不是空的,就会一直循环

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

// 处理任务执行之后的逻辑

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

// 处理worker退出的逻辑

processWorkerExit(w, completedAbruptly);

}

}


以上是 多线程--精通ThreadPoolExecutor 的全部内容, 来源链接: utcz.com/a/25598.html

回到顶部