【图灵学院10】高并发之java线程池源码分析

java

1. 提纲

1)线程池的模块结构

2)示例&原理解析

2. 问题

1)线程池包含哪些东西

2)线程池的运作原理

3)调度线程池的运作原理

4)线程池怎么实现FixRate,FixDelay,他们之间的区别

5)任务怎么取消

3. 源码解析

3.1 线程池框架

接口简介:

java.util.concurrent.Executor:执行器,执行方法

java.util.concurrent.ExecutorService:(执行服务)包含服务的生命周期

java.util.concurrent.ScheduledExecutorService:(调度相关的服务)

辅助类:

java.util.concurrent.Executors:

核心实现类:

java.util.concurrent.ThreadPoolExecutor:(普通的线程池实现类)

java.util.concurrent.ScheduledThreadPoolExecutor:(调度的核心实现类)

java.util.concurrent.ForkJoinPool

完成服务:

java.util.concurrent.CompletionService:

java.util.concurrent.ExecutorCompletionService:

3.2 ThreadPoolExecutor源码剖析

1)构造器

核心数量,任务队列容器,存活时间,线程工厂,处理器

ThreadPoolExecutor.execute()方法源码

    public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread. If it fails, we know we are shut down or saturated

* and so reject the task.

*/

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

ThreadPoolExecutor.runWorker()方法源码

    final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

3.3 ScheduledThreadPoolExecutor

 1)

2)FixRate和FixDelay的区别

        /**

* Sets the next time to run for a periodic task.

*/

private void setNextRunTime() {

long p = period;

if (p > 0)

time += p;

else

time = triggerTime(-p);

}

FixRate严格按照设定的时间

FixDelay按照上一个线程执行完成的时间间隔

FixRate: 21:40,21:41,21:42

FixDelay:21:40,21:43

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {

if (canRunInCurrentRunState(true)) {
       //此处没有捕获异常,如果单个任务发生异常,会导致后续任务无法继续执行

super.getQueue().add(task);

if (!canRunInCurrentRunState(true) && remove(task))

task.cancel(false);

else

ensurePrestart();

}

}

以上是 【图灵学院10】高并发之java线程池源码分析 的全部内容, 来源链接: utcz.com/z/394539.html

回到顶部