Java Executor 框架

java

     Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html)

     本篇博文分析Executor中几个比较重要的接口和类。

     Executor

1 public interface Executor {

2 void execute(Runnable command);

3 }

     Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService。

     ExecutorService

 1 //继承自Executor接口

2 public interface ExecutorService extends Executor {

3 /**

4 * 关闭方法,调用后执行之前提交的任务,不再接受新的任务

5 */

6 void shutdown();

7 /**

8 * 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表

9 */

10 List<Runnable> shutdownNow();

11 /**

12 * 判断执行器是否已经关闭

13 */

14 boolean isShutdown();

15 /**

16 * 关闭后所有任务是否都已完成

17 */

18 boolean isTerminated();

19 /**

20 * 中断

21 */

22 boolean awaitTermination(long timeout, TimeUnit unit)

23 throws InterruptedException;

24 /**

25 * 提交一个Callable任务

26 */

27 <T> Future<T> submit(Callable<T> task);

28 /**

29 * 提交一个Runable任务,result要返回的结果

30 */

31 <T> Future<T> submit(Runnable task, T result);

32 /**

33 * 提交一个任务

34 */

35 Future<?> submit(Runnable task);

36 /**

37 * 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表

38 */

39 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

40 throws InterruptedException;

41 /**

42 * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。

43 */

44 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

45 long timeout, TimeUnit unit)

46 throws InterruptedException;

47 /**

48 * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。

49 */

50 <T> T invokeAny(Collection<? extends Callable<T>> tasks)

51 throws InterruptedException, ExecutionException;

52 /**

53 * 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。

54 */

55 <T> T invokeAny(Collection<? extends Callable<T>> tasks,

56 long timeout, TimeUnit unit)

57 throws InterruptedException, ExecutionException, TimeoutException;

58 }

    ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。

    ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。

 1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务

2 public interface Runnable {

3 // run方法就是它所有的内容,就是实际执行的任务

4 public abstract void run();

5 }

6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容

7 public interface Callable<V> {

8 // 相对于run方法的带有返回值的call方法

9 V call() throws Exception;

10 }

Future

 1 // Future代表异步任务的执行结果

2 public interface Future<V> {

3

4 /**

5 * 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。

6 */

7 boolean cancel(boolean mayInterruptIfRunning);

8

9 /**

10 * 返回代表的任务是否在完成之前被取消了

11 */

12 boolean isCancelled();

13

14 /**

15 * 如果任务已经完成,返回true

16 */

17 boolean isDone();

18

19 /**

20 * 获取异步任务的执行结果(如果任务没执行完将等待)

21 */

22 V get() throws InterruptedException, ExecutionException;

23

24 /**

25 * 获取异步任务的执行结果(有最常等待时间的限制)

26 *

27 * timeout表示等待的时间,unit是它时间单位

28 */

29 V get(long timeout, TimeUnit unit)

30 throws InterruptedException, ExecutionException, TimeoutException;

31 }

     ExecutorService有一个子接口ScheduledExecutorService和一个抽象实现类AbstractExecutorService。

     ScheduledExecutorService

 1 // 可以安排指定时间或周期性的执行任务的ExecutorService

2 public interface ScheduledExecutorService extends ExecutorService {

3 /**

4 * 在指定延迟后执行一个任务,只执行一次

5 */

6 public ScheduledFuture<?> schedule(Runnable command,

7 long delay, TimeUnit unit);

8 /**

9 * 与上面的方法相同,只是接受的是Callable任务

10 */

11 public <V> ScheduledFuture<V> schedule(Callable<V> callable,

12 long delay, TimeUnit unit);

13 /**

14 * 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit

15 * 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...

16 */

17 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

18 long initialDelay,

19 long period,

20 TimeUnit unit);

21 /**

22 * 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit

23 * 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...

24 */

25 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

26 long initialDelay,

27 long delay,

28 TimeUnit unit);

29 }

    ScheduledExecutorService定义了四个方法,已经在上面给出基本的解释。ScheduledExecutorService有两个实现类,分别是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,将在后面介绍。还需要解释的是ScheduledFuture。

    ScheduledFuture继承自Future和Delayed接口,自身没有添加方法。Delayed接口定义了一个获取剩余延迟的方法。

     AbstractExecutorService

  1 // 提供ExecutorService的默认实现

2 public abstract class AbstractExecutorService implements ExecutorService {

3 /*

4 * 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future

5 */

6 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

7 return new FutureTask<T>(runnable, value);

8 }

9 /*

10 * 为指定的Callable创建一个FutureTask

11 */

12 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {

13 return new FutureTask<T>(callable);

14 }

15 /*

16 * 提交Runnable任务

17 */

18 public Future<?> submit(Runnable task) {

19 if (task == null) throw new NullPointerException();

20 // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null

21 RunnableFuture<Object> ftask = newTaskFor(task, null);

22 // 调用具体实现的execute方法

23 execute(ftask);

24 return ftask;

25 }

26 /*

27 * 提交Runnable任务

28 */

29 public <T> Future<T> submit(Runnable task, T result) {

30 if (task == null) throw new NullPointerException();

31 // 通过newTaskFor方法构造RunnableFuture,默认的返回值是result

32 RunnableFuture<T> ftask = newTaskFor(task, result);

33 execute(ftask);

34 return ftask;

35 }

36 /*

37 * 提交Callable任务

38 */

39 public <T> Future<T> submit(Callable<T> task) {

40 if (task == null) throw new NullPointerException();

41 RunnableFuture<T> ftask = newTaskFor(task);

42 execute(ftask);

43 return ftask;

44 }

45

46 /*

47 * doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用

48 * tasks 是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间

49 */

50 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,

51 boolean timed, long nanos)

52 throws InterruptedException, ExecutionException, TimeoutException {

53 // tasks空判断

54 if (tasks == null)

55 throw new NullPointerException();

56 // 任务数量

57 int ntasks = tasks.size();

58 if (ntasks == 0)

59 throw new IllegalArgumentException();

60 // 创建对应数量的Future返回集

61 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

62 ExecutorCompletionService<T> ecs =

63 new ExecutorCompletionService<T>(this);

64 try {

65 // 执行异常

66 ExecutionException ee = null;

67 // System.nanoTime()根据系统计时器当回当前的纳秒值

68 long lastTime = (timed)? System.nanoTime() : 0;

69 // 获取任务集的遍历器

70 Iterator<? extends Callable<T>> it = tasks.iterator();

71

72 // 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中

73 futures.add(ecs.submit(it.next

74 // 修改任务计数器

75 --ntasks;

76 // 活跃任务计数器

77 int active = 1;

78 for (;;) {

79 // 获取并移除代表已完成任务的Future,如果不存在,返回null

80 Future<T> f = ecs.poll();

81 if (f == null) {

82 // 没有任务完成,且任务集中还有未提交的任务

83 if (ntasks > 0) {

84 // 剩余任务计数器减1

85 --ntasks;

86 // 提交任务并添加结果

87 futures.add(ecs.submit(it.next()));

88 // 活跃任务计数器加1

89 ++active;

90 }

91 // 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环

92 else if (active == 0)

93 break;

94 else if (timed) {

95 // 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数

96 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);

97 if (f == null)

98 throw new TimeoutException();

99 // 计算剩余可用时间

100 long now = System.nanoTime();

101 nanos -= now - lastTime;

102 lastTime = now;

103 }

104 else

105 // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。

106 // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回

107 f = ecs.take();

108 }

109 // f不为空说明有一个任务完成了

110 if (f != null) {

111 // 已完成一个任务,所以活跃任务计数减1

112 --active;

113 try {

114 // 返回该任务的结果

115 return f.get();

116 } catch (InterruptedException ie) {

117 throw ie;

118 } catch (ExecutionException eex) {

119 ee = eex;

120 } catch (RuntimeException rex) {

121 ee = new ExecutionException(rex);

122 }

123 }

124 }

125 // 如果没有成功返回结果则抛出异常

126 if (ee == null)

127 ee = new ExecutionException();

128 throw ee;

129

130 } finally {

131 // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务

132 for (Future<T> f : futures)

133 f.cancel(true);

134 }

135 }

136

137 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)

138 throws InterruptedException, ExecutionException {

139 try {

140 // 非定时任务的doInvokeAny调用

141 return doInvokeAny(tasks, false, 0);

142 } catch (TimeoutException cannotHappen) {

143 assert false;

144 return null;

145 }

146 }

147 // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位

148 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,

149 long timeout, TimeUnit unit)

150 throws InterruptedException, ExecutionException, TimeoutException {

151 return doInvokeAny(tasks, true, unit.toNanos(timeout));

152 }

153 // 无超时设置的invokeAll方法

154 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

155 throws InterruptedException {

156 // 空任务判断

157 if (tasks == null)

158 throw new NullPointerException();

159 // 创建大小为任务数量的结果集

160 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

161 // 是否完成所有任务的标记

162 boolean done = false;

163 try {

164 // 遍历并执行任务

165 for (Callable<T> t : tasks) {

166 RunnableFuture<T> f = newTaskFor(t);

167 futures.add(f);

168 execute(f);

169 }

170 // 遍历结果集

171 for (Future<T> f : futures) {

172 // 如果某个任务没完成,通过f调用get()方法

173 if (!f.isDone()) {

174 try {

175 // get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待

176 f.get();

177 } catch (CancellationException ignore) {

178 } catch (ExecutionException ignore) {

179 }

180 }

181 }

182 // 标志所有任务执行完成

183 done = true;

184 // 返回结果

185 return futures;

186 } finally {

187 // 假如没有完成所有任务(可能是发生异常等情况),将任务取消

188 if (!done)

189 for (Future<T> f : futures)

190 f.cancel(true);

191 }

192 }

193 // 超时设置的invokeAll方法

194 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

195 long timeout, TimeUnit unit)

196 throws InterruptedException {

197 // 需要执行的任务集为空或时间单位为空,抛出异常

198 if (tasks == null || unit == null)

199 throw new NullPointerException();

200 // 将超时时间转为纳秒单位

201 long nanos = unit.toNanos(timeout);

202 // 创建任务结果集

203 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

204 // 是否全部完成的标志

205 boolean done = false;

206 try {

207 // 遍历tasks,将任务转为RunnableFuture

208 for (Callable<T> t : tasks)

209 futures.add(newTaskFor(t));

210 // 记录当前时间(单位是纳秒)

211 long lastTime = System.nanoTime();

212 // 获取迭代器

213 Iterator<Future<T>> it = futures.iterator();

214 // 遍历

215 while (it.hasNext()) {

216 // 执行任务

217 execute((Runnable)(it.next()));

218 // 记录当前时间

219 long now = System.nanoTime();

220 // 计算剩余可用时间

221 nanos -= now - lastTime;

222 // 更新上一次执行时间

223 lastTime = now;

224 // 超时,返回保存任务状态的结果集

225 if (nanos <= 0)

226 return futures;

227 }

228

229 for (Future<T> f : futures) {

230 // 如果有任务没完成

231 if (!f.isDone()) {

232 // 时间已经用完,返回保存任务状态的结果集

233 if (nanos <= 0)

234 return futures;

235 try {

236 // 获取计算结果,最多等待给定的时间nanos,单位是纳秒

237 f.get(nanos, TimeUnit.NANOSECONDS);

238 } catch (CancellationException ignore) {

239 } catch (ExecutionException ignore) {

240 } catch (TimeoutException toe) {

241 return futures;

242 }

243 // 计算可用时间

244 long now = System.nanoTime();

245 nanos -= now - lastTime;

246 lastTime = now;

247 }

248 }

249 // 修改是否全部完成的标记

250 done = true;

251 // 返回结果集

252 return futures;

253 } finally {

254 // 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消

255 if (!done)

256 for (Future<T> f : futures)

257 f.cancel(true);

258 }

259 }

260 }

    AbstractExecutor实现了ExecutorService接口的部分方法。具体代码的分析在上面已经给出。

    AbstractExecutor有两个子类:DelegatedExecutorService、ThreadPoolExecutor。将在后面介绍。

    下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。

    RunnableFuture继承自Future和Runnable,只有一个run()方法(Runnable中已经有一个run方法了,为什么RunnableFuture还要重新写一个run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable两个接口的组合。

    FutureTask实现了RunnableFuture接口,除了实现了Future和Runnable中的方法外,它还有自己的方法和一个内部类Sync。

    ExecutorCompletionService实现了CompletionService接口,将结果从复杂的一部分物种解耦出来。这些内容后续会介绍,不过这里先介绍框架中的其它内容,弄清整体框架。

    下面看继承自AbstractExecutorService的ThreadPoolExecutor。

    ThreadPoolExecutor

ThreadPoolExecutor(好长)

  1 public class ThreadPoolExecutor extends AbstractExecutorService {

2 // 检查关闭的权限

3 private static final RuntimePermission shutdownPerm =

4 new RuntimePermission("modifyThread");

5 /* runState提供了主要的生命周期控制,可取值有以下几个:

6 * RUNNING:接受新的任务,处理队列中的任务

7 * SHUTDOWN:不再接受新的任务,但是处理队列中的任务

8 * STOP:不接受新任务,也不处理队列中的任务,打断正在处理的任务

9 * TERMINATED:和STOP类似,同时终止所有线程

10 * RUNNING -> SHUTDOWN

11 * On invocation of shutdown(), perhaps implicitly in finalize()

12 * (RUNNING or SHUTDOWN) -> STOP

13 * On invocation of shutdownNow()

14 * SHUTDOWN -> TERMINATED

15 * When both queue and pool are empty

16 * STOP -> TERMINATED

17 * When pool is empty

18 *

19 */

20 volatile int runState;

21 static final int RUNNING = 0;

22 static final int SHUTDOWN = 1;

23 static final int STOP = 2;

24 static final int TERMINATED = 3;

25

26 // 用于保持任务的队列

27 private final BlockingQueue<Runnable> workQueue;

28 // poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新锁

29 private final ReentrantLock mainLock = new ReentrantLock();

30 // mainLock锁的一个Condition实例

31 private final Condition termination = mainLock.newCondition();

32 // 保持线程池中所有的工作线程。只有获取mainLock锁后才能访问。

33 private final HashSet<Worker> workers = new HashSet<Worker>();

34 // 空闲线程的等待时间,大为是纳秒

35 private volatile long keepAliveTime;

36 // 是否允许核心线程“活着” false(默认值)允许,哪怕空闲;true则使用keepAliveTime来控制等待超时时间

37 private volatile boolean allowCoreThreadTimeOut;

38 // 核心线程池的大小

39 private volatile int corePoolSize;

40 // pool size最大值

41 private volatile int maximumPoolSize;

42 // 当前pool大小

43 private volatile int poolSize;

44 // 拒绝执行的处理器 顾名思义,当一个任务被拒绝执行后将同个这个处理器进行处理

45 private volatile RejectedExecutionHandler handler;

46 // 线程工厂,用于创建线程

47 private volatile ThreadFactory threadFactory;

48 // 最终pool size达到的最大值

49 private int largestPoolSize;

50 // 已完成任务计数

51 private long completedTaskCount;

52 // 默认的拒绝执行的处理器

53 private static final RejectedExecutionHandler defaultHandler =

54 new AbortPolicy();

55 /**

56 * 关于借个size的说明:

57 * 线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。

58 * 核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。

59 * 如果poolSize > corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。

60 * 最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。

61 * 假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务,

62 * 若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。

63 */

64 /** 构造方法 开始*/

65 public ThreadPoolExecutor(int corePoolSize,

66 int maximumPoolSize,

67 long keepAliveTime,

68 TimeUnit unit,

69 BlockingQueue<Runnable> workQueue) {

70 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

71 Executors.defaultThreadFactory(), defaultHandler);

72 }

73 public ThreadPoolExecutor(int corePoolSize,

74 int maximumPoolSize,

75 long keepAliveTime,

76 TimeUnit unit,

77 BlockingQueue<Runnable> workQueue,

78 ThreadFactory threadFactory) {

79 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

80 threadFactory, defaultHandler);

81 }

82 public ThreadPoolExecutor(int corePoolSize,

83 int maximumPoolSize,

84 long keepAliveTime,

85 TimeUnit unit,

86 BlockingQueue<Runnable> workQueue,

87 RejectedExecutionHandler handler) {

88 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

89 Executors.defaultThreadFactory(), handler);

90 }

91 // 主要的构造方法,其它构造方法都是对这个方法的调用

92 public ThreadPoolExecutor(int corePoolSize,

93 int maximumPoolSize,

94 long keepAliveTime,

95 TimeUnit unit,

96 BlockingQueue<Runnable> workQueue,

97 ThreadFactory threadFactory,

98 RejectedExecutionHandler handler) {

99 // 非法输入(明显这些值都是不能小于0的)

100 if (corePoolSize < 0 ||

101 maximumPoolSize <= 0 ||

102 maximumPoolSize < corePoolSize ||

103 keepAliveTime < 0)

104 throw new IllegalArgumentException();

105 // 空判断

106 if (workQueue == null || threadFactory == null || handler == null)

107 throw new NullPointerException();

108 this.corePoolSize = corePoolSize;

109 this.maximumPoolSize = maximumPoolSize;

110 this.workQueue = workQueue;

111 this.keepAliveTime = unit.toNanos(keepAliveTime);

112 this.threadFactory = threadFactory;

113 this.handler = handler;

114 }

115 /** 构造方法 结束*/

116

117

118 // 执行Runnable任务

119 public void execute(Runnable command) {

120 if (command == null)

121 throw new NullPointerException();

122 /*如果当前线程数量poolSize>=核心线程数量corePoolSize,

123 那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;

124 如果poolSize没有达到corePoolSize,那很自然是把当前任务放到核心线程池执行,

125 也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。

126 “放到核心线程池执行”是什么意思呢?

127 就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。*/

128 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

129 // 线程池状态时RUNNING且能将任务添加到worker队列中

130 if (runState == RUNNING && workQueue.offer(command)) {

131 // 加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。

132 // 所以判断一下if ( runState != RUNNING || poolSize == 0),

133 // 在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。

134 // 这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,

135 // 情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。

136 if (runState != RUNNING || poolSize == 0)

137 ensureQueuedTaskHandled(command);

138 }

139 // 程序执行到这个分支,说明当前状态runState != RUNNING,或者任务队列workQueue已经满了。

140 // 先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,

141 // 所以当runState != RUNNING时新任务只能根据reject策略拒绝,

142 // 而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;

143 // 再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?

144 // 很简单,若当前线程数量poolSize没有达到最大线程数量maxPoolSize,

145 // 则创建新的线程去执行这个无法加入任务队列的新任务,

146 // 否则就根据reject策略拒绝

147 else if (!addIfUnderMaximumPoolSize(command))

148 reject(command); // is shutdown or saturated

149 }

150 }

151

152 private Thread addThread(Runnable firstTask) {

153 Worker w = new Worker(firstTask);

154 // 创建一个新Thread t

155 Thread t = threadFactory.newThread(w);

156 if (t != null) {

157 w.thread = t;

158 workers.add(w);

159 int nt = ++poolSize;

160 // 跟踪线程池大小的最大值

161 if (nt > largestPoolSize)

162 largestPoolSize = nt;

163 }

164 return t;

165 }

166

167 // 创建并启动新线程执行firstTask(在运行线程数小于核心线程池大小的情况且状态是RUNNING)

168 private boolean addIfUnderCorePoolSize(Runnable firstTask) {

169 Thread t = null;

170 final ReentrantLock mainLock = this.mainLock;

171 // 获取锁

172 mainLock.lock();

173 try {

174 if (poolSize < corePoolSize && runState == RUNNING)

175 // 创建一个新线程

176 t = addThread(firstTask);

177 } finally {

178 // 释放锁

179 mainLock.unlock();

180 }

181 if (t == null)

182 return false;

183 // 启动线程执行任务

184 t.start();

185 return true;

186 }

187

188 // 创建并启动新线程执行firstTask(在运行线程数小于pool size的最大值的情况且状态是RUNNING)

189 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

190 Thread t = null;

191 final ReentrantLock mainLock = this.mainLock;

192 mainLock.lock();

193 try {

194 if (poolSize < maximumPoolSize && runState == RUNNING)

195 t = addThread(firstTask);

196 } finally {

197 mainLock.unlock();

198 }

199 if (t == null)

200 return false;

201 t.start();

202 return true;

203 }

204

205 // 确保任务被处理

206 private void ensureQueuedTaskHandled(Runnable command) {

207 final ReentrantLock mainLock = this.mainLock;

208 mainLock.lock();

209 // 拒绝标记

210 boolean reject = false;

211 Thread t = null;

212 try {

213 int state = runState;

214 // 如果状态不是RUNNING,能成功从worker队列中移除,则拒绝这个任务执行

215 if (state != RUNNING && workQueue.remove(command))

216 reject = true;

217 else if (state < STOP &&

218 poolSize < Math.max(corePoolSize, 1) &&

219 !workQueue.isEmpty())

220 t = addThread(null);

221 } finally {

222 mainLock.unlock();

223 }

224 if (reject)

225 reject(command);

226 else if (t != null)

227 // 不用拒绝任务则启动线程执行任务

228 t.start();

229 }

230

231 // 调用RejectedExecutionHandler决绝任务

232 void reject(Runnable command) {

233 handler.rejectedExecution(command, this);

234 }

235 // 工作线程,实现了Runnable接口

236 private final class Worker implements Runnable {

237 // 每个任务执行都必须获取和释放runLock。这主要是防止中断的目的是取消工作线程,而不是中断正在运行的任务。

238 private final ReentrantLock runLock = new ReentrantLock();

239 // 要执行的任务

240 private Runnable firstTask;

241 // 每个线程完成任务的计数器,最后会统计到completedTaskCount中

242 volatile long completedTasks;

243 // 用于执行Runnable的线程

244 Thread thread;

245 // 构造方法

246 Worker(Runnable firstTask) {

247 this.firstTask = firstTask;

248 }

249 // 判断这个线程是否活动

250 boolean isActive() {

251 return runLock.isLocked();

252 }

253 // 中断闲置线程

254 void interruptIfIdle() {

255 final ReentrantLock runLock = this.runLock;

256 if (runLock.tryLock()) {

257 try {

258 if (thread != Thread.currentThread())

259 thread.interrupt();

260 } finally {

261 runLock.unlock();

262 }

263 }

264 }

265 // 中断

266 void interruptNow() {

267 thread.interrupt();

268 }

269

270

271 private void runTask(Runnable task) {

272 final ReentrantLock runLock = this.runLock;

273 runLock.lock();

274 try {

275

276 if (runState < STOP &&

277 Thread.interrupted() &&

278 runState >= STOP)

279 thread.interrupt();

280

281 boolean ran = false;

282 beforeExecute(thread, task);

283 try {

284 task.run();

285 ran = true;

286 afterExecute(task, null);

287 ++completedTasks;

288 } catch (RuntimeException ex) {

289 if (!ran)

290 afterExecute(task, ex);

291 throw ex;

292 }

293 } finally {

294 runLock.unlock();

295 }

296 }

297

298

299 public void run() {

300 try {

301 Runnable task = firstTask;

302 firstTask = null;

303 /**

304 * 注意这段while循环的执行逻辑,每执行完一个核心线程后,就会去线程池

305 * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止

306 */

307 while (task != null || (task = getTask()) != null) {

308 //你所提交的核心线程(任务)的运行逻辑

309 runTask(task);

310 task = null;

311 }

312 } finally {

313 // 当前工作线程退出

314 workerDone(this);

315 }

316 }

317 }

318

319 // 从池队列中取的核心线程(任务)的方法

320 Runnable getTask() {

321 for (;;) {

322 try {

323 // 获取运行状态

324 int state = runState;

325 // 大于SHUTDOWN,即STOP和TERMINATED状态,没有任务

326 if (state > SHUTDOWN)

327 return null;

328 Runnable r;

329 // SHUTDOWN状态

330 if (state == SHUTDOWN) // 帮助清空队列

331 r = workQueue.poll();

332 // 状态时RUNNING,且poolSize > corePoolSize或allowCoreThreadTimeOut

333 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)

334 // 获取并移除元素(等待指定的时间)

335 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

336 else

337 // 获取并移除元素(会一直等待知道获取到有效元素)

338 r = workQueue.take();

339 // 获取结果不为空,返回

340 if (r != null)

341 return r;

342 // 检查一个获取任务失败的线程能否退出

343 if (workerCanExit()) {

344 if (runState >= SHUTDOWN) // 中断其他线程

345 interruptIdleWorkers();

346 return null;

347 }

348 // Else retry

349 } catch (InterruptedException ie) {

350 // On interruption, re-check runState

351 }

352 }

353 }

354

355 // 检查一个获取任务失败的线程能否退出

356 private boolean workerCanExit() {

357 final ReentrantLock mainLock = this.mainLock;

358 mainLock.lock();

359 boolean canExit;

360 try {

361 // 可以退出的条件是状态为STOP或TERMINATED或至少有一个处理非空队列的线程(在允许超时的情况下)

362 canExit = runState >= STOP ||

363 workQueue.isEmpty() ||

364 (allowCoreThreadTimeOut &&

365 poolSize > Math.max(1, corePoolSize));

366 } finally {

367 mainLock.unlock();

368 }

369 return canExit;

370 }

371

372 // 中断其他线程

373 void interruptIdleWorkers() {

374 final ReentrantLock mainLock = this.mainLock;

375 mainLock.lock();

376 try {

377 // 遍历工作线程

378 for (Worker w : workers)

379 // 尝试中断闲置线程

380 w.interruptIfIdle();

381 } finally {

382 mainLock.unlock();

383 }

384 }

385 // 工作线程退出要处理的逻辑

386 void workerDone(Worker w) {

387 final ReentrantLock mainLock = this.mainLock;

388 mainLock.lock();

389 try {

390 completedTaskCount += w.completedTasks;

391 workers.remove(w);//从工作线程缓存中删除

392 if (--poolSize == 0)//poolSize减一,这时其实又可以创建工作线程了

393 tryTerminate();//尝试终止

394 } finally {

395 mainLock.unlock();

396 }

397 }

398

399 // 尝试终止

400 private void tryTerminate() {

401 //终止的前提条件就是线程池里已经没有工作线程(Worker)了

402 if (poolSize == 0) {

403 int state = runState;

404 /**

405 * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个

406 * 工作线程来执行线程队列中等待的任务

407 */

408 if (state < STOP && !workQueue.isEmpty()) {

409 state = RUNNING; // disable termination check below

410 Thread t = addThread(null);

411 if (t != null)

412 t.start();

413 }

414 // 设置池状态为终止状态

415 if (state == STOP || state == SHUTDOWN) {

416 runState = TERMINATED;

417 termination.signalAll();

418 terminated();

419 }

420 }

421 }

422 // 发起一个有序的关闭在以前已提交任务的执行,但不接受新任务。如果已经关闭,调用不会有其他影响。

423 public void shutdown() {

424 // Gets the system security interface.

425 SecurityManager security = System.getSecurityManager();

426 if (security != null)

427 // 检查权限(以抛出异常的形式)

428 security.checkPermission(shutdownPerm);

429 final ReentrantLock mainLock = this.mainLock;

430 mainLock.lock();

431 try {

432 if (security != null) { // 检查调用者是否能修改线程

433 for (Worker w : workers)

434 security.checkAccess(w.thread);

435 }

436 // 获取运行状态

437 int state = runState;

438 // 小于SHUTDOWN的不就是RUNNING么。。。

439 if (state < SHUTDOWN)

440 runState = SHUTDOWN;

441

442 try {

443 for (Worker w : workers) {

444 // 中断线程

445 w.interruptIfIdle();

446 }

447 } catch (SecurityException se) { // Try to back out

448 runState = state;

449 // tryTerminate() here would be a no-op

450 throw se;

451 }

452 // 尝试终止

453 tryTerminate(); // Terminate now if pool and queue empty

454 } finally {

455 mainLock.unlock();

456 }

457 }

458

459

460 public List<Runnable> shutdownNow() {

461 SecurityManager security = System.getSecurityManager();

462 if (security != null)

463 security.checkPermission(shutdownPerm);

464

465 final ReentrantLock mainLock = this.mainLock;

466 mainLock.lock();

467 try {

468 if (security != null) { // Check if caller can modify our threads

469 for (Worker w : workers)

470 security.checkAccess(w.thread);

471 }

472

473 int state = runState;

474 // 与上一个方法主要区别在于状态和interruptNow方法

475 if (state < STOP)

476 runState = STOP;

477

478 try {

479 for (Worker w : workers) {

480 w.interruptNow();

481 }

482 } catch (SecurityException se) { // Try to back out

483 runState = state;

484 // tryTerminate() here would be a no-op

485 throw se;

486 }

487

488 List<Runnable> tasks = drainQueue();

489 tryTerminate(); // Terminate now if pool and queue empty

490 return tasks;

491 } finally {

492 mainLock.unlock();

493 }

494 }

495

496 // 清空队列

497 private List<Runnable> drainQueue() {

498 List<Runnable> taskList = new ArrayList<Runnable>();

499 // 将队列中的所有元素一到taskList中

500 workQueue.drainTo(taskList);

501 while (!workQueue.isEmpty()) {

502 Iterator<Runnable> it = workQueue.iterator();

503 try {

504 if (it.hasNext()) {

505 Runnable r = it.next();

506 // 从workQueue中移除,并添加到taskList中

507 if (workQueue.remove(r))

508 taskList.add(r);

509 }

510 } catch (ConcurrentModificationException ignore) {

511 }

512 }

513 return taskList;

514 }

515

516 public boolean isShutdown() {

517 return runState != RUNNING;

518 }

519

520

521 boolean isStopped() {

522 return runState == STOP;

523 }

524

525

526 public boolean isTerminating() {

527 int state = runState;

528 return state == SHUTDOWN || state == STOP;

529 }

530

531 public boolean isTerminated() {

532 return runState == TERMINATED;

533 }

534

535 public boolean awaitTermination(long timeout, TimeUnit unit)

536 throws InterruptedException {

537 long nanos = unit.toNanos(timeout);

538 final ReentrantLock mainLock = this.mainLock;

539 mainLock.lock();

540 try {

541 for (;;) {

542 if (runState == TERMINATED)

543 return true;

544 if (nanos <= 0)

545 return false;

546 nanos = termination.awaitNanos(nanos);

547 }

548 } finally {

549 mainLock.unlock();

550 }

551 }

552

553

554 protected void finalize() {

555 shutdown();

556 }

557

558

559 public void setThreadFactory(ThreadFactory threadFactory) {

560 if (threadFactory == null)

561 throw new NullPointerException();

562 this.threadFactory = threadFactory;

563 }

564

565

566 public ThreadFactory getThreadFactory() {

567 return threadFactory;

568 }

569

570

571 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {

572 if (handler == null)

573 throw new NullPointerException();

574 this.handler = handler;

575 }

576

577

578 public RejectedExecutionHandler getRejectedExecutionHandler() {

579 return handler;

580 }

581

582 // 设置核心线程数 这里的设置将覆盖构造方法中的设置

583 // 如果小于构造方法的设置,多余的线程将被闲置

584 // 如果大于构造方法的设置,新线程将被用于执行排队的任务

585 public void setCorePoolSize(int corePoolSize) {

586 if (corePoolSize < 0)

587 throw new IllegalArgumentException();

588 final ReentrantLock mainLock = this.mainLock;

589 mainLock.lock();

590 try {

591 int extra = this.corePoolSize - corePoolSize;

592 this.corePoolSize = corePoolSize;

593 // 大于构造方法的设置

594 if (extra < 0) {

595 int n = workQueue.size();

596 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {

597 Thread t = addThread(null);

598 if (t != null)

599 t.start();

600 else

601 break;

602 }

603 }

604 // 小于构造方法的设置

605 else if (extra > 0 && poolSize > corePoolSize) {

606 try {

607 Iterator<Worker> it = workers.iterator();

608 while (it.hasNext() &&

609 extra-- > 0 &&

610 poolSize > corePoolSize &&

611 workQueue.remainingCapacity() == 0)

612 it.next().interruptIfIdle();

613 } catch (SecurityException ignore) {

614 // Not an error; it is OK if the threads stay live

615 }

616 }

617 } finally {

618 mainLock.unlock();

619 }

620 }

621

622

623 public int getCorePoolSize() {

624 return corePoolSize;

625 }

626

627

628 public boolean prestartCoreThread() {

629 return addIfUnderCorePoolSize(null);

630 }

631

632

633 public int prestartAllCoreThreads() {

634 int n = 0;

635 while (addIfUnderCorePoolSize(null))

636 ++n;

637 return n;

638 }

639

640

641 public boolean allowsCoreThreadTimeOut() {

642 return allowCoreThreadTimeOut;

643 }

644

645

646 public void allowCoreThreadTimeOut(boolean value) {

647 if (value && keepAliveTime <= 0)

648 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

649

650 allowCoreThreadTimeOut = value;

651 }

652

653 // 设置所允许的最大的线程数。这将覆盖在构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将被终止时,他们成为闲置。

654 public void setMaximumPoolSize(int maximumPoolSize) {

655 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)

656 throw new IllegalArgumentException();

657 final ReentrantLock mainLock = this.mainLock;

658 mainLock.lock();

659 try {

660 int extra = this.maximumPoolSize - maximumPoolSize;

661 this.maximumPoolSize = maximumPoolSize;

662 if (extra > 0 && poolSize > maximumPoolSize) {

663 try {

664 Iterator<Worker> it = workers.iterator();

665 while (it.hasNext() &&

666 extra > 0 &&

667 poolSize > maximumPoolSize) {

668 it.next().interruptIfIdle();

669 --extra;

670 }

671 } catch (SecurityException ignore) {

672 // Not an error; it is OK if the threads stay live

673 }

674 }

675 } finally {

676 mainLock.unlock();

677 }

678 }

679

680

681 public int getMaximumPoolSize() {

682 return maximumPoolSize;

683 }

684

685

686 public void setKeepAliveTime(long time, TimeUnit unit) {

687 if (time < 0)

688 throw new IllegalArgumentException();

689 if (time == 0 && allowsCoreThreadTimeOut())

690 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

691 this.keepAliveTime = unit.toNanos(time);

692 }

693

694

695 public long getKeepAliveTime(TimeUnit unit) {

696 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);

697 }

698

699

700 public BlockingQueue<Runnable> getQueue() {

701 return workQueue;

702 }

703

704

705 public boolean remove(Runnable task) {

706 return getQueue().remove(task);

707 }

708

709 // 移除所有被取消的任务

710 public void purge() {

711 // Fail if we encounter interference during traversal

712 try {

713 Iterator<Runnable> it = getQueue().iterator();

714 while (it.hasNext()) {

715 Runnable r = it.next();

716 if (r instanceof Future<?>) {

717 Future<?> c = (Future<?>)r;

718 if (c.isCancelled())

719 it.remove();

720 }

721 }

722 }

723 catch (ConcurrentModificationException ex) {

724 return;

725 }

726 }

727

728

729 public int getPoolSize() {

730 return poolSize;

731 }

732

733 // 获取活跃线程数

734 public int getActiveCount() {

735 final ReentrantLock mainLock = this.mainLock;

736 mainLock.lock();

737 try {

738 int n = 0;

739 for (Worker w : workers) {

740 if (w.isActive())

741 ++n;

742 }

743 return n;

744 } finally {

745 mainLock.unlock();

746 }

747 }

748

749

750 public int getLargestPoolSize() {

751 final ReentrantLock mainLock = this.mainLock;

752 mainLock.lock();

753 try {

754 return largestPoolSize;

755 } finally {

756 mainLock.unlock();

757 }

758 }

759

760

761 public long getTaskCount() {

762 final ReentrantLock mainLock = this.mainLock;

763 mainLock.lock();

764 try {

765 long n = completedTaskCount;

766 for (Worker w : workers) {

767 // 统计已经完成的任务

768 n += w.completedTasks;

769 // 如果w是活跃线程,说明正在执行一个任务,所以n加一

770 if (w.isActive())

771 ++n;

772 }

773 // 加上队列中的任务

774 return n + workQueue.size();

775 } finally {

776 mainLock.unlock();

777 }

778 }

779

780 // 获取已完成的任务数

781 public long getCompletedTaskCount() {

782 final ReentrantLock mainLock = this.mainLock;

783 mainLock.lock();

784 try {

785 long n = completedTaskCount;

786 for (Worker w : workers)

787 n += w.completedTasks;

788 return n;

789 } finally {

790 mainLock.unlock();

791 }

792 }

793

794

795 protected void beforeExecute(Thread t, Runnable r) { }

796

797

798 protected void afterExecute(Runnable r, Throwable t) { }

799

800

801 protected void terminated() { }

802

803 // 实现了RejectedExecutionHandler,即是一个拒绝执行的Handler

804 public static class CallerRunsPolicy implements RejectedExecutionHandler {

805

806 public CallerRunsPolicy() { }

807

808

809 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

810 if (!e.isShutdown()) {

811 r.run();

812 }

813 }

814 }

815

816

817 public static class AbortPolicy implements RejectedExecutionHandler {

818

819 public AbortPolicy() { }

820

821

822 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

823 throw new RejectedExecutionException();

824 }

825 }

826

827

828 public static class DiscardPolicy implements RejectedExecutionHandler {

829

830 public DiscardPolicy() { }

831

832

833 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

834 }

835 }

836

837

838 public static class DiscardOldestPolicy implements RejectedExecutionHandler {

839

840 public DiscardOldestPolicy() { }

841

842

843 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

844 if (!e.isShutdown()) {

845 e.getQueue().poll();

846 e.execute(r);

847 }

848 }

849 }

850 }

    可以参考http://xtu-xiaoxin.iteye.com/blog/647744

    从上面的框架结构图中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一个工具类,提供一些工厂和实用方法。

    下面看ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口。

    ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor

// 可以安排指定时间或周期性的执行任务的ExecutorService

public class ScheduledThreadPoolExecutor

extends ThreadPoolExecutor

implements ScheduledExecutorService {

// 在Shutdown的时候如果要取消或关闭任务,设置为false;true表示继续执行任务,在Shutdown之后

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

// false表示在Shutdown的时候取消Delayed的任务;true表示执行这个任务

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

// 打破调度联系,进而保证先进先出的顺序捆绑项目中的序列号

private static final AtomicLong sequencer = new AtomicLong(0);

// 基准时间

private static final long NANO_ORIGIN = System.nanoTime();

// 当前时间(相对于基准时间的值)

final long now() {

return System.nanoTime() - NANO_ORIGIN;

}

// RunnableScheduledFuture接口表示是否是周期性的

private class ScheduledFutureTask<V>

extends FutureTask<V> implements RunnableScheduledFuture<V> {

private final long sequenceNumber;

// 预定安排执行的时刻

private long time;

// 表示重复任务,0表示不重复,正数表示固定比率,负数表示固定延时

private final long period;

// 构造方法,构造一个只执行一次的任务

ScheduledFutureTask(Runnable r, V result, long ns) {

super(r, result);

this.time = ns;

this.period = 0;

this.sequenceNumber = sequencer.getAndIncrement();

}

// 构造方法,构造一个按指定ns开始执行,指定period周期性执行

ScheduledFutureTask(Runnable r, V result, long ns, long period) {

super(r, result);

this.time = ns;

this.period = period;

this.sequenceNumber = sequencer.getAndIncrement();

}

// 构造方法,构造一个延时执行的任务

ScheduledFutureTask(Callable<V> callable, long ns) {

super(callable);

this.time = ns;

this.period = 0;

this.sequenceNumber = sequencer.getAndIncrement();

}

// 按指定单位获取延时时间

public long getDelay(TimeUnit unit) {

return unit.convert(time - now(), TimeUnit.NANOSECONDS);

}

// 判断传入延时和这个任务延时之间的大小关系

public int compareTo(Delayed other) {

// 为什么可以和Delayed比较?因为这个类实现了RunnableScheduledFuture接口,而RunnableScheduledFuture接口继承自Delayed接口

if (other == this) // compare zero ONLY if same object

return 0;

// other是ScheduledFutureTask实例

if (other instanceof ScheduledFutureTask) {

ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;

long diff = time - x.time;

// 比较大小

if (diff < 0)

return -1;

else if (diff > 0)

return 1;

else if (sequenceNumber < x.sequenceNumber)

return -1;

else

return 1;

}

long d = (getDelay(TimeUnit.NANOSECONDS) -

other.getDelay(TimeUnit.NANOSECONDS));

return (d == 0)? 0 : ((d < 0)? -1 : 1);

}

// 是否周期性的(包括延时的情况)

public boolean isPeriodic() {

return period != 0;

}

// 执行周期性的任务

private void runPeriodic() {

// 执行任务

boolean ok = ScheduledFutureTask.super.runAndReset();

// 判断是否已经shutdown

boolean down = isShutdown();

// 重新安排任务(如果没有shutdown或在没有关闭且允许在shutdown之后执行已存在的任务)

if (ok && (!down ||

(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&

!isStopped()))) {

long p = period;

if (p > 0)

// 计算下一次执行的时间

time += p;

else

// 计算触发时间

time = triggerTime(-p);

// 将任务添加到队列中

ScheduledThreadPoolExecutor.super.getQueue().add(this);

}

else if (down)

interruptIdleWorkers();

}

// 执行任务,根据是否周期性调用不同的方法

public void run() {

if (isPeriodic())

runPeriodic();

else

ScheduledFutureTask.super.run();

}

}

// 延迟执行

private void delayedExecute(Runnable command) {

// 如果已经shutdown,决绝任务

if (isShutdown()) {

reject(command);

return;

}

if (getPoolSize() < getCorePoolSize())

// 预启动线程

prestartCoreThread();

super.getQueue().add(command);

}

// 取消和清除关闭政策不允许运行的任务

private void cancelUnwantedTasks() {

// 获取shutdown策略

boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();

boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();

if (!keepDelayed && !keepPeriodic)

super.getQueue().clear();

else if (keepDelayed || keepPeriodic) {

Object[] entries = super.getQueue().toArray();

for (int i = 0; i < entries.length; ++i) {

Object e = entries[i];

if (e instanceof RunnableScheduledFuture) {

RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;

// 根据是否周期性的任务通过制定的值判断进行取消操作

if (t.isPeriodic()? !keepPeriodic : !keepDelayed)

t.cancel(false);

}

}

entries = null;

// 净化,移除已经取消的任务

purge();

}

}

public boolean remove(Runnable task) {

if (!(task instanceof RunnableScheduledFuture))

return false;

return getQueue().remove(task);

}

protected <V> RunnableScheduledFuture<V> decorateTask(

Runnable runnable, RunnableScheduledFuture<V> task) {

return task;

}

protected <V> RunnableScheduledFuture<V> decorateTask(

Callable<V> callable, RunnableScheduledFuture<V> task) {

return task;

}

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue());

}

public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue(), threadFactory);

}

public ScheduledThreadPoolExecutor(int corePoolSize,

RejectedExecutionHandler handler) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue(), handler);

}

private long triggerTime(long delay, TimeUnit unit) {

return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));

}

long triggerTime(long delay) {

return now() +

((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));

}

// 避免移除,返回延迟的值

private long overflowFree(long delay) {

Delayed head = (Delayed) super.getQueue().peek();

if (head != null) {

long headDelay = head.getDelay(TimeUnit.NANOSECONDS);

if (headDelay < 0 && (delay - headDelay < 0))

delay = Long.MAX_VALUE + headDelay;

}

return delay;

}

public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue(), threadFactory, handler);

}

// 根据执行的延时时间执行任务

public ScheduledFuture<?> schedule(Runnable command,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

// ScheduledFutureTask的result为null

RunnableScheduledFuture<?> t = decorateTask(command,

new ScheduledFutureTask<Void>(command, null,

triggerTime(delay, unit)));

// 延时执行

delayedExecute(t);

return t;

}

// 上一个方法的重载形式,接收的是Callable

public <V> ScheduledFuture<V> schedule(Callable<V> callable,

long delay,

TimeUnit unit) {

if (callable == null || unit == null)

throw new NullPointerException();

RunnableScheduledFuture<V> t = decorateTask(callable,

new ScheduledFutureTask<V>(callable,

triggerTime(delay, unit)));

delayedExecute(t);

return t;

}

/**

* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit

* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...

*/

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

long initialDelay,

long period,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

if (period <= 0)

throw new IllegalArgumentException();

RunnableScheduledFuture<?> t = decorateTask(command,

new ScheduledFutureTask<Object>(command,

null,

triggerTime(initialDelay, unit),

unit.toNanos(period)));

delayedExecute(t);

return t;

}

/**

* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit

* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...

*/

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

long initialDelay,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

if (delay <= 0)

throw new IllegalArgumentException();

RunnableScheduledFuture<?> t = decorateTask(command,

new ScheduledFutureTask<Boolean>(command,

null,

triggerTime(initialDelay, unit),

unit.toNanos(-delay)));

delayedExecute(t);

return t;

}

// 执行任务

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

// 立即执行,延时时间是0

schedule(command, 0, TimeUnit.NANOSECONDS);

}

// 重新 AbstractExecutorService 的方法

public Future<?> submit(Runnable task) {

return schedule(task, 0, TimeUnit.NANOSECONDS);

}

public <T> Future<T> submit(Runnable task, T result) {

return schedule(Executors.callable(task, result),

0, TimeUnit.NANOSECONDS);

}

public <T> Future<T> submit(Callable<T> task) {

return schedule(task, 0, TimeUnit.NANOSECONDS);

}

public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {

continueExistingPeriodicTasksAfterShutdown = value;

if (!value && isShutdown())

cancelUnwantedTasks();

}

public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {

return continueExistingPeriodicTasksAfterShutdown;

}

public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {

executeExistingDelayedTasksAfterShutdown = value;

if (!value && isShutdown())

cancelUnwantedTasks();

}

public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {

return executeExistingDelayedTasksAfterShutdown;

}

// 关闭

public void shutdown() {

// 取消任务

cancelUnwantedTasks();

super.shutdown();

}

// 立即关闭,调用的是父类立即关闭的方法

public List<Runnable> shutdownNow() {

return super.shutdownNow();

}

// 返回使用这个执行器的任务队列

public BlockingQueue<Runnable> getQueue() {

return super.getQueue();

}

// 将DelayQueue<RunnableScheduledFuture> 包装为 BlockingQueue<Runnable>的类

// 类似于代理

private static class DelayedWorkQueue

extends AbstractCollection<Runnable>

implements BlockingQueue<Runnable> {

private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();

public Runnable poll() { return dq.poll(); }

public Runnable peek() { return dq.peek(); }

public Runnable take() throws InterruptedException { return dq.take(); }

public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {

return dq.poll(timeout, unit);

}

public boolean add(Runnable x) {

return dq.add((RunnableScheduledFuture)x);

}

public boolean offer(Runnable x) {

return dq.offer((RunnableScheduledFuture)x);

}

public void put(Runnable x) {

dq.put((RunnableScheduledFuture)x);

}

public boolean offer(Runnable x, long timeout, TimeUnit unit) {

return dq.offer((RunnableScheduledFuture)x, timeout, unit);

}

public Runnable remove() { return dq.remove(); }

public Runnable element() { return dq.element(); }

public void clear() { dq.clear(); }

public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }

public int drainTo(Collection<? super Runnable> c, int maxElements) {

return dq.drainTo(c, maxElements);

}

public int remainingCapacity() { return dq.remainingCapacity(); }

public boolean remove(Object x) { return dq.remove(x); }

public boolean contains(Object x) { return dq.contains(x); }

public int size() { return dq.size(); }

public boolean isEmpty() { return dq.isEmpty(); }

public Object[] toArray() { return dq.toArray(); }

public <T> T[] toArray(T[] array) { return dq.toArray(array); }

public Iterator<Runnable> iterator() {

return new Iterator<Runnable>() {

private Iterator<RunnableScheduledFuture> it = dq.iterator();

public boolean hasNext() { return it.hasNext(); }

public Runnable next() { return it.next(); }

public void remove() { it.remove(); }

};

}

}

}

    在代码中都加了注释,我想大致能解释清楚吧。

    Executor涉及的类还是比较多的,到此为止剩下的还有Executors

    Executors

    Executors中所定义的 ExecutorExecutorServiceScheduledExecutorServiceThreadFactoryCallable 类的工厂和实用方法。此类支持以下各种方法:

  • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。 

     Executors提供的都是工具形式的方法,所以都是static的,并且这个类也没有必要实例化,所以它的构造方法时private的。下面主要看一下几个内部类。

   RunnableAdapter

 1 static final class RunnableAdapter<T> implements Callable<T> {

2 final Runnable task;

3 final T result;

4 RunnableAdapter(Runnable task, T result) {

5 this.task = task;

6 this.result = result;

7 }

8 public T call() {

9 task.run();

10 return result;

11 }

12 }

    适配器。以Callable的形式执行Runnable并且返回给定的result。

    PrivilegedCallable

 1 static final class PrivilegedCallable<T> implements Callable<T> {

2 private final AccessControlContext acc;

3 private final Callable<T> task;

4 private T result;

5 private Exception exception;

6 PrivilegedCallable(Callable<T> task) {

7 this.task = task;

8 this.acc = AccessController.getContext();

9 }

10

11 public T call() throws Exception {

12 AccessController.doPrivileged(new PrivilegedAction<T>() {

13 public T run() {

14 try {

15 result = task.call();

16 } catch (Exception ex) {

17 exception = ex;

18 }

19 return null;

20 }

21 }, acc);

22 if (exception != null)

23 throw exception;

24 else

25 return result;

26 }

27 }

    在访问控制下运行的Callable。涉及到Java.security包中的内容。

    PrivilegedCallableUsingCurrentClassLoader类与上面的PrivilegedCallable类似,只是使用的是CurrentClassLoader。

    DefaultThreadFactory

 1     static class DefaultThreadFactory implements ThreadFactory {

2 static final AtomicInteger poolNumber = new AtomicInteger(1);

3 final ThreadGroup group;

4 final AtomicInteger threadNumber = new AtomicInteger(1);

5 final String namePrefix;

6

7 DefaultThreadFactory() {

8 SecurityManager s = System.getSecurityManager();

9 group = (s != null)? s.getThreadGroup() :

10 Thread.currentThread().getThreadGroup();

11 namePrefix = "pool-" +

12 poolNumber.getAndIncrement() +

13 "-thread-";

14 }

15

16 public Thread newThread(Runnable r) {

17 // 调用Thread构造方法创建线程

18 Thread t = new Thread(group, r,

19 namePrefix + threadNumber.getAndIncrement(),

20 0);

21 // 取消守护线程设置

22 if (t.isDaemon())

23 t.setDaemon(false);

24 // 设置默认优先级

25 if (t.getPriority() != Thread.NORM_PRIORITY)

26 t.setPriority(Thread.NORM_PRIORITY);

27 return t;

28 }

29 }

    DefaultThreadFactory 是默认的线程工程,提供创建线程的方法。

    PrivilegedThreadFactory继承自DefaultThreadFactory,区别在于线程执行的run方法指定了classLoader并受到权限的控制。

    DelegatedExecutorService继承自AbstractExecutorService,是一个包装类,暴露ExecutorService的方法。

    DelegatedScheduledExecutorService继承自DelegatedExecutorService,实现了ScheduledExecutorService接口。它也是一个包装类,公开ScheduledExecutorService方法。 

以上是 Java Executor 框架 的全部内容, 来源链接: utcz.com/z/391139.html

回到顶部