[Java] Implementing a Generic Sliding Window
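Background
In the earlier "Log-based alerting" implementation, you could configure a maximum of X alerts per hour, but the implementation was rather crude:
- The period is hard-coded to one hour.
- The period is the clock hour (from minute 0 to minute 60). It is therefore easy to send X alerts at minute 59 and another X at minute 1 of the next hour, i.e. X * 2 alerts within the couple of minutes between minute 59 and minute 1.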
A better approach is to make the limit configurable as "at most Y alerts within a period of X" and to count that period with a sliding window.
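Key points:
Data structure
A fixed-length array forms one window. Each element of the array is a bucket, and an index pointer identifies the current bucket within the window.
For example, the window below is 60 seconds long and split into 10 buckets, so each bucket covers 6 seconds.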
public class SlidingWindow {
// number of buckets, e.g. 10
protected int bucketCount;
// window length, e.g. 60000 ms
protected int intervalInMs;
protected WindowBucket[] windowBuckets;
}
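Sliding strategy
- Active: start a scheduled task that advances the index pointer at fixed intervals.
- Passive: when data arrives, compute the current index from the current time.
Drawbacks of the active approach:
- It needs an extra scheduled task.
- The scheduled task itself can reduce the accuracy of the index. This is especially true for very short windows, e.g. a 1-second window split into 100 buckets (10 ms each).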
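The passive approach needs nothing more than integer arithmetic on the current time. Below is a minimal sketch for the 60-second, 10-bucket example. The class name is hypothetical. It mirrors calculateCurrentBucketId and calculateWindowStartTime from the source listing, using 0-based indexes and milliseconds since start, so bucket 0 covers 0-5999 ms.

```java
// Hypothetical helper: passive sliding derives the bucket from the current time, no timer needed.
class BucketIndexDemo {
    static final int INTERVAL_MS = 60_000;                          // window length: 60 s
    static final int BUCKET_COUNT = 10;                             // 10 buckets
    static final int BUCKET_LENGTH_MS = INTERVAL_MS / BUCKET_COUNT; // 6000 ms per bucket

    /** 0-based index of the bucket that covers the given elapsed time. */
    static int bucketIndex(long elapsedMs) {
        return (int) ((elapsedMs / BUCKET_LENGTH_MS) % BUCKET_COUNT);
    }

    /** Start time of the bucket period that contains elapsedMs. */
    static long bucketStart(long elapsedMs) {
        return elapsedMs - elapsedMs % BUCKET_LENGTH_MS;
    }
}
```

Seconds 5 and 65 map to the same index 0. The array is reused every round, which is why stale data has to be detected.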
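New vs. stale data
With the passive approach, how are expired entries (data left over from the previous round of the window) filtered out when aggregating?
Again, take a 60-second window with 10 buckets as the example: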
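- At second 5, data is inserted into bucket 1. Buckets are counted from 1 here, so bucket 1 covers seconds 1-6.
- From second 61 to 66 there is no data. This is the new round of the window, so bucket 1 is not modified.
- At second 70, data is inserted into bucket 2. If the array is summed now, bucket 1's data is included. That data belongs to the previous window and should not be counted.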
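The problem can be reproduced with a naive array of counters that ignores time. This is a hypothetical sketch with 0-based indexes and 6-second buckets starting at 0 s, so second 5 falls into index 0 and second 70 into index 1.

```java
// Hypothetical naive window: counters only, no begin time per bucket.
class NaiveWindow {
    private final long[] counts;
    private final int bucketLengthSec;

    NaiveWindow(int bucketCount, int bucketLengthSec) {
        this.counts = new long[bucketCount];
        this.bucketLengthSec = bucketLengthSec;
    }

    void add(long second) {
        int index = (int) ((second / bucketLengthSec) % counts.length);
        counts[index]++;   // never cleared: data from an earlier round survives
    }

    long sum() {
        long total = 0;
        for (long c : counts) {
            total += c;    // also adds buckets last written in an earlier round
        }
        return total;
    }
}
```

After add(5) and add(70), sum() returns 2. Only one of those events lies within the 60 seconds before second 70.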
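The solution is to give each bucket a begin time.
For example, initially bucket 1 begins at 1 s and bucket 2 at 7 s. After one full round (60 s), bucket 1 should begin at 61 s and bucket 2 at 67 s.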
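On insert, the begin time is also used to decide whether the current bucket still holds data from the previous window:
- If not: add to the bucket's existing data.
- If so: reset the bucket first (set the counts to 0 and update its begin time), using a lock to avoid concurrency problems.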
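The insert-time check can be sketched as follows. This is a simplified, hypothetical version:
- Indexes are 0-based.
- Times are whole seconds starting at 0, so bucket periods start at 0 s, 6 s, 12 s and so on, rather than 1 s, 7 s, ... as in the example above.
- Unlike the article, which locks only the reset, it holds the lock for the whole update to keep the code short.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: every bucket remembers the start of the period whose data it holds.
class TimedBucketWindow {
    private final long[] counts;
    private final long[] beginTimes;
    private final int bucketLengthSec;
    private final ReentrantLock lock = new ReentrantLock();

    TimedBucketWindow(int bucketCount, int bucketLengthSec) {
        this.counts = new long[bucketCount];
        this.beginTimes = new long[bucketCount];
        this.bucketLengthSec = bucketLengthSec;
        Arrays.fill(beginTimes, -1L);   // -1 = never used (times are assumed to be >= 0)
    }

    /** Records one event at the given second (assumed non-decreasing). */
    void add(long second) {
        int index = (int) ((second / bucketLengthSec) % counts.length);
        long start = second - second % bucketLengthSec;   // start of the period this event belongs to
        lock.lock();
        try {
            if (beginTimes[index] < start) {
                // the bucket holds an earlier round: reset it before counting
                counts[index] = 0;
                beginTimes[index] = start;
            }
            counts[index]++;
        } finally {
            lock.unlock();
        }
    }

    long count(int index) {
        lock.lock();
        try {
            return counts[index];
        } finally {
            lock.unlock();
        }
    }

    long beginTime(int index) {
        lock.lock();
        try {
            return beginTimes[index];
        } finally {
            lock.unlock();
        }
    }
}
```

Second 65 lands in index 0 again, but with period start 60 > 0. The old count from second 5 is therefore discarded before the new event is counted.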
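When merging, check whether each bucket's begin time lies within one window length of the current time.
In the example above, at second 70 the data goes into bucket 2, and bucket 2's begin time is changed to 67 s. When the current window is then summed, bucket 1 still has a begin time of 1 s. That is outside the current window (70 - 1 = 69 > 60), so bucket 1 is excluded from the sum.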
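The merge rule can be isolated in a few lines. This hypothetical sketch applies the same condition as isWindowDeprecated in the source listing (now - beginTime > interval means "skip"). It uses the numbers from the example: bucket 1 began at 1 s, bucket 2 at 67 s, and the sum is taken at 70 s.

```java
// Hypothetical sketch of the merge step: a bucket counts only if it began within one window of "now".
class WindowMerge {
    /** Sums the counts of all buckets whose begin time is at most intervalSec before nowSec. */
    static long sum(long[] counts, long[] beginTimes, long nowSec, long intervalSec) {
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            boolean deprecated = nowSec - beginTimes[i] > intervalSec;
            if (!deprecated) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

At 70 s the bucket that began at 1 s is skipped (69 > 60), so only the event in the bucket that began at 67 s is counted.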
public class WindowBucketWrap<T extends WindowBucket> {
// start time of the period whose data this bucket currently holds
private volatile long beginTime;
private T windowBucket;
}
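To guard against clock rollback (the system wall clock being set backwards), time is taken from nanoTime rather than from the wall clock.
Calling Java's System.nanoTime() on every operation has a performance cost. A custom time service is therefore used instead: a daemon thread refreshes a cached nanoTime value roughly every 0.5 ms.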
/**
* @author
* @date 2020-12-21 11:07
* @description
*/
public class TimeServer {
/**
* Number of nanoseconds in one millisecond.
*/
public static final long NANOS_PER_MILLI = 1000_000L;
private static final long beginNanoTime;
private static volatile long beginMilliTime;
private static volatile long currentNanoTime;
static {
currentNanoTime = System.nanoTime();
beginNanoTime = currentNanoTime;
beginMilliTime = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
currentNanoTime = System.nanoTime();
try {
TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
} catch (Throwable ignore) {
}
}
}
});
daemon.setDaemon(true);
daemon.setName("slidingwindow-time-tick-thread");
daemon.start();
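}
// ... accessor methods omitted here; see the full TimeServer listing in the source section below
}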
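Data fields
Different business scenarios keep different data in the window. The log alerting from the background only needs one value: the number of alerts already sent. Rate limiting may need two: passed and throttled. Since the window is meant to be generic, it has to accommodate all such cases and let each business define its own statistics.
To do this, the bucket's data field is an array. Its length and slots are determined by an enum of event types.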
public class EventTypeWrap<E extends Enum> {
private E[] eventTypes;
public EventTypeWrap(Class<E> clazz) {
eventTypes = clazz.getEnumConstants();
Assert.notEmpty(eventTypes, "Event type list is empty!");
}
public E[] getEventTypes() {
return eventTypes;
}
}
public class SlidingWindow {
public WindowBucket buildWindowBucket() {
int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new WindowBucket(eventTypeSize);
}
}
public class WindowBucket {
private long[] eventValues;
public WindowBucket(int eventSize) {
eventValues = new long[eventSize];
}
}
When recording data, the event type is passed in:
public void add(Enum eventType) {
……
}
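The enum-indexed bucket can be sketched on its own. The enum LimitEvent (for the rate-limiting case: passed and throttled) and the class EnumBucket are hypothetical names used only for illustration. The array is sized by getEnumConstants(), and each constant's ordinal() is its slot.

```java
// Hypothetical event types for a rate limiter: passed and throttled requests.
enum LimitEvent { PASS, BLOCK }

// Minimal bucket whose counter array is sized and indexed by an enum.
class EnumBucket<E extends Enum<E>> {
    private final long[] values;

    EnumBucket(Class<E> eventTypeClass) {
        // one slot per enum constant; ordinal() is the slot index
        this.values = new long[eventTypeClass.getEnumConstants().length];
    }

    void add(E eventType) {
        values[eventType.ordinal()]++;
    }

    long get(E eventType) {
        return values[eventType.ordinal()];
    }
}
```

The bound E extends Enum<E> replaces the raw Enum used in the source. This lets the compiler reject an event type from a different enum, which with the raw type would silently index the wrong slot.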
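Performance and consistency
Because the data is spread across several buckets, computing a total means adding up every bucket. The bucket data may change while that sum is being computed. There are two implementations:
- Strong consistency: use a lock.
- Eventual consistency: no lock.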
public class AccurateWindowBucket extends WindowBucket {
private long[] eventValues;
private ReentrantLock updateLock;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
super(eventSize);
this.updateLock = updateLock;
eventValues = new long[eventSize];
}
@Override
public void add(Enum eventType, int value) {
updateLock.lock();
try {
int index = eventType.ordinal();
eventValues[index] = eventValues[index] + value;
} finally {
updateLock.unlock();
}
}
}
public class EfficiencyWindowBucket extends WindowBucket {
private LongAdder[] eventValues;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public EfficiencyWindowBucket(int eventSize) {
super(eventSize);
eventValues = new LongAdder[eventSize];
IntStream.range(0, eventSize).forEach(i -> {
eventValues[i] = new LongAdder();
});
}
@Override
public void add(Enum eventType, int value) {
eventValues[eventType.ordinal()].add(value);
}
}
Of the two, use eventual consistency when performance is paramount and strong consistency when accuracy matters.
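Both variants give an exact total once writers have finished. The difference is only what a reader may see while writes are still in flight: the LongAdder.sum() documentation states that it is not an atomic snapshot under concurrent updates. The hypothetical sketch below has 4 writer threads update a lock-guarded long and a LongAdder side by side, then reads both after the threads are joined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical comparison of the two update styles used by the bucket classes.
class ConsistencyDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private long lockedCount;                               // "accurate": every update under the lock
    private final LongAdder adderCount = new LongAdder();   // "efficiency": lock-free striped updates

    void addLocked() {
        lock.lock();
        try {
            lockedCount++;
        } finally {
            lock.unlock();
        }
    }

    long lockedValue() {
        lock.lock();
        try {
            return lockedCount;
        } finally {
            lock.unlock();
        }
    }

    void addAdder() {
        adderCount.increment();
    }

    long adderValue() {
        return adderCount.sum();   // exact only when no updates run concurrently
    }

    /** Starts `threads` writers that each call both add methods `perThread` times, then waits for them. */
    static ConsistencyDemo run(int threads, int perThread) {
        ConsistencyDemo demo = new ConsistencyDemo();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.addLocked();
                    demo.addAdder();
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();   // join() makes all writes visible to this thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return demo;
    }
}
```

After ConsistencyDemo.run(4, 10_000) both counters read 40000. A read taken during the run, however, may see a partially updated LongAdder total.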
Source code
Window bucket:
public abstract class WindowBucket {
protected int eventSize;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public WindowBucket(int eventSize) {
this.eventSize = eventSize;
}
/**
* @param eventType
* @author
* @date 2020-12-18 11:22
*/
public void add(Enum eventType) {
add(eventType, 1);
}
/**
* @param eventType
* @param value
* @author
* @date 2020-12-18 11:22
*/
public abstract void add(Enum eventType, int value);
/**
* @param eventType
* @return long
* @author
* @date 2020-12-24 17:13
*/
public abstract long addAndGet(Enum eventType);
/**
* @param eventType
* @return long
* @author
* @date 2020-12-21 15:36
*/
public abstract long getValue(Enum eventType);
/**
* @author
* @date 2020-12-21 17:28
*/
public abstract void reset();
/**
* @return long[]
* @author
* @date 2020-12-21 17:29
*/
public abstract long[] getValues();
@Override
public String toString() {
return this.getClass().getSimpleName() + "{" +
"eventValues=" + Arrays.toString(getValues()) +
'}';
}
}
/**
* Performance first, eventually consistent
* @author minchin
* @date 2020-12-24 11:02
*/
public class EfficiencyWindowBucket extends WindowBucket {
private LongAdder[] eventValues;
/**
* @param eventSize
* @author minchin
* @date 2020-12-21 15:35
*/
public EfficiencyWindowBucket(int eventSize) {
super(eventSize);
eventValues = new LongAdder[eventSize];
IntStream.range(0, eventSize).forEach(i -> {
eventValues[i] = new LongAdder();
});
}
@Override
public void add(Enum eventType, int value) {
eventValues[eventType.ordinal()].add(value);
}
@Override
public long addAndGet(Enum eventType) {
add(eventType);
return getValue(eventType);
}
@Override
public long getValue(Enum eventType) {
return eventValues[eventType.ordinal()].longValue();
}
@Override
public void reset() {
for (LongAdder value : eventValues) {
value.reset();
}
}
@Override
public long[] getValues() {
long[] values = new long[eventSize];
IntStream.range(0, eventSize).forEach(i -> {
values[i] = eventValues[i].longValue();
});
return values;
}
}
/**
* Strong consistency
* @author
* @date 2020-12-24 10:35
* @description
*/
public class AccurateWindowBucket extends WindowBucket {
private long[] eventValues;
private ReentrantLock updateLock;
/**
* @param eventSize
* @author minchin
* @date 2020-12-21 15:35
*/
public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
super(eventSize);
this.updateLock = updateLock;
eventValues = new long[eventSize];
}
@Override
public void add(Enum eventType, int value) {
updateLock.lock();
try {
int index = eventType.ordinal();
eventValues[index] = eventValues[index] + value;
} finally {
updateLock.unlock();
}
}
@Override
public long addAndGet(Enum eventType) {
updateLock.lock();
try {
add(eventType);
return getValue(eventType);
} finally {
updateLock.unlock();
}
}
@Override
public long getValue(Enum eventType) {
updateLock.lock();
try {
return eventValues[eventType.ordinal()];
} finally {
updateLock.unlock();
}
}
@Override
public void reset() {
updateLock.lock();
try {
IntStream.range(0, eventSize).forEach(i -> eventValues[i] = 0L);
} finally {
updateLock.unlock();
}
}
@Override
public long[] getValues() {
updateLock.lock();
try {
return Arrays.copyOf(eventValues, eventValues.length);
} finally {
updateLock.unlock();
}
}
}
/**
* @author
* @date 2020-12-18 11:17
* @description
*/
public class WindowBucketWrap<T extends WindowBucket> {
private volatile long beginTime;
private T windowBucket;
public WindowBucketWrap(long beginTime, T windowBucket) {
this.beginTime = beginTime;
this.windowBucket = windowBucket;
}
/**
* @param windowStarTime
* @author minchin
* @date 2020-12-21 10:14
*/
public void reset(long windowStarTime) {
windowBucket.reset();
beginTime = windowStarTime;
}
public long getBeginTime() {
return beginTime;
}
public WindowBucketWrap<T> setBeginTime(long beginTime) {
this.beginTime = beginTime;
return this;
}
public T getWindowBucket() {
return windowBucket;
}
@Override
public String toString() {
return "WindowBucketWrap{" +
"beginTime=" + beginTime +
", windowBucket=" + windowBucket +
'}';
}
}
Window:
/**
* @author
* @date 2020-12-17 17:14
* @description
*/
public class EventTypeWrap<E extends Enum> {
private E[] eventTypes;
public EventTypeWrap(Class<E> clazz) {
eventTypes = clazz.getEnumConstants();
Assert.notEmpty(eventTypes, "Event type list is empty!");
}
public E[] getEventTypes() {
return eventTypes;
}
public EventTypeWrap<E> setEventTypes(E[] eventTypes) {
this.eventTypes = eventTypes;
return this;
}
}
/**
* @author
* @date 2020-12-24 11:16
* @description
*/
public enum SlidingWindowType {
/**
* Accurate (strong consistency)
*/
ACCURATE,
/**
* Performance-oriented, eventually consistent
*/
EFFICIENCY
}
/**
* @author
* @date 2020-12-18 15:36
* @description
*/
public abstract class SlidingWindow {
protected ReentrantLock updateLock;
protected Class<? extends Enum> eventTypeClass;
protected SlidingWindowType windowType = SlidingWindowType.EFFICIENCY;
protected EventTypeWrap<? extends Enum> eventTypeWrap;
protected int bucketCount;
protected int intervalInMs;
protected int bucketLengthInMs;
protected WindowBucketWrap[] windowBucketWraps;
protected SlidingWindow() {
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:58
*/
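protected SlidingWindow init() {
// intervalInMs < bucketCount would make bucketLengthInMs 0 and cause a division by zero later
if (eventTypeClass == null || bucketCount <= 0 || intervalInMs < bucketCount) {
throw new IllegalArgumentException("init SlidingWindow error!");
}
updateLock = new ReentrantLock();
eventTypeWrap = new EventTypeWrap(eventTypeClass);
// The bucket length is needed below to align the initial begin time.
bucketLengthInMs = intervalInMs / bucketCount;
long currentTime = TimeServer.getIntervalMilliTime();
// Align the begin time to a bucket boundary. An unaligned value (e.g. 12345 with 6000 ms buckets)
// would be later than the aligned start (12000) computed in getCurrentBucket() and falsely trigger
// the "clock rollback" exception for a window created off a bucket boundary.
long bucketStartTime = calculateWindowStartTime(currentTime);
windowBucketWraps = new WindowBucketWrap[bucketCount];
IntStream.range(0, bucketCount).forEach(i -> {
windowBucketWraps[i] = new WindowBucketWrap(bucketStartTime, buildWindowBucket());
});
return this;
}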
/**
* @return com.xxxx.xxxx.common.slidingwindow.WindowBucket
* @author
* @date 2020-12-24 11:22
*/
protected abstract WindowBucket buildWindowBucket();
/**
* @param eventType
* @author
* @date 2020-12-21 17:33
*/
public void add(Enum eventType) {
WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType);
}
/**
* @param eventType
* @param value
* @author
* @date 2020-12-24 17:16
*/
public void add(Enum eventType, int value) {
WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType, value);
}
/**
* @param eventType
* @return long
* @author
* @date 2020-12-24 17:13
*/
public abstract long addAndSum(Enum eventType);
/**
* Like LongAdder: the EFFICIENCY implementation trades some precision for performance.
*
* @param eventType
* @return long
* @author
* @date 2020-12-21 17:42
*/
public abstract long sum(Enum eventType);
/**
* @return com.xxxx.xxxx.common.slidingwindow.WindowBucketWrap
* @author
* @date 2020-12-18 16:56
*/
public WindowBucketWrap getCurrentBucket() {
long currentTime = TimeServer.getIntervalMilliTime();
int bucketId = calculateCurrentBucketId(currentTime);
long windowStarTime = calculateWindowStartTime(currentTime);
while (true) {
WindowBucketWrap windowBucketWrap = windowBucketWraps[bucketId];
long windowBucketBeginTime = windowBucketWrap.getBeginTime();
if (windowBucketBeginTime == windowStarTime) {
return windowBucketWrap;
}
// New round: the bucket still holds data from an earlier window, so reset it
if (windowStarTime > windowBucketBeginTime) {
resetWindow(windowBucketWrap, windowStarTime);
} else {
// Since nanoTime is used, a "clock rollback" should not happen, so throw directly
throw new DateTimeException(
String.format("windowBucketBeginTime: %d, windowStarTime: %d, abnormal clock, check whether something like a 'clock rollback' has occurred!",
windowBucketBeginTime, windowStarTime));
}
}
}
/**
* @param currentTime
* @return int
* @author
* @date 2020-12-18 17:09
*/
protected int calculateCurrentBucketId(long currentTime) {
long timeId = currentTime / bucketLengthInMs;
return (int) (timeId % windowBucketWraps.length);
}
/**
* @param currentTime
* @return long
* @author
* @date 2020-12-18 17:19
*/
protected long calculateWindowStartTime(long currentTime) {
return currentTime - currentTime % bucketLengthInMs;
}
/**
* @param windowBucketWrap
* @param windowStarTime
* @author
* @date 2020-12-21 10:30
*/
protected void resetWindow(WindowBucketWrap windowBucketWrap, long windowStarTime) {
if (updateLock.tryLock()) {
try {
if (windowStarTime > windowBucketWrap.getBeginTime()) {
windowBucketWrap.reset(windowStarTime);
}
} finally {
updateLock.unlock();
}
} else {
// Another thread holds the lock and is probably still resetting; yield the CPU to other work before looping again
Thread.yield();
}
}
/**
* @param currentTime
* @param windowBucketWrap
* @return boolean
* @author
* @date 2020-12-21 17:37
*/
protected boolean isWindowDeprecated(long currentTime, WindowBucketWrap<WindowBucket> windowBucketWrap) {
return currentTime - windowBucketWrap.getBeginTime() > intervalInMs;
}
public Class<? extends Enum> getEventTypeClass() {
return eventTypeClass;
}
protected SlidingWindow setEventTypeClass(Class<? extends Enum> eventTypeClass) {
this.eventTypeClass = eventTypeClass;
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.EventTypeWrap<? extends java.lang.Enum>
* @author
* @date 2020-12-18 16:48
*/
public EventTypeWrap<? extends Enum> getEventTypeWrap() {
return eventTypeWrap;
}
/**
* @return int
* @author
* @date 2020-12-18 16:56
*/
public int getBucketCount() {
return bucketCount;
}
/**
* @param bucketCount
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:56
*/
protected SlidingWindow setBucketCount(int bucketCount) {
this.bucketCount = bucketCount;
return this;
}
/**
* @return int
* @author
* @date 2020-12-18 16:56
*/
public int getIntervalInMs() {
return intervalInMs;
}
/**
* @param intervalInMs
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:56
*/
protected SlidingWindow setIntervalInMs(int intervalInMs) {
this.intervalInMs = intervalInMs;
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindowType
* @author
* @date 2020-12-24 11:18
*/
public SlidingWindowType getWindowType() {
return windowType;
}
/**
* @param windowType
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-24 11:18
*/
protected SlidingWindow setWindowType(SlidingWindowType windowType) {
this.windowType = windowType;
return this;
}
@Override public String toString() {
return "SlidingWindow{" +
"eventTypeClass=" + eventTypeClass.getName() +
", windowType=" + windowType +
", bucketCount=" + bucketCount +
", intervalInMs=" + intervalInMs +
", bucketLengthInMs=" + bucketLengthInMs +
", windowBucketWraps=\n" + StringUtils.join(windowBucketWraps, ",\n") +
'}';
}
/**
* @author
* @date 2020-12-18 16:48
*/
public static class SlidingWindowBuilder {
private SlidingWindow slidingWindow;
public SlidingWindowBuilder() {
this(SlidingWindowType.EFFICIENCY);
}
public SlidingWindowBuilder(SlidingWindowType windowType) {
if (SlidingWindowType.ACCURATE.equals(windowType)) {
slidingWindow = new AccurateSlidingWindow();
} else {
slidingWindow = new EfficiencySlidingWindow();
}
}
/**
* @param eventTypeClass
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:48
*/
public SlidingWindowBuilder ofEventTypeClass(Class<? extends Enum> eventTypeClass) {
slidingWindow.setEventTypeClass(eventTypeClass);
return this;
}
/**
* @param bucketCount
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:55
*/
public SlidingWindowBuilder ofBucketCount(int bucketCount) {
slidingWindow.setBucketCount(bucketCount);
return this;
}
/**
* @param windowIntervalInMs
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:55
*/
public SlidingWindowBuilder ofWindowIntervalInMs(int windowIntervalInMs) {
slidingWindow.setIntervalInMs(windowIntervalInMs);
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:48
*/
public SlidingWindow build() {
return slidingWindow.init();
}
}
}
/**
* @author
* @date 2020-12-24 17:26
* @description
*/
public class AccurateSlidingWindow extends SlidingWindow {
@Override
public WindowBucket buildWindowBucket() {
int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new AccurateWindowBucket(updateLock, eventTypeSize);
}
@Override
public long addAndSum(Enum eventType) {
updateLock.lock();
try {
WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType);
return sum(eventType);
} finally {
updateLock.unlock();
}
}
@Override
public long sum(Enum eventType) {
updateLock.lock();
try {
long value = 0;
long currentTime = TimeServer.getIntervalMilliTime();
for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
value += windowBucketWrap.getWindowBucket().getValue(eventType);
}
}
return value;
} finally {
updateLock.unlock();
}
}
}
/**
* @author
* @data 2020-12-24 17:26
* @description
*/
public class EfficiencySlidingWindow extends SlidingWindow {
@Override
public WindowBucket buildWindowBucket() {
int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new EfficiencyWindowBucket(eventTypeSize);
}
@Override
public long addAndSum(Enum eventType) {
add(eventType);
return sum(eventType);
}
@Override
public long sum(Enum eventType) {
long value = 0;
long currentTime = TimeServer.getIntervalMilliTime();
for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
value += windowBucketWrap.getWindowBucket().getValue(eventType);
}
}
return value;
}
}
Time service:
/**
* @author
* @date 2020-12-21 11:07
* @description
*/
public class TimeServer {
/**
* Number of nanoseconds in one millisecond.
*/
public static final long NANOS_PER_MILLI = 1000_000L;
private static final long beginNanoTime;
private static volatile long beginMilliTime;
private static volatile long currentNanoTime;
static {
currentNanoTime = System.nanoTime();
beginNanoTime = currentNanoTime;
beginMilliTime = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
currentNanoTime = System.nanoTime();
try {
// Very high precision is not required; a small error is acceptable
TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
} catch (Throwable ignore) {
}
}
}
});
daemon.setDaemon(true);
daemon.setName("slidingwindow-time-tick-thread");
daemon.start();
}
/**
* @return long
* @author
* @date 2020-12-21 11:14
*/
public static long getCurrentNanoTime() {
return currentNanoTime;
}
/**
* @return long
* @author
* @date 2020-12-21 12:59
*/
public static long getCurrentMilliTime() {
return beginMilliTime + getIntervalMilliTime();
}
/**
* @return long
* @author
* @date 2020-12-21 11:16
*/
public static long getIntervalNanoTime() {
return getCurrentNanoTime() - beginNanoTime;
}
/**
* @return long
* @author
* @date 2020-12-21 11:20
*/
public static long getIntervalMilliTime() {
return getIntervalNanoTime() / NANOS_PER_MILLI;
}
}
Usage
The original log-count limiter, TimesLimiter, now uses the sliding window:
/**
* @author
* @date 2020-06-02 10:51
* @description
*/
public class TimesLimiter implements Limiter {
private static final int SLIDINGWINDOW_BUCKETCOUNT = 10;
private int timesLimit;
private SlidingWindow slidingWindow;
public TimesLimiter(int timesLimitSeconds, int timesLimit) {
this.timesLimit = timesLimit;
slidingWindow = new SlidingWindow.SlidingWindowBuilder(SlidingWindowType.ACCURATE)
.ofEventTypeClass(StandardLevel.class)
.ofBucketCount(SLIDINGWINDOW_BUCKETCOUNT)
.ofWindowIntervalInMs(timesLimitSeconds * 1000)
.build();
}
@Override
public boolean isLimited(String content) {
if (slidingWindow.addAndSum(StandardLevel.ERROR) > timesLimit) {
slidingWindow.add(StandardLevel.ERROR, -1);
return true;
}
return false;
}
}
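To try the idea without Log4j's StandardLevel, Spring's Assert or commons-lang3, here is a compact, self-contained sketch in the same spirit as TimesLimiter. It uses a bucket array with begin times, one lock, and the same "add, sum, roll back if over the limit" pattern. The class name is hypothetical. Passing the current time in explicitly, instead of reading TimeServer, is an assumption made to keep it testable.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical compact limiter: at most `limit` events per `intervalMs`, tracked in `bucketCount` buckets.
// The caller passes the current time in ms (assumed non-negative and non-decreasing).
class CompactTimesLimiter {
    private final int limit;
    private final long intervalMs;
    private final long bucketLengthMs;
    private final long[] counts;
    private final long[] beginTimes;
    private final ReentrantLock lock = new ReentrantLock();

    CompactTimesLimiter(int limit, long intervalMs, int bucketCount) {
        if (limit <= 0 || bucketCount <= 0 || intervalMs < bucketCount) {
            throw new IllegalArgumentException("invalid limiter configuration");
        }
        this.limit = limit;
        this.intervalMs = intervalMs;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.counts = new long[bucketCount];
        this.beginTimes = new long[bucketCount];
        Arrays.fill(beginTimes, -1L);   // -1 = bucket never used
    }

    /** Records one event at nowMs; returns true (and undoes the record) if it exceeds the limit. */
    boolean isLimited(long nowMs) {
        int index = (int) ((nowMs / bucketLengthMs) % counts.length);
        long start = nowMs - nowMs % bucketLengthMs;
        lock.lock();
        try {
            if (beginTimes[index] < start) {   // stale bucket from an earlier round: reset it
                beginTimes[index] = start;
                counts[index] = 0;
            }
            counts[index]++;
            if (sum(nowMs) > limit) {
                counts[index]--;               // roll back, like add(StandardLevel.ERROR, -1)
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /** Caller must hold the lock. Skips buckets older than one interval, as isWindowDeprecated does. */
    private long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (nowMs - beginTimes[i] <= intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

With a limit of 3 per 60 s, three events at 54 s followed by one at 61 s are counted in the same window, so the fourth is rejected. This is the burst that the clock-hour limiter from the background let through. One design difference is deliberate: here the rollback happens inside the same lock as the check. TimesLimiter calls add(StandardLevel.ERROR, -1) after addAndSum has released the lock, so another thread may briefly observe the excess count.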
That is the full content of "[Java] Implementing a Generic Sliding Window". Source: utcz.com/a/90748.html