SpringCloud升级之路Hoxton9.针对网关非Get请求的重试

编程

针对网关非 Get 请求的重试

在之前的系列里面Spring Cloud升级之路 - Hoxton - 5. 实现微服务调用重试,我们针对 OpenFeign 和 Spring Cloud Gateway 都设置了重试。

对于 OpenFeign:

  • Get请求:任何非200 响应码,任何异常,都会重试。
  • 非 Get 请求:任何IOException(除了SocketTimeOutException,这个是read time out 导致的),还有 redilience 断路器异常,都会重试,其他的都不重试。

对于 Spring Cloud Gateway:

  • Get请求:任何4XX,5XX响应码,任何异常,都会重试。

现在,我们需要实现针对于 Spring Cloud Gateway 的非 Get 请求的任何IOException(除了SocketTimeOutException,这个是read time out 导致的),还有 redilience 断路器异常进行重试,Get因为请求并没有真正发出去。

现有设计

目前在 Spring Cloud Gateway 的 RetryFilterFactory,无法实现针对 Get 和非 Get 对于不同的异常进行不同的重试:

org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory

public class RetryGatewayFilterFactory

extends AbstractGatewayFilterFactory<RetryGatewayFilterFactory.RetryConfig> {

/**

* Retry iteration key.ServerWebExchange的某个Attribute的key

* 这个Attribute用来在每次调用的时候,+1,看是否超过了重试次数

*/

public static final String RETRY_ITERATION_KEY = "retry_iteration";

public RetryGatewayFilterFactory() {

super(RetryConfig.class);

}

@Override

public GatewayFilter apply(RetryConfig retryConfig) {

//检验配置

retryConfig.validate();

Repeat<ServerWebExchange> statusCodeRepeat = null;

//如果配置了可重试的HTTP响应状态码,则检查响应码是否可以重试

if (!retryConfig.getStatuses().isEmpty() || !retryConfig.getSeries().isEmpty()) {

Predicate<RepeatContext<ServerWebExchange>> repeatPredicate = context -> {

ServerWebExchange exchange = context.applicationContext();

//检查是否超过了重试次数

if (exceedsMaxIterations(exchange, retryConfig)) {

return false;

}

//判断是否可以重试

HttpStatus statusCode = exchange.getResponse().getStatusCode();

boolean retryableStatusCode = retryConfig.getStatuses()

.contains(statusCode);

if (!retryableStatusCode && statusCode != null) {

// try the series

retryableStatusCode = retryConfig.getSeries().stream()

.anyMatch(series -> statusCode.series().equals(series));

}

final boolean finalRetryableStatusCode = retryableStatusCode;

//判断是否是可以重试的HTTP方法

HttpMethod httpMethod = exchange.getRequest().getMethod();

boolean retryableMethod = retryConfig.getMethods().contains(httpMethod);

//返回是否是可以重试的方法以及是否可以重试的HTTP响应状态码

return retryableMethod && finalRetryableStatusCode;

};

//每次重试,都要重置路由,重新解析路由

statusCodeRepeat = Repeat.onlyIf(repeatPredicate)

.doOnRepeat(context -> reset(context.applicationContext()));

//设置Backoff

BackoffConfig backoff = retryConfig.getBackoff();

if (backoff != null) {

statusCodeRepeat = statusCodeRepeat.backoff(getBackoff(backoff));

}

}

// TODO: support timeout, backoff, jitter, etc... in Builder

//判断异常是否可以重试

Retry<ServerWebExchange> exceptionRetry = null;

if (!retryConfig.getExceptions().isEmpty()) {

Predicate<RetryContext<ServerWebExchange>> retryContextPredicate = context -> {

ServerWebExchange exchange = context.applicationContext();

if (exceedsMaxIterations(exchange, retryConfig)) {

return false;

}

Throwable exception = context.exception();

for (Class<? extends Throwable> retryableClass : retryConfig

.getExceptions()) {

if (retryableClass.isInstance(exception) || (exception != null

&& retryableClass.isInstance(exception.getCause()))) {

trace("exception or its cause is retryable %s, configured exceptions %s",

() -> getExceptionNameWithCause(exception),

retryConfig::getExceptions);

HttpMethod httpMethod = exchange.getRequest().getMethod();

boolean retryableMethod = retryConfig.getMethods()

.contains(httpMethod);

trace("retryableMethod: %b, httpMethod %s, configured methods %s",

() -> retryableMethod, () -> httpMethod,

retryConfig::getMethods);

return retryableMethod;

}

}

trace("exception or its cause is not retryable %s, configured exceptions %s",

() -> getExceptionNameWithCause(exception),

retryConfig::getExceptions);

return false;

};

exceptionRetry = Retry.onlyIf(retryContextPredicate)

.doOnRetry(context -> reset(context.applicationContext()))

.retryMax(retryConfig.getRetries());

BackoffConfig backoff = retryConfig.getBackoff();

if (backoff != null) {

exceptionRetry = exceptionRetry.backoff(getBackoff(backoff));

}

}

GatewayFilter gatewayFilter = apply(retryConfig.getRouteId(), statusCodeRepeat,

exceptionRetry);

return new GatewayFilter() {

@Override

public Mono<Void> filter(ServerWebExchange exchange,

GatewayFilterChain chain) {

return gatewayFilter.filter(exchange, chain);

}

@Override

public String toString() {

return filterToStringCreator(RetryGatewayFilterFactory.this)

.append("retries", retryConfig.getRetries())

.append("series", retryConfig.getSeries())

.append("statuses", retryConfig.getStatuses())

.append("methods", retryConfig.getMethods())

.append("exceptions", retryConfig.getExceptions()).toString();

}

};

}

private String getExceptionNameWithCause(Throwable exception) {

if (exception != null) {

StringBuilder builder = new StringBuilder(exception.getClass().getName());

Throwable cause = exception.getCause();

if (cause != null) {

builder.append("{cause=").append(cause.getClass().getName()).append("}");

}

return builder.toString();

}

else {

return "null";

}

}

private Backoff getBackoff(BackoffConfig backoff) {

return Backoff.exponential(backoff.firstBackoff, backoff.maxBackoff,

backoff.factor, backoff.basedOnPreviousValue);

}

public boolean exceedsMaxIterations(ServerWebExchange exchange,

RetryConfig retryConfig) {

Integer iteration = exchange.getAttribute(RETRY_ITERATION_KEY);

//是否超过了可重试次数

boolean exceeds = iteration != null && iteration >= retryConfig.getRetries();

return exceeds;

}

public void reset(ServerWebExchange exchange) {

//这个方法主要是

Set<String> addedHeaders = exchange.getAttributeOrDefault(

CLIENT_RESPONSE_HEADER_NAMES, Collections.emptySet());

addedHeaders

.forEach(header -> exchange.getResponse().getHeaders().remove(header));

removeAlreadyRouted(exchange);

}

public GatewayFilter apply(String routeId, Repeat<ServerWebExchange> repeat,

Retry<ServerWebExchange> retry) {

if (routeId != null && getPublisher() != null) {

// send an event to enable caching

getPublisher().publishEvent(new EnableBodyCachingEvent(this, routeId));

}

return (exchange, chain) -> {

trace("Entering retry-filter");

// chain.filter returns a Mono<Void>

Publisher<Void> publisher = chain.filter(exchange)

// .log("retry-filter", Level.INFO)

.doOnSuccessOrError((aVoid, throwable) -> {

int iteration = exchange

.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);

int newIteration = iteration + 1;

trace("setting new iteration in attr %d", () -> newIteration);

exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);

});

if (retry != null) {

// retryWhen returns a Mono<Void>

// retry needs to go before repeat

publisher = ((Mono<Void>) publisher)

.retryWhen(retry.withApplicationContext(exchange));

}

if (repeat != null) {

// repeatWhen returns a Flux<Void>

// so this needs to be last and the variable a Publisher<Void>

publisher = ((Mono<Void>) publisher)

.repeatWhen(repeat.withApplicationContext(exchange));

}

return Mono.fromDirect(publisher);

};

}

@SuppressWarnings("unchecked")

public static class RetryConfig implements HasRouteId {

//路由id

private String routeId;

//重试次数,不包括调用的第一次,默认为3,也就是可能会调用4次

private int retries = 3;

//针对哪些HTTP状态码重试,一个Series对应一组HttpStatus

private List<Series> series = toList(Series.SERVER_ERROR);

//针对哪些HTTP状态码重试,一个HttpStatus就是一个HTTP状态码

private List<HttpStatus> statuses = new ArrayList<>();

//针对哪些HTTP方法重试

private List<HttpMethod> methods = toList(HttpMethod.GET);

//针对的哪些异常重试

private List<Class<? extends Throwable>> exceptions = toList(IOException.class,

TimeoutException.class);

//重试间隔策略

private BackoffConfig backoff;

public void validate() {

//重试次数必须大于10

Assert.isTrue(this.retries > 0, "retries must be greater than 0");

//可重试的series,可重试的状态码还有可重试的异常不能都为空,否则没有可以重试的场景了

Assert.isTrue(

!this.series.isEmpty() || !this.statuses.isEmpty()

|| !this.exceptions.isEmpty(),

"series, status and exceptions may not all be empty");

//重试的Http方法不能为空

Assert.notEmpty(this.methods, "methods may not be empty");

if (this.backoff != null) {

this.backoff.validate();

}

}

//省略构造器,getter,setter还有一些工具方法

}

public static class BackoffConfig {

//第一次重试时间间隔

private Duration firstBackoff = Duration.ofMillis(5);

//最大等待间隔

private Duration maxBackoff;

//增长比例

private int factor = 2;

//是否保留上一次请求的重试间隔时间,下次从这个时间间隔开始重试

private boolean basedOnPreviousValue = true;

//省略构造器,getter,setter

public void validate() {

//第一次重试间隔不能为空

Assert.notNull(this.firstBackoff, "firstBackoff must be present");

}

}

}

总结起来,流程简化如下:

  1. 判断本次请求 HTTP 方法是否被 RetryConfig.methods 包含和 HTTP 响应码是否在 RetryConfig.series 的范围内或者 statuses 的集合内,如果在,看本次请求的 retry_iteration 这个 Attribute 是第几次(从0开始),是否超过了重试次数,如果没超过,就重试,如果超过,停止重试。
  2. 判断本次请求 HTTP 方法是否被 RetryConfig.methods 包含和 异常是否在 RetryConfig.exceptions 的集合内(是其中的某个异常的子类也可以),如果在,看本次请求的 retry_iteration 这个 Attribute 是第几次(从0开始),是否超过了重试次数,如果没超过,就重试,如果超过,停止重试。

配置的时候,HTTP 方法如果包含所有方法,那么没办法区分 GET 请求或者是 非 GET 请求;如果建立两个 Filter 一个拦截 GET 另一个拦截 非GET,那么他们共用的 Attribute 每次就会 +2,重试次数就不准确了。

所以,最后使用了这样一个不优雅的设计,就是 GET 和非 GET 使用不同的 RetryConfig,GET 的还是根据application.properties配置来,针对非 GET 请求,强制重试下面这些异常:

  • io.netty.channel.ConnectTimeoutException.class:连接超时
  • java.net.ConnectException.class:No route to host 异常
  • io.github.resilience4j.circuitbreaker.CallNotPermittedException: resilience4j 断路器相关异常

RetryGatewayFilter

    @Override

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerHttpRequest request = exchange.getRequest();

//获取微服务名称

String serviceName = request.getHeaders().getFirst(CommonConstant.SERVICE_NAME);

HttpMethod method = exchange.getRequest().getMethod();

//生成 GatewayFilter,保存到 gatewayFilterMap

GatewayFilter gatewayFilter = gatewayFilterMap.computeIfAbsent(serviceName + ":" + method, k -> {

Map<String, RetryConfig> retryConfigMap = apiGatewayRetryConfig.getRetry();

//通过微服务名称,获取重试配置

RetryConfig retryConfig = retryConfigMap.containsKey(serviceName) ? retryConfigMap.get(serviceName) : apiGatewayRetryConfig.getDefault();

//重试次数为0,则不重试

if (retryConfig.getRetries() == 0) {

return null;

}

//针对非GET请求,强制限制重试并且只能重试下面的异常b

if (!HttpMethod.GET.equals(method)) {

RetryConfig newConfig = new RetryConfig();

BeanUtils.copyProperties(retryConfig, newConfig);

//限制所有方法都可以重试,由于外层限制了不为GET,这里相当于不为GET的所有方法

newConfig.setMethods(HttpMethod.values());

newConfig.setSeries();

newConfig.setStatuses();

newConfig.setExceptions(//链接超时

io.netty.channel.ConnectTimeoutException.class,

//No route to host

java.net.ConnectException.class,

//针对Resilience4j的异常

CallNotPermittedException.class);

retryConfig = newConfig;

}

return this.apply(retryConfig);

});

return gatewayFilter != null ? gatewayFilter.filter(exchange, chain) : chain.filter(exchange);

}

以上是 SpringCloud升级之路Hoxton9.针对网关非Get请求的重试 的全部内容, 来源链接: utcz.com/z/518486.html

回到顶部