定时任务实现原理详解
定时任务,可以说是业务系统必不可少的一个部分,今天我们就一起来了解一下 JDK 定时任务实现及原理分析。
一、摘要
在很多业务的系统中,我们常常需要定时的执行一些任务,例如定时发短信、定时变更数据、定时发起促销活动等等。
在上篇文章中,我们简单的介绍了定时任务的使用方式,不同的架构对应的解决方案也有所不同,总结起来主要分单机和分布式两大类,本文会重点分析下单机的定时任务实现原理以及优缺点,分布式框架的实现原理会在后续文章中进行分析。
从单机角度,定时任务实现主要有以下 3 种方案:
- while + sleep 组合
- 最小堆实现
- 时间轮实现
二、while+sleep组合
while+sleep 方案,简单的说,就是定义一个线程,然后 while 循环,通过 sleep 延迟时间来达到周期性调度任务。
简单示例如下:
1 |
|
实现上非常简单,如果我们想在创建一个每隔3秒钟执行一次任务,怎么办呢?
同样的,也可以在创建一个线程,然后间隔性的调度方法;但是如果创建了大量这种类型的线程,这个时候会发现大量的定时任务线程在调度切换时性能消耗会非常大,而且整体效率低!
面对这种在情况,大佬们也想到了,于是想出了用一个线程将所有的定时任务存起来,事先排好序,按照一定的规则来调度,这样不就可以极大的减少每个线程的切换消耗吗?
正因此,JDK 中的 Timer 定时器由此诞生了!
三、最小堆实现
所谓最小堆方案,正如我们上面所说的,每当有新任务加入的时候,会把需要即将要执行的任务排到前面,同时会有一个线程不断的轮询判断,如果当前某个任务已经到达执行时间点,就会立即执行,具体实现代表就是 JDK 中的 Timer 定时器!
3.1、Timer
首先我们来一个简单的 Timer 定时器例子
1 |
|
实现上,好像跟我们上面介绍的 while+sleep 方案差不多,同样也是起一个TimerTask
线程任务,只不过共用一个Timer
调度器。
下面我们一起来打开源码看看里面到底有些啥!
- 进入Timer.schedule()方法
从方法上可以看出,这里主要做参数验证,其中
TimerTask
是一个线程任务,delay
表示延迟多久执行(单位毫秒),period
表示多久执行一次(单位毫秒)
1 |
|
- 接着看
sched()
方法
这步操作中,可以很清晰的看到,在同步代码块里,会将
task
对象加入到queue
1 |
|
- 我们继续来看
queue
对象
任务会将入到
TaskQueue
队列中,同时在Timer
初始化阶段会将TaskQueue
作为参数传入到TimerThread
线程中,并且起到线程
1 |
|
- 而
TaskQueue
其实是一个最小堆的数据实体类,源码如下
每当有新元素加入的时候,会对原来的数组进行重排,会将即将要执行的任务排在数组的前面
1 |
|
- 最后我们来看看
TimerThread
TimerThread
其实就是一个任务调度线程,首先从TaskQueue
里面获取排在最前面的任务,然后判断它是否到达任务执行时间点,如果已到达,就会立刻执行任务
1 |
|
总结这个利用最小堆实现的方案,相比 while + sleep 方案,多了一个线程来管理所有的任务,优点就是减少了线程之间的性能开销,提升了执行效率;但是同样也带来的了一些缺点,整体的新加任务写入效率变成了 O(log(n))。
同时,细心的发现,这个方案还有以下几个缺点:
串行阻塞:调度线程只有一个,长任务会阻塞短任务的执行,例如,A任务跑了一分钟,B任务至少需要等1分钟才能跑
容错能力差:没有异常处理能力,一旦一个任务执行故障,后续任务都无法执行
3.2、ScheduledThreadPoolExecutor
鉴于 Timer 的上述缺陷,从 Java 5 开始,推出了基于线程池设计的 ScheduledThreadPoolExecutor 。
其设计思想是,每一个被调度的任务都会由线程池来管理执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduledThreadPoolExecutor 才会真正启动一个线程,其余时间 ScheduledThreadPoolExecutor 都是在轮询任务的状态。
简单的使用示例:
1 |
|
同样的,我们首先打开源码,看看里面到底做了啥
- 进入
scheduleAtFixedRate()
方法
首先是校验基本参数,然后将任务作为封装到
ScheduledFutureTask
线程中,ScheduledFutureTask
继承自RunnableScheduledFuture
,并作为参数调用delayedExecute()
方法进行预处理
1 |
|
- 继续看
delayedExecute()
方法
可以很清晰的看到,当线程池没有关闭的时候,会通过
super.getQueue().add(task)
操作,将任务加入到队列,同时调用ensurePrestart()
方法做预处理
1 |
|
其中
super.getQueue()
得到的是一个自定义的new DelayedWorkQueue()
阻塞队列,数据存储方面也是一个最小堆结构的队列,这一点在初始化new ScheduledThreadPoolExecutor()
的时候,可以看出!
1 |
|
打开源码可以看到,
DelayedWorkQueue
其实是ScheduledThreadPoolExecutor
中的一个静态内部类,在添加的时候,会将任务加入到RunnableScheduledFuture
数组中,同时线程池中的Woker
线程会通过调用任务队列中的take()
方法获取对应的ScheduledFutureTask
线程任务,接着执行对应的任务线程
1 |
|
- 回到我们最开始说到的
ScheduledFutureTask
任务线程类,最终执行任务的其实就是它
ScheduledFutureTask
任务线程,才是真正执行任务的线程类,只是绕了一圈,做了很多包装,run()
方法就是真正执行定时任务的方法。
1 |
|
3.3、小结
ScheduledExecutorService 相比 Timer 定时器,完美的解决上面说到的 Timer 存在的两个缺点!
在单体应用里面,使用 ScheduledExecutorService 可以解决大部分需要使用定时任务的业务需求!
但是这是否意味着它是最佳的解决方案呢?
我们发现线程池中 ScheduledExecutorService 的排序容器跟 Timer 一样,都是采用最小堆的存储结构,新任务加入排序效率是O(log(n))
,执行取任务是O(1)
。
这里的写入排序效率其实是有空间可提升的,有可能优化到O(1)
的时间复杂度,也就是我们下面要介绍的时间轮实现!
四、时间轮实现
所谓时间轮(RingBuffer)实现,从数据结构上看,简单的说就是循环队列,从名称上看可能感觉很抽象。
它其实就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。
插入、取值流程:
- 1.当我们需要新建一个 1s 延时任务的时候,则只需要将它放到下标为 1 的那个槽中,2、3、…、7也同样如此。
- 2.而如果是新建一个 10s 的延时任务,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,也就是 1 圈,不然就和 2 秒的延时消息重复了
- 3.当创建一个 21s 的延时任务时,它所在的位置就在下标为 5 的槽中,同样的需要为他加上圈数为 2,依次类推…
因此,总结起来有两个核心的变量:
- 数组下标:表示某个任务延迟时间,从数据操作上对执行时间点进行取余
- 圈数:表示需要循环圈数
通过这张图可以更直观的理解!
当我们需要取出延时任务时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可,取任务的时间消耗为O(1)
。
当我们需要插入任务式,也只需要计算出对应的下表和圈数,即可将任务插入到对应的数组位置中,插入任务的时间消耗为O(1)
。
如果时间轮的槽比较少,会导致某一个槽上的任务非常多,那么效率也比较低,这就和 HashMap 的 hash 冲突是一样的,因此在设计槽的时候不能太大也不能太小。
4.1、代码实现
- 首先创建一个
RingBufferWheel
时间轮定时任务管理器
1 |
|
- 接着,编写一个客户端,测试客户端
1 |
|
运行结果:
1 |
|
如果要周期性执行任务,可以在任务执行完成之后,再重新加入到时间轮中。
详细源码分析地址:点击这里获取
4.2、应用
时间轮的应用还是非常广的,例如在 Disruptor 项目中就运用到了 RingBuffer,还有Netty
中的HashedWheelTimer
工具原理也差不多等等,有兴趣的同学,可以阅读一下官方对应的源码!
五、小结
本文主要围绕单体应用中的定时任务原理进行分析,可能也有理解不对的地方,欢迎批评指出!
六、参考
1、简书 - 谈谈定时任务解决方案原理
2、crossoverJie’s Blog - 延时消息之时间轮
以上是 定时任务实现原理详解 的全部内容, 来源链接: utcz.com/a/130478.html