java线程池源码解读

线程分为两种,分别是内核级线程(KLT)以及用户级线程(ULT)
首先说一下进程:
 进程是资源拥有的基本单位,进程切换需要保存进程状态,会造成资源的浪费。
 在同一个进程当中,线程共享进程所拥有的资源;线程切换不会引起进程切换,所需要的资源也要远少于进程切换,可以提高效率。
内核级线程:
线程管理的所有操作(创建,撤销),都由操作系统内核完成
 操作系统的内核提供一个应用程序的接口api,供开发者使用KLT
 线程特点:
 进程中的一个线程被阻塞,内核能调度同一进程的其他线程(就绪态)占有处理器运行
 多处理器环境中,内核能同时调度同一进程的多线程,将这些线程映射到不同的处理器核心上,提高进程的执行效率。
 应用程序线程在用户态运行,线程调度和管理在内核实现。线程调度时,控制权从一个线程改变到另一线程,需要模式切换,系统开销较大。
用户级线程:
所以线程的创建,消息传递,调度,保存/恢复上下文都有线程库来完成。
 内核感知不到多线程的存在。内核继续以进程为调度单位,并且给该进程指定一个执行状态(就绪、运行、阻塞等)
 线程特点:
 线程切换不需要内核模式,能节省模式切换开销和内核资源。
 允许进程按照特定的需要选择不同的调度算法来调度线程。调度算法需要自己实现。
 由于其不需要内核进行支持,所以可以跨OS运行。
 一个线程阻塞,将导致整个进程的阻塞。
JVM属于内核级线程
什么时候使用线程池:
 单个任务处理时间比较短
 需要处理的任务数量很大
线程池的优势:
 重用存在的线程,减少线程创建,消亡的开销,提高性能
 提高响应速度,当任务到达,不需要线程创建可以立即执行
 提高线程的可管理性
 jdk1.5之后,JUC(java.util.concurrent)为我们提供了多种线程池,我们可以根据需求,选择合适的。
 先来看看类型:
 newCachedThreadPool
 newFixedThreadPool
 newSingleThreadExecutor
 newWorkStealingPool(@since 1.8)
 newScheduledThreadPool // 定时线程池
 newSingleThreadScheduledExecutor
跟随源码,看一下这些线程池的基本执行步骤:
// 以下代码创建一个长度为10的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);
// 跟随源码,看具体做了哪些操作
Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize, // 线程池常规工作线程数 
                              int maximumPoolSize, // 线程池最大工作线程数
                              long keepAliveTime,// 线程闲置最大存活时间
                              TimeUnit unit, // 存活时间单位
                              BlockingQueue<Runnable> workQueue) { // 存放任务队列,这是一个阻塞队列
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue,
						  ThreadFactory threadFactory,
						  RejectedExecutionHandler handler) {// 拒绝策略,即当线程池所有工作线程都在工作,还有任务添加进来的时候,对进来的新任务所采取的措施
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}
// 线程池创建完毕
// 拒绝策略,一般有4种,如果这4种拒绝策略无法满足业务要求,则可以自定义拒绝策略。
CallerRunsPolicy // 重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功
AbortPolicy // 抛弃任务,并且抛出异常RejectedExecutionException
DiscardPolicy // 抛弃任务
DiscardOldestPolicy // 抛弃队列里面等待时间最长的任务,把新进来任务加入队列
// 当我们往线程池里面添加一个任务的时候,线程池做了哪些操作呢?
executorService.execute(new Runnable() {
	@Override
	public void run() {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
  });
// 单步调试以上代码,底层执行逻辑如下:
public void execute(Runnable command) {
	if (command == null)// 任务不能为空
		throw new NullPointerException();
	int c = ctl.get();
	if (workerCountOf(c) < corePoolSize) {// 工作线程数小于常规工作线程数,则添加工作线程,绑定任务
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	if (isRunning(c) && workQueue.offer(command)) {// 任务加入队列
		int recheck = ctl.get();
		if (! isRunning(recheck) && remove(command))
			reject(command);
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	else if (!addWorker(command, false))
		reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);
		// Check if queue empty only if necessary.
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;
		for (;;) {
			int wc = workerCountOf(c);
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
			if (compareAndIncrementWorkerCount(c))
				break retry;
			c = ctl.get();  // Re-read ctl
			if (runStateOf(c) != rs)
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	}
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		w = new Worker(firstTask);// 新建一个工作线程
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				int rs = runStateOf(ctl.get());
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
					workers.add(w);// 工作线程加入存储workers的set里面
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
			} finally {
				mainLock.unlock();
			}
			if (workerAdded) {
				t.start();// 启动线程
				workerStarted = true;
			}
		}
	} finally {
		if (! workerStarted)
			addWorkerFailed(w);
	}
	return workerStarted;
}
public void run() {
	runWorker(this);
}
final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	w.unlock(); // allow interrupts
	boolean completedAbruptly = true;
	try {
		while (task != null || (task = getTask()) != null) {
			w.lock();
			// If pool is stopping, ensure thread is interrupted;
			// if not, ensure thread is not interrupted.  This
			// requires a recheck in second case to deal with
			// shutdownNow race while clearing interrupt
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				wt.interrupt();
			try {
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					afterExecute(task, thrown);
				}
			} finally {
				task = null;
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		processWorkerExit(w, completedAbruptly);
	}
}
线程池执行任务流程如下图:
// 线程池的状态标识位:ThreadPoolExecutor.javaprivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //高3位表示状态,后面29位表示线程数 (2^29)-1 (about 500 million) threads
// 线程池状态:
private static final int RUNNING    = -1 << COUNT_BITS; // 接受新任务并处理排队的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接受新任务,但处理排队的任务
private static final int STOP       =  1 << COUNT_BITS; // 不接受新任务、不处理排队的任务和中断正在进行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务都已终止,workercount为零,转换为state tiding的线程将运行terminated()钩子方法
private static final int TERMINATED =  3 << COUNT_BITS; // terminated()已完成
以上是 java线程池源码解读 的全部内容, 来源链接: utcz.com/z/510434.html








