SpringCloudGateway限流操作

编程

开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。

API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。

常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等。

在Zuul中我们可以自己去实现限流的功能 (Zuul中如何限流在我的书 《Spring Cloud微服务-全栈技术与案例解析》  中有详细讲解) ,Spring Cloud Gateway的出现本身就是用来替代Zuul的。

要想替代那肯定得有强大的功能,除了性能上的优势之外,Spring Cloud Gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在Spring Cloud Gateway中进行限流操作。

目前限流提供了基于Redis的实现,我们需要增加对应的依赖:

1

2

3

4

<

dependency

>

 

<

groupId

>org.springframework.boot</

groupId

>

 

<

artifactId

>spring-boot-starter-data-redis-reactive</

artifactId

>

</

dependency

>

可以通过KeyResolver来指定限流的Key,比如我们需要根据用户来做限流,IP来做限流等等。

IP限流

1

2

3

4

@Bean

public

KeyResolver ipKeyResolver() {

 

return

exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());

}

通过exchange对象可以获取到请求信息,这边用了HostName,如果你想根据用户来做限流的话这边可以获取当前请求的用户ID或者用户名就可以了,比如:

用户限流

使用这种方式限流,请求路径中必须携带userId参数。

1

2

3

4

@Bean

KeyResolver userKeyResolver() {

 

return

exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst(

"userId"

));

}

接口限流

获取请求地址的uri作为限流key。

1

2

3

4

@Bean

KeyResolver apiKeyResolver() {

 

return

exchange -> Mono.just(exchange.getRequest().getPath().value());

}

然后配置限流的过滤器信息:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

server:

 

port: 8084

spring:

 

redis:

 

host: 127.0.0.1

 

port: 6379

 

cloud:

 

gateway:

  

routes:

  

- id: fsh-house

  

uri: lb://fsh-house

  

predicates:

  

- Path=/house/**

  

filters:

  

- name: RequestRateLimiter

   

args:

   

redis-rate-limiter.replenishRate: 10

   

redis-rate-limiter.burstCapacity: 20

   

key-resolver: "#{@ipKeyResolver}"

  • filter名称必须是RequestRateLimiter
  • redis-rate-limiter.replenishRate:允许用户每秒处理多少个请求
  • redis-rate-limiter.burstCapacity:令牌桶的容量,允许在一秒钟内完成的最大请求数
  • key-resolver:使用SpEL按名称引用bean

可以访问接口进行测试,这时候Redis中会有对应的数据:

127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"

大括号中就是我们的限流Key,这边是IP,本地的就是localhost

  • timestamp:存储的是当前时间的秒数,也就是System.currentTimeMillis() / 1000或者Instant.now().getEpochSecond()
  • tokens:存储的是当前这秒钟的对应的可用的令牌数量

Spring Cloud Gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:

  • 每个接口的限流数量不同,可以通过配置中心动态调整
  • 超过的流量被拒绝后可以返回固定的格式给调用方
  • 对某个服务进行整体限流(这个大家可以思考下用Spring Cloud Gateway如何实现,其实很简单)
  • ……

当然我们也可以通过重新RedisRateLimiter来实现自己的限流策略,这个我们后面再进行介绍。

限流源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

// routeId也就是我们的fsh-house,id就是限流的key,也就是localhost。

public

Mono<Response> isAllowed(String routeId, String id) {

 

// 会判断RedisRateLimiter是否初始化了

 

if

(!

this

.initialized.get()) {

  

throw

new

IllegalStateException(

"RedisRateLimiter is not initialized"

);

 

}

 

// 获取routeId对应的限流配置

 

Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);

 

 

if

(routeConfig ==

null

) {

  

throw

new

IllegalArgumentException(

"No Configuration found for route "

+ routeId);

 

}

 

 

// 允许用户每秒做多少次请求

 

int

replenishRate = routeConfig.getReplenishRate();

 

 

// 令牌桶的容量,允许在一秒钟内完成的最大请求数

 

int

burstCapacity = routeConfig.getBurstCapacity();

 

 

try

{

  

// 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens)

  

List<String> keys = getKeys(id);

 

 

  

// The arguments to the LUA script. time() returns unixtime in seconds.

  

List<String> scriptArgs = Arrays.asList(replenishRate +

""

, burstCapacity +

""

,

    

Instant.now().getEpochSecond() +

""

,

"1"

);

  

// allowed, tokens_left = redis.eval(SCRIPT, keys, args)

  

// 执行LUA脚本

  

Flux<List<Long>> flux =

this

.redisTemplate.execute(

this

.script, keys, scriptArgs);

    

// .log("redisratelimiter", Level.FINER);

  

return

flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))

    

.reduce(

new

ArrayList<Long>(), (longs, l) -> {

     

longs.addAll(l);

     

return

longs;

    

}) .map(results -> {

     

boolean

allowed = results.get(

0

) == 1L;

     

Long tokensLeft = results.get(

1

);

 

     

Response response =

new

Response(allowed, getHeaders(routeConfig, tokensLeft));

 

     

if

(log.isDebugEnabled()) {

      

log.debug(

"response: "

+ response);

     

}

     

return

response;

    

});

 

}

 

catch

(Exception e) {

  

log.error(

"Error determining if user allowed from redis"

, e);

 

}

 

return

Mono.just(

new

Response(

true

, getHeaders(routeConfig, -1L)));

}

LUA脚本在:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

local tokens_key = KEYS[1]

local timestamp_key = KEYS[2]

--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

 

local rate = tonumber(ARGV[1])

local capacity = tonumber(ARGV[2])

local now = tonumber(ARGV[3])

local requested = tonumber(ARGV[4])

 

local fill_time = capacity/rate

local ttl = math.floor(fill_time*2)

 

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])

--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])

--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])

--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])

--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)

--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

 

local last_tokens = tonumber(redis.call("get", tokens_key))

if last_tokens == nil then

 

last_tokens = capacity

end

--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

 

local last_refreshed = tonumber(redis.call("get", timestamp_key))

if last_refreshed == nil then

 

last_refreshed = 0

end

--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

 

local delta = math.max(0, now-last_refreshed)

local filled_tokens = math.min(capacity, last_tokens+(delta*rate))

local allowed = filled_tokens >= requested

local new_tokens = filled_tokens

local allowed_num = 0

if allowed then

 

new_tokens = filled_tokens - requested

 

allowed_num = 1

end

 

--redis.log(redis.LOG_WARNING, "delta " .. delta)

--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)

--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)

--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

 

redis.call("setex", tokens_key, ttl, new_tokens)

redis.call("setex", timestamp_key, ttl, now)

 

return { allowed_num, new_tokens }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

以上是 SpringCloudGateway限流操作 的全部内容, 来源链接: utcz.com/z/515664.html

回到顶部