[Java] A Generic Sliding Window Implementation

Background

In my earlier log-based alerting implementation, you could configure a maximum of X alerts per hour, but the implementation was crude:

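  1. The period was fixed at one hour.
  2. It used natural clock hours (minute 0 to minute 60). X alerts could therefore go out at minute 59 and another X at minute 1 of the next hour, i.e. 2X alerts within just a couple of minutes.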
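To make the boundary problem concrete, here is a minimal sketch of a counter that resets at every natural hour. `FixedHourLimiter` and its minute-based timestamps are hypothetical, written only to reproduce the scenario above; they are not the original alerting code.

```java
// Hypothetical fixed-window limiter: the counter resets at every natural hour boundary.
class FixedHourLimiter {
    private final int limit;
    private long currentHour = Long.MIN_VALUE;
    private int count;

    FixedHourLimiter(int limit) {
        this.limit = limit;
    }

    /** Returns true if an alert at the given minute (minutes since some epoch) may be sent. */
    boolean tryAcquire(long minute) {
        long hour = minute / 60;
        if (hour != currentHour) { // a new natural hour starts: the count drops back to 0
            currentHour = hour;
            count = 0;
        }
        if (count >= limit) {
            return false;
        }
        count++;
        return true;
    }

    /** Sends `limit` alerts at minute 59 and `limit` more at minute 61; returns how many got through. */
    static int acceptedAroundBoundary(int limit) {
        FixedHourLimiter limiter = new FixedHourLimiter(limit);
        int accepted = 0;
        for (int i = 0; i < limit; i++) {
            if (limiter.tryAcquire(59)) accepted++;
        }
        for (int i = 0; i < limit; i++) {
            if (limiter.tryAcquire(61)) accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedAroundBoundary(5)); // 10
    }
}
```

With a limit of 5, all 10 alerts pass, which is exactly the 2X burst within two minutes described above.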

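A better design lets you configure "at most Y alerts within any period of length X" and counts the alerts over that period with a sliding window.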

Key points:

Data structure

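A fixed-length array represents one window. Each element of the array is one bucket, and an index pointer identifies the current bucket.
For example, a 60-second window can be split into 10 buckets, so each bucket covers 6 seconds.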

public class SlidingWindow {

    protected int bucketCount;

    protected int intervalInMs;

    protected WindowBucket[] windowBuckets;

}

Sliding strategy

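  1. Active: a scheduled task advances the index pointer at fixed intervals.
  2. Passive: when data arrives, the current index is computed from the current time.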

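Drawbacks of the active approach:

  1. It needs an extra scheduled task.
  2. The scheduled task itself can hurt the accuracy of the sliding, especially when the window is very short, e.g. 1 second split into 100 buckets (10 ms per bucket).

This implementation therefore uses the passive approach.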
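Under the passive approach, the bucket index and the start time of that bucket follow from the current time alone. A minimal sketch of the arithmetic for the 60 s / 10-bucket example; `BucketMath` is an illustrative name, and indices here are 0-based, mirroring `calculateCurrentBucketId` and `calculateWindowStartTime` in the source below.

```java
// Passive sliding: derive the bucket index and the bucket's start time from the time only.
class BucketMath {
    static final long INTERVAL_MS = 60_000;                   // window length: 60 s
    static final int BUCKET_COUNT = 10;                       // 10 buckets
    static final long BUCKET_MS = INTERVAL_MS / BUCKET_COUNT; // 6 s per bucket

    /** 0-based index of the bucket that covers timeMs. */
    static int bucketIndex(long timeMs) {
        return (int) ((timeMs / BUCKET_MS) % BUCKET_COUNT);
    }

    /** Start time of the bucket slot that covers timeMs. */
    static long bucketStart(long timeMs) {
        return timeMs - timeMs % BUCKET_MS;
    }

    public static void main(String[] args) {
        for (long t : new long[] {5_000, 61_000, 70_000}) {
            System.out.println(t + " ms -> bucket " + bucketIndex(t) + ", start " + bucketStart(t));
        }
    }
}
```

5 s and 61 s both map to bucket 0 (starts 0 and 60000), and 70 s maps to bucket 1 (start 66000): the same array slot is reused every round, which is why stale data has to be detected.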

New vs. stale data

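With the passive approach, how do we filter out expired data (data left over from the previous window) when aggregating?
Again take a 60-second window with 10 buckets, numbered from 1 here, so bucket 1 covers seconds 1 to 6, bucket 2 covers seconds 7 to 12, and so on:

  1. At second 5, data is inserted into bucket 1.
  2. No data arrives during seconds 61 to 66. This is a new round of the window, but bucket 1 is left untouched.
  3. At second 70, data is inserted into bucket 2. If we now sum the whole array, bucket 1's data is included even though it belongs to the previous window and should not be counted.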
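The scenario above can be reproduced with a minimal ring of counters that has no per-bucket start time. `NaiveRing` is a hypothetical name, and buckets are 0-based here, so the 5 s event lands in bucket 0 rather than bucket 1 as in the 1-based prose.

```java
// Naive ring of buckets WITHOUT begin times: data from an earlier round leaks into the sum.
class NaiveRing {
    static final long BUCKET_MS = 6_000;
    static final int BUCKET_COUNT = 10;
    final long[] counts = new long[BUCKET_COUNT];

    void add(long timeMs) {
        counts[(int) ((timeMs / BUCKET_MS) % BUCKET_COUNT)]++;
    }

    /** Sums every bucket, with no way to tell which round a bucket's data belongs to. */
    long naiveSum() {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        NaiveRing ring = new NaiveRing();
        ring.add(5_000);  // first round, bucket 0
        ring.add(70_000); // second round, bucket 1
        // Only the 70 s event lies within the last 60 s, yet both are counted.
        System.out.println(ring.naiveSum()); // 2
    }
}
```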

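The solution is to give each bucket a start time.
For example, the first bucket initially starts at 1 s and the second at 7 s. After one round (60 s), the first bucket's start time should become 61 s and the second bucket's 67 s.
On insert, the start time also tells us whether the current bucket holds data from an earlier window:

  1. If not, add to the bucket's existing data.
  2. If so, reset the bucket first (set the counts to 0 and update its start time), using a lock to prevent concurrency problems.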

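When aggregating, check whether each bucket's start time lies within one window of the current time.
In the example above, the insert at second 70 goes to the second bucket, whose start time becomes 67 s. When the current window is then summed, bucket 1 still has a start time of 1 s. Since 70 - 1 = 69 > 60, it lies outside the current window and is left out of the sum.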
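A minimal single-event-type sketch of both rules: reset a stale bucket on insert, and skip buckets that started a full window or more ago when summing. For simplicity it assumes one `ReentrantLock` guards every operation and that time is passed in as a plain millisecond value; the full source below instead uses `tryLock` with a re-check of a volatile begin time, plus separate accurate and efficient bucket types. `TimedRing` is a hypothetical name, and indices are 0-based.

```java
import java.util.concurrent.locks.ReentrantLock;

// Ring of buckets WITH a begin time per bucket (simplified: one event type, one lock).
class TimedRing {
    static final long INTERVAL_MS = 60_000;
    static final int BUCKET_COUNT = 10;
    static final long BUCKET_MS = INTERVAL_MS / BUCKET_COUNT;

    private final long[] counts = new long[BUCKET_COUNT];
    private final long[] beginTimes = new long[BUCKET_COUNT]; // start of the slot each bucket holds
    private final ReentrantLock lock = new ReentrantLock();

    void add(long nowMs) {
        int idx = (int) ((nowMs / BUCKET_MS) % BUCKET_COUNT);
        long start = nowMs - nowMs % BUCKET_MS;
        lock.lock();
        try {
            if (beginTimes[idx] != start) { // data from an earlier round: reset before adding
                counts[idx] = 0;
                beginTimes[idx] = start;
            }
            counts[idx]++;
        } finally {
            lock.unlock();
        }
    }

    long sum(long nowMs) {
        lock.lock();
        try {
            long sum = 0;
            for (int i = 0; i < BUCKET_COUNT; i++) {
                if (nowMs - beginTimes[i] < INTERVAL_MS) { // only buckets inside the current window
                    sum += counts[i];
                }
            }
            return sum;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedRing ring = new TimedRing();
        ring.add(5_000);
        ring.add(70_000);
        System.out.println(ring.sum(70_000)); // 1: the bucket that began at 0 ms is excluded
    }
}
```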

public class WindowBucketWrap<T extends WindowBucket> {

    private volatile long beginTime;

    private T windowBucket;

}

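To guard against the wall clock moving backwards, time is measured with System.nanoTime() rather than System.currentTimeMillis().
Because calling System.nanoTime() on every event has a performance cost, a custom time service is used instead: a daemon thread refreshes a cached nanoTime value about every 0.5 ms, and callers only read that cached value.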

public class TimeServer {

    /**
     * Number of nanoseconds in one millisecond.
     */
    public static final long NANOS_PER_MILLI = 1_000_000L;

    private static final long beginNanoTime;

    private static volatile long beginMilliTime;

    private static volatile long currentNanoTime;

    static {
        currentNanoTime = System.nanoTime();
        beginNanoTime = currentNanoTime;
        beginMilliTime = System.currentTimeMillis();
        Thread daemon = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    currentNanoTime = System.nanoTime();
                    try {
                        TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                    } catch (Throwable ignore) {
                    }
                }
            }
        });
        daemon.setDaemon(true);
        daemon.setName("slidingwindow-time-tick-thread");
        daemon.start();
    }

    // ... accessor methods omitted; see the full source below
}
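The same caching idea can be sketched with a `ScheduledExecutorService` instead of a hand-written sleep loop. This is a swapped-in variant, not the article's `TimeServer`: `CachedClock` is a hypothetical name, and the 500 µs period simply mirrors `NANOS_PER_MILLI / 2`. Elapsed time is always computed as a difference of `nanoTime` values, because `System.nanoTime()` is only meaningful for measuring elapsed time.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical cached monotonic clock refreshed by a scheduled task.
final class CachedClock {
    private static final long NANOS_PER_MILLI = 1_000_000L;
    private final long beginNanos = System.nanoTime();
    private volatile long cachedNanos = beginNanos;
    private final ScheduledExecutorService ticker =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cached-clock-tick");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    CachedClock() {
        // Refresh roughly every 0.5 ms; readers only perform a volatile read.
        ticker.scheduleAtFixedRate(() -> cachedNanos = System.nanoTime(), 0, 500, TimeUnit.MICROSECONDS);
    }

    /** Milliseconds elapsed since construction, derived from nanoTime deltas. */
    long elapsedMillis() {
        return (cachedNanos - beginNanos) / NANOS_PER_MILLI;
    }

    void shutdown() {
        ticker.shutdownNow();
    }
}

class CachedClockDemo {
    public static void main(String[] args) throws InterruptedException {
        CachedClock clock = new CachedClock();
        Thread.sleep(20);
        System.out.println("elapsed about " + clock.elapsedMillis() + " ms");
        clock.shutdown();
    }
}
```

The trade-off is the same as in `TimeServer`: reads are cheap, but the returned time can lag the real time by up to one refresh period.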

Data fields

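Different business scenarios store different data in the window. The log alerting from the Background section needs only one value, the number of alerts already sent, whereas rate limiting may need two: requests passed and requests blocked. A generic window has to support all of these, so each use case defines its own statistics.
Each bucket therefore stores its data in an array whose length and slot meanings are defined by an enum of event types.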

public class EventTypeWrap<E extends Enum> {

    private E[] eventTypes;

    public EventTypeWrap(Class<E> clazz) {
        eventTypes = clazz.getEnumConstants();
        Assert.notEmpty(eventTypes, "Event type list is empty!");
    }

    public E[] getEventTypes() {
        return eventTypes;
    }

}

public class SlidingWindow {

    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new WindowBucket(eventTypeSize);
    }

}

public class WindowBucket {

    private long[] eventValues;

    public WindowBucket(int eventSize) {
        eventValues = new long[eventSize];
    }

}

When recording data, the caller passes the event type:

public void add(Enum eventType) {
    // ……
}
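A minimal sketch of enum-indexed counters, assuming a hypothetical `LimitEvent` enum for the rate-limiting case (passed vs. blocked). It is not thread-safe; it only shows how the enum fixes the array length and how `ordinal()` selects the slot, as `WindowBucket` does.

```java
import java.util.Arrays;

// One long[] slot per enum constant; the constant's ordinal() selects the slot.
class EnumCounters<E extends Enum<E>> {
    private final long[] values;

    EnumCounters(Class<E> type) {
        E[] constants = type.getEnumConstants();
        if (constants.length == 0) {
            throw new IllegalArgumentException("event type enum has no constants");
        }
        values = new long[constants.length];
    }

    void add(E eventType, long delta) {
        values[eventType.ordinal()] += delta;
    }

    long get(E eventType) {
        return values[eventType.ordinal()];
    }

    long[] snapshot() {
        return Arrays.copyOf(values, values.length);
    }
}

// Hypothetical event types for a rate limiter.
enum LimitEvent { PASSED, BLOCKED }

class EnumCountersDemo {
    public static void main(String[] args) {
        EnumCounters<LimitEvent> counters = new EnumCounters<>(LimitEvent.class);
        counters.add(LimitEvent.PASSED, 1);
        counters.add(LimitEvent.PASSED, 1);
        counters.add(LimitEvent.BLOCKED, 1);
        System.out.println(Arrays.toString(counters.snapshot())); // [2, 1]
    }
}
```

A `java.util.EnumMap` could express the same mapping, but a plain `long[]` keeps each bucket compact and avoids boxing on every update.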

Performance and consistency

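Because the data is spread over several buckets, computing a total means adding up every bucket, and the buckets may change while the sum is being computed. There are two implementations:

  1. Strong consistency: use a lock.
  2. Eventual consistency: no lock.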
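A minimal sketch contrasting the two strategies on a single counter. `LockedCounter` and `AdderCounter` are hypothetical names. Per the `LongAdder` Javadoc, `sum()` is not an atomic snapshot: updates that happen while it runs may or may not be reflected, but it is exact when no updates are in flight.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

class CounterComparison {
    // Strong consistency: every update and every read takes the same lock.
    static final class LockedCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private long value;

        void add(long delta) {
            lock.lock();
            try { value += delta; } finally { lock.unlock(); }
        }

        long get() {
            lock.lock();
            try { return value; } finally { lock.unlock(); }
        }
    }

    // Eventual consistency: LongAdder spreads contention across cells; no lock.
    static final class AdderCounter {
        private final LongAdder value = new LongAdder();

        void add(long delta) { value.add(delta); }

        long get() { return value.sum(); }
    }

    /** Runs `threads` writers that each add 1 `perThread` times; returns {locked total, adder total}. */
    static long[] run(int threads, int perThread) {
        LockedCounter locked = new LockedCounter();
        AdderCounter adder = new AdderCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    locked.add(1);
                    adder.add(1);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new long[] {locked.get(), adder.get()};
    }

    public static void main(String[] args) {
        long[] totals = run(4, 100_000);
        System.out.println(totals[0] + " " + totals[1]); // 400000 400000
    }
}
```

Both totals are exact once every writer has finished; the strategies differ only in what a reader may observe while writers are still running, and in how much the writers contend with each other.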

public class AccurateWindowBucket extends WindowBucket {

    private long[] eventValues;

    private ReentrantLock updateLock;

    /**
     * @param updateLock lock shared with the sliding window
     * @param eventSize  number of event types
     */
    public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
        super(eventSize);
        this.updateLock = updateLock;
        eventValues = new long[eventSize];
    }

    @Override
    public void add(Enum eventType, int value) {
        updateLock.lock();
        try {
            int index = eventType.ordinal();
            eventValues[index] = eventValues[index] + value;
        } finally {
            updateLock.unlock();
        }
    }

}

public class EfficiencyWindowBucket extends WindowBucket {

    private LongAdder[] eventValues;

    /**
     * @param eventSize number of event types
     */
    public EfficiencyWindowBucket(int eventSize) {
        super(eventSize);
        eventValues = new LongAdder[eventSize];
        IntStream.range(0, eventSize).forEach(i -> eventValues[i] = new LongAdder());
    }

    @Override
    public void add(Enum eventType, int value) {
        eventValues[eventType.ordinal()].add(value);
    }

}

Of the two, use eventual consistency when raw performance matters most, and strong consistency when accuracy is required.

Source code

Window buckets:

import java.util.Arrays;

/**
 * A bucket holding one counter per event type.
 */
public abstract class WindowBucket {

    protected int eventSize;

    /**
     * @param eventSize number of event types
     */
    public WindowBucket(int eventSize) {
        this.eventSize = eventSize;
    }

    /** Adds 1 to the counter of the given event type. */
    public void add(Enum eventType) {
        add(eventType, 1);
    }

    /** Adds value to the counter of the given event type. */
    public abstract void add(Enum eventType, int value);

    /** Adds 1 to the counter of the given event type and returns its new value. */
    public abstract long addAndGet(Enum eventType);

    /** Returns the current value of the given event type. */
    public abstract long getValue(Enum eventType);

    /** Resets every counter to 0. */
    public abstract void reset();

    /** Returns a copy of all counters, indexed by event type ordinal. */
    public abstract long[] getValues();

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{" +
                "eventValues=" + Arrays.toString(getValues()) +
                '}';
    }

}

import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

/**
 * Performance first, eventually consistent.
 *
 * @author minchin
 * @date 2020-12-24 11:02
 */
public class EfficiencyWindowBucket extends WindowBucket {

    private LongAdder[] eventValues;

    /**
     * @param eventSize number of event types
     */
    public EfficiencyWindowBucket(int eventSize) {
        super(eventSize);
        eventValues = new LongAdder[eventSize];
        IntStream.range(0, eventSize).forEach(i -> eventValues[i] = new LongAdder());
    }

    @Override
    public void add(Enum eventType, int value) {
        eventValues[eventType.ordinal()].add(value);
    }

    @Override
    public long addAndGet(Enum eventType) {
        // Not atomic: other threads may add between these two calls
        add(eventType);
        return getValue(eventType);
    }

    @Override
    public long getValue(Enum eventType) {
        return eventValues[eventType.ordinal()].longValue();
    }

    @Override
    public void reset() {
        for (LongAdder value : eventValues) {
            value.reset();
        }
    }

    @Override
    public long[] getValues() {
        long[] values = new long[eventSize];
        IntStream.range(0, eventSize).forEach(i -> values[i] = eventValues[i].longValue());
        return values;
    }

}

import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Strong consistency: every read and write holds the lock shared with the sliding window.
 *
 * @date 2020-12-24 10:35
 */
public class AccurateWindowBucket extends WindowBucket {

    private long[] eventValues;

    private ReentrantLock updateLock;

    /**
     * @param updateLock lock shared with the sliding window
     * @param eventSize  number of event types
     */
    public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
        super(eventSize);
        this.updateLock = updateLock;
        eventValues = new long[eventSize];
    }

    @Override
    public void add(Enum eventType, int value) {
        updateLock.lock();
        try {
            int index = eventType.ordinal();
            eventValues[index] = eventValues[index] + value;
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long addAndGet(Enum eventType) {
        updateLock.lock();
        try {
            // ReentrantLock lets add() and getValue() re-acquire the lock already held here
            add(eventType);
            return getValue(eventType);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long getValue(Enum eventType) {
        updateLock.lock();
        try {
            return eventValues[eventType.ordinal()];
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public void reset() {
        updateLock.lock();
        try {
            Arrays.fill(eventValues, 0L);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long[] getValues() {
        updateLock.lock();
        try {
            return Arrays.copyOf(eventValues, eventValues.length);
        } finally {
            updateLock.unlock();
        }
    }

}

/**
 * Wraps a bucket together with the start time of the time slot it currently represents.
 *
 * @date 2020-12-18 11:17
 */
public class WindowBucketWrap<T extends WindowBucket> {

    private volatile long beginTime;

    private T windowBucket;

    public WindowBucketWrap(long beginTime, T windowBucket) {
        this.beginTime = beginTime;
        this.windowBucket = windowBucket;
    }

    /**
     * Clears the bucket and moves it to a new time slot.
     *
     * @param windowStartTime start time of the new slot
     */
    public void reset(long windowStartTime) {
        windowBucket.reset();
        beginTime = windowStartTime;
    }

    public long getBeginTime() {
        return beginTime;
    }

    public WindowBucketWrap<T> setBeginTime(long beginTime) {
        this.beginTime = beginTime;
        return this;
    }

    public T getWindowBucket() {
        return windowBucket;
    }

    @Override
    public String toString() {
        return "WindowBucketWrap{" +
                "beginTime=" + beginTime +
                ", windowBucket=" + windowBucket +
                '}';
    }

}

Window:

import org.springframework.util.Assert; // assumed: Spring's Assert utility

/**
 * Holds the constants of the enum that defines a window's event types.
 *
 * @date 2020-12-17 17:14
 */
public class EventTypeWrap<E extends Enum> {

    private E[] eventTypes;

    public EventTypeWrap(Class<E> clazz) {
        eventTypes = clazz.getEnumConstants();
        Assert.notEmpty(eventTypes, "Event type list is empty!");
    }

    public E[] getEventTypes() {
        return eventTypes;
    }

    public EventTypeWrap<E> setEventTypes(E[] eventTypes) {
        this.eventTypes = eventTypes;
        return this;
    }

}

/**
 * @date 2020-12-24 11:16
 */
public enum SlidingWindowType {

    /**
     * Accurate: strong consistency, guarded by a lock.
     */
    ACCURATE,

    /**
     * Performance first, eventually consistent.
     */
    EFFICIENCY

}

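import java.time.DateTimeException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3 (used by toString)

/**
 * A generic sliding window of bucketCount buckets covering intervalInMs milliseconds.
 *
 * @date 2020-12-18 15:36
 */
public abstract class SlidingWindow {

    protected ReentrantLock updateLock;

    protected Class<? extends Enum> eventTypeClass;

    protected SlidingWindowType windowType = SlidingWindowType.EFFICIENCY;

    protected EventTypeWrap<? extends Enum> eventTypeWrap;

    protected int bucketCount;

    protected int intervalInMs;

    protected int bucketLengthInMs;

    protected WindowBucketWrap[] windowBucketWraps;

    protected SlidingWindow() {
    }

    /**
     * Validates the configuration and creates the buckets.
     */
    protected SlidingWindow init() {
        if (eventTypeClass == null || bucketCount <= 0 || intervalInMs <= 0) {
            throw new IllegalArgumentException("init SlidingWindow error!");
        }
        bucketLengthInMs = intervalInMs / bucketCount;
        if (bucketLengthInMs == 0) {
            // Would cause division by zero when computing bucket ids
            throw new IllegalArgumentException("intervalInMs must be at least bucketCount");
        }
        updateLock = new ReentrantLock();
        eventTypeWrap = new EventTypeWrap(eventTypeClass);
        // Align the initial begin time to a bucket boundary. The raw current time could be
        // larger than a later bucket's start time, which getCurrentBucket() would then
        // misreport as the clock moving backwards.
        long initialBeginTime = calculateWindowStartTime(TimeServer.getIntervalMilliTime());
        windowBucketWraps = new WindowBucketWrap[bucketCount];
        IntStream.range(0, bucketCount).forEach(i ->
                windowBucketWraps[i] = new WindowBucketWrap(initialBeginTime, buildWindowBucket()));
        return this;
    }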

    /** Creates an empty bucket of the concrete type (accurate or efficient). */
    protected abstract WindowBucket buildWindowBucket();

    /** Adds 1 to the given event type in the current bucket. */
    public void add(Enum eventType) {
        WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
        windowBucket.add(eventType);
    }

    /** Adds value to the given event type in the current bucket. */
    public void add(Enum eventType, int value) {
        WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
        windowBucket.add(eventType, value);
    }

    /** Adds 1 to the given event type and returns its sum over the current window. */
    public abstract long addAndSum(Enum eventType);

    /**
     * Returns the sum of the given event type over the current window.
     * The EFFICIENCY implementation behaves like LongAdder: faster, but the result
     * may be slightly inaccurate under concurrent updates.
     */
    public abstract long sum(Enum eventType);

    /** Returns the bucket for the current time, resetting it first if it holds stale data. */
    public WindowBucketWrap getCurrentBucket() {
        long currentTime = TimeServer.getIntervalMilliTime();
        int bucketId = calculateCurrentBucketId(currentTime);
        long windowStartTime = calculateWindowStartTime(currentTime);
        while (true) {
            WindowBucketWrap windowBucketWrap = windowBucketWraps[bucketId];
            long windowBucketBeginTime = windowBucketWrap.getBeginTime();
            if (windowBucketBeginTime == windowStartTime) {
                return windowBucketWrap;
            }
            if (windowStartTime > windowBucketBeginTime) {
                // A new round: the bucket still holds data from an earlier window
                resetWindow(windowBucketWrap, windowStartTime);
            } else {
                // Time comes from nanoTime, so the clock should never move backwards; treat it as an error
                throw new DateTimeException(String.format(
                        "windowBucketBeginTime: %d, windowStartTime: %d, clock anomaly: check whether the clock has moved backwards!",
                        windowBucketBeginTime, windowStartTime));
            }
        }
    }

    /** Index of the bucket that covers currentTime. */
    protected int calculateCurrentBucketId(long currentTime) {
        long timeId = currentTime / bucketLengthInMs;
        return (int) (timeId % windowBucketWraps.length);
    }

    /** Start time of the bucket slot that covers currentTime. */
    protected long calculateWindowStartTime(long currentTime) {
        return currentTime - currentTime % bucketLengthInMs;
    }

    /** Resets the bucket under the lock, re-checking its begin time in case another thread already did. */
    protected void resetWindow(WindowBucketWrap windowBucketWrap, long windowStartTime) {
        if (updateLock.tryLock()) {
            try {
                if (windowStartTime > windowBucketWrap.getBeginTime()) {
                    windowBucketWrap.reset(windowStartTime);
                }
            } finally {
                updateLock.unlock();
            }
        } else {
            // Another thread is probably still resetting; yield the CPU before the caller retries
            Thread.yield();
        }
    }

    /**
     * True if the bucket started a full window or more before currentTime, i.e. it belongs to
     * an earlier window. ">=" also excludes a slot that is due for reuse but not yet reset.
     */
    protected boolean isWindowDeprecated(long currentTime, WindowBucketWrap<?> windowBucketWrap) {
        return currentTime - windowBucketWrap.getBeginTime() >= intervalInMs;
    }

    public Class<? extends Enum> getEventTypeClass() {
        return eventTypeClass;
    }

    protected SlidingWindow setEventTypeClass(Class<? extends Enum> eventTypeClass) {
        this.eventTypeClass = eventTypeClass;
        return this;
    }

    public EventTypeWrap<? extends Enum> getEventTypeWrap() {
        return eventTypeWrap;
    }

    public int getBucketCount() {
        return bucketCount;
    }

    protected SlidingWindow setBucketCount(int bucketCount) {
        this.bucketCount = bucketCount;
        return this;
    }

    public int getIntervalInMs() {
        return intervalInMs;
    }

    protected SlidingWindow setIntervalInMs(int intervalInMs) {
        this.intervalInMs = intervalInMs;
        return this;
    }

    public SlidingWindowType getWindowType() {
        return windowType;
    }

    protected SlidingWindow setWindowType(SlidingWindowType windowType) {
        this.windowType = windowType;
        return this;
    }

    @Override
    public String toString() {
        return "SlidingWindow{" +
                "eventTypeClass=" + eventTypeClass.getName() +
                ", windowType=" + windowType +
                ", bucketCount=" + bucketCount +
                ", intervalInMs=" + intervalInMs +
                ", bucketLengthInMs=" + bucketLengthInMs +
                ", windowBucketWraps=\n" + StringUtils.join(windowBucketWraps, ",\n") +
                '}';
    }

    /**
     * Builder for SlidingWindow.
     *
     * @date 2020-12-18 16:48
     */
    public static class SlidingWindowBuilder {

        private SlidingWindow slidingWindow;

        public SlidingWindowBuilder() {
            this(SlidingWindowType.EFFICIENCY);
        }

        public SlidingWindowBuilder(SlidingWindowType windowType) {
            // Record the type on the window too, so getWindowType() and toString() report it correctly
            if (SlidingWindowType.ACCURATE.equals(windowType)) {
                slidingWindow = new AccurateSlidingWindow();
                slidingWindow.setWindowType(SlidingWindowType.ACCURATE);
            } else {
                slidingWindow = new EfficiencySlidingWindow();
                slidingWindow.setWindowType(SlidingWindowType.EFFICIENCY);
            }
        }

        /** Enum whose constants define the event types the window counts. */
        public SlidingWindowBuilder ofEventTypeClass(Class<? extends Enum> eventTypeClass) {
            slidingWindow.setEventTypeClass(eventTypeClass);
            return this;
        }

        /** Number of buckets the window is split into. */
        public SlidingWindowBuilder ofBucketCount(int bucketCount) {
            slidingWindow.setBucketCount(bucketCount);
            return this;
        }

        /** Window length in milliseconds. */
        public SlidingWindowBuilder ofWindowIntervalInMs(int windowIntervalInMs) {
            slidingWindow.setIntervalInMs(windowIntervalInMs);
            return this;
        }

        public SlidingWindow build() {
            return slidingWindow.init();
        }

    }

}

/**
 * Strongly consistent window: add-and-sum runs under the shared lock.
 *
 * @date 2020-12-24 17:26
 */
public class AccurateSlidingWindow extends SlidingWindow {

    @Override
    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new AccurateWindowBucket(updateLock, eventTypeSize);
    }

    @Override
    public long addAndSum(Enum eventType) {
        updateLock.lock();
        try {
            WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
            windowBucket.add(eventType);
            return sum(eventType);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long sum(Enum eventType) {
        updateLock.lock();
        try {
            long value = 0;
            long currentTime = TimeServer.getIntervalMilliTime();
            for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
                // Skip buckets that belong to an earlier window
                if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                    value += windowBucketWrap.getWindowBucket().getValue(eventType);
                }
            }
            return value;
        } finally {
            updateLock.unlock();
        }
    }

}

/**
 * Performance-first window: no lock while adding or summing (resets still use tryLock).
 *
 * @date 2020-12-24 17:26
 */
public class EfficiencySlidingWindow extends SlidingWindow {

    @Override
    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new EfficiencyWindowBucket(eventTypeSize);
    }

    @Override
    public long addAndSum(Enum eventType) {
        // Not atomic: other threads may add between the add and the sum
        add(eventType);
        return sum(eventType);
    }

    @Override
    public long sum(Enum eventType) {
        long value = 0;
        long currentTime = TimeServer.getIntervalMilliTime();
        for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
            // Skip buckets that belong to an earlier window
            if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                value += windowBucketWrap.getWindowBucket().getValue(eventType);
            }
        }
        return value;
    }

}

Time service:

import java.util.concurrent.TimeUnit;

/**
 * Cached monotonic clock: a daemon thread refreshes System.nanoTime() about every 0.5 ms.
 *
 * @date 2020-12-21 11:07
 */
public class TimeServer {

    /**
     * Number of nanoseconds in one millisecond.
     */
    public static final long NANOS_PER_MILLI = 1_000_000L;

    private static final long beginNanoTime;

    private static volatile long beginMilliTime;

    private static volatile long currentNanoTime;

    static {
        currentNanoTime = System.nanoTime();
        beginNanoTime = currentNanoTime;
        beginMilliTime = System.currentTimeMillis();
        Thread daemon = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    currentNanoTime = System.nanoTime();
                    try {
                        // High precision is not required; a small error is acceptable
                        TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                    } catch (Throwable ignore) {
                    }
                }
            }
        });
        daemon.setDaemon(true);
        daemon.setName("slidingwindow-time-tick-thread");
        daemon.start();
    }

    /** Latest cached System.nanoTime() value. */
    public static long getCurrentNanoTime() {
        return currentNanoTime;
    }

    /** Approximate wall-clock time in ms: start wall time plus elapsed monotonic time. */
    public static long getCurrentMilliTime() {
        return beginMilliTime + getIntervalMilliTime();
    }

    /** Nanoseconds elapsed since this class was initialized. */
    public static long getIntervalNanoTime() {
        return getCurrentNanoTime() - beginNanoTime;
    }

    /** Milliseconds elapsed since this class was initialized. */
    public static long getIntervalMilliTime() {
        return getIntervalNanoTime() / NANOS_PER_MILLI;
    }

}

Usage

The original log-count limiter class, TimesLimiter, now uses the sliding window:

import org.apache.logging.log4j.spi.StandardLevel; // assumed: Log4j 2's StandardLevel enum

/**
 * Limits how many log alerts may be sent within a sliding time period.
 *
 * @date 2020-06-02 10:51
 */
public class TimesLimiter implements Limiter {

    private static final int SLIDINGWINDOW_BUCKETCOUNT = 10;

    private int timesLimit;

    private SlidingWindow slidingWindow;

    /**
     * @param timesLimitSeconds length of the sliding window in seconds
     * @param timesLimit        maximum number of alerts allowed within the window
     */
    public TimesLimiter(int timesLimitSeconds, int timesLimit) {
        this.timesLimit = timesLimit;
        slidingWindow = new SlidingWindow.SlidingWindowBuilder(SlidingWindowType.ACCURATE)
                .ofEventTypeClass(StandardLevel.class)
                .ofBucketCount(SLIDINGWINDOW_BUCKETCOUNT)
                .ofWindowIntervalInMs(timesLimitSeconds * 1000)
                .build();
    }

    @Override
    public boolean isLimited(String content) {
        if (slidingWindow.addAndSum(StandardLevel.ERROR) > timesLimit) {
            // Over the limit: take back this alert's increment so blocked alerts do not count
            slidingWindow.add(StandardLevel.ERROR, -1);
            return true;
        }
        return false;
    }

}
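To check that a sliding window closes the gap described in the Background section (X alerts at minute 59 and X more at minute 1 of the next hour), here is a simplified, single-lock sketch with explicit timestamps. `BucketedWindowLimiter` is hypothetical and is not the `SlidingWindow` API above; unlike `TimesLimiter`, it checks the window sum before incrementing, so no compensating `add(..., -1)` is needed.

```java
// Simplified sketch: "at most `limit` events within any `intervalMs`", approximated with time buckets.
class BucketedWindowLimiter {
    private final long intervalMs;
    private final long bucketMs;
    private final int limit;
    private final long[] counts;     // events recorded per bucket
    private final long[] beginTimes; // start of the slot each bucket currently represents

    BucketedWindowLimiter(long intervalMs, int bucketCount, int limit) {
        if (intervalMs <= 0 || bucketCount <= 0 || intervalMs % bucketCount != 0) {
            throw new IllegalArgumentException("intervalMs must be a positive multiple of bucketCount");
        }
        this.intervalMs = intervalMs;
        this.bucketMs = intervalMs / bucketCount;
        this.limit = limit;
        this.counts = new long[bucketCount];
        this.beginTimes = new long[bucketCount]; // all 0 with zero counts, so nothing stale is summed
    }

    /** Records the event and returns true if fewer than `limit` events fall in the window ending at nowMs. */
    synchronized boolean tryAcquire(long nowMs) {
        int idx = (int) ((nowMs / bucketMs) % counts.length);
        long start = nowMs - nowMs % bucketMs;
        if (beginTimes[idx] != start) { // bucket left over from an earlier round: reset it
            counts[idx] = 0;
            beginTimes[idx] = start;
        }
        long sum = 0;
        for (int i = 0; i < counts.length; i++) {
            if (nowMs - beginTimes[i] < intervalMs) {
                sum += counts[i];
            }
        }
        if (sum >= limit) {
            return false; // checked before incrementing, so blocked events are never counted
        }
        counts[idx]++;
        return true;
    }

    /** `limit` alerts at minute 59, then `limit` more at minute 61, with a 60-minute window. */
    static int acceptedAroundBoundary(int limit) {
        long minute = 60_000L;
        BucketedWindowLimiter limiter = new BucketedWindowLimiter(60 * minute, 10, limit);
        int accepted = 0;
        for (int i = 0; i < limit; i++) {
            if (limiter.tryAcquire(59 * minute)) accepted++;
        }
        for (int i = 0; i < limit; i++) {
            if (limiter.tryAcquire(61 * minute)) accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedAroundBoundary(5)); // 5: the minute-61 alerts are blocked
    }
}
```

Because counting is per bucket, an event stops counting once its bucket's start is a full interval old, so it can drop out up to one bucket length (6 minutes here) earlier than an exact per-event log would allow; more buckets narrow that error at the cost of a longer summing loop.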

That is the complete content of "[Java] A Generic Sliding Window Implementation". Source: utcz.com/a/90748.html
