【Java】gateway - 启动

我们从spring.factories开始,主要是GatewayAutoConfiguration,这里主要加载CompositeRouteDefinitionLocator、RouteDefinitionRouteLocator、FilteringWebHandler、RoutePredicateHandlerMapping、RouteRefreshListener、CachingRouteLocator等。
RouteLocator和RouteDefinitionLocator的作用在上一篇已经提过了,FilteringWebHandler、RoutePredicateHandlerMapping这两个等调用的时候讲,这边先知道他们会在这里加载。我们主要看看RouteRefreshListener和CachingRouteLocator。

RouteRefreshListener

RouteRefreshListener实现了ApplicationListener接口,所以他会调用onApplicationEvent方法,在符合某些条件下,会调用reset方法,这个思路和zuul是一样的。所以动态的刷新也可以调用他的事件来触发。

public void onApplicationEvent(ApplicationEvent event) {

if (event instanceof ContextRefreshedEvent

|| event instanceof RefreshScopeRefreshedEvent

|| event instanceof InstanceRegisteredEvent) {

reset();

}

else if (event instanceof ParentHeartbeatEvent) {

ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;

resetIfNeeded(e.getValue());

}

else if (event instanceof HeartbeatEvent) {

HeartbeatEvent e = (HeartbeatEvent) event;

resetIfNeeded(e.getValue());

}

}

我们看看reset方法,他其实就是发布了一个RefreshRoutesEvent事件。

private void reset() {

this.publisher.publishEvent(new RefreshRoutesEvent(this));

}

CachingRouteLocator

CachingRouteLocator就是用来接收RefreshRoutesEvent事件的,他实现了ApplicationListener<RefreshRoutesEvent>接口。我们看看他的onApplicationEvent方法。

public void onApplicationEvent(RefreshRoutesEvent event) {

try {

fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)

.materialize().collect(Collectors.toList()).subscribe(signals -> {

applicationEventPublisher

.publishEvent(new RefreshRoutesResultEvent(this));

cache.put(CACHE_KEY, signals);

}, throwable -> handleRefreshError(throwable)));

}

catch (Throwable e) {

handleRefreshError(e);

}

}

他先会调用fetch方法,这里实际上就是调用CompositeRouteLocator#getRoutes,最后再调用RouteDefinitionRouteLocator#getRoutes。

private Flux<Route> fetch() {

return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);

}

RouteDefinitionRouteLocator#getRoutes

这个方法可以看到他是获取RouteDefinition的集合,然后再调用convertToRoute方法,在这个方法里,把RouteDefinition转为Route。

@Override

public Flux<Route> getRoutes() {

Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()

.map(this::convertToRoute);

// 其他略

}

RouteDefinitionRouteLocator#convertToRoute

在这里把RouteDefinition转为Route,主要是解析Predicate和GatewayFilter,我们在上一篇已经知道了他会通过工厂类来处理。combinePredicates方法就是处理Predicate的,getFilters是处理Filter的。

private Route convertToRoute(RouteDefinition routeDefinition) {

AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);

List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);

return Route.async(routeDefinition).asyncPredicate(predicate)

.replaceFilters(gatewayFilters).build();

}

RouteDefinitionRouteLocator#combinePredicates

在这里就是把RouteDefinition的predicate转为Route需要的Predicate。这里首先会创建一个Predicate,然后再把剩余的合并,形成了一个left、right的结构。结构图在上一篇已经说过了。
lookup方法就是通过工厂类创建Predicate的地方。

private AsyncPredicate<ServerWebExchange> combinePredicates(

RouteDefinition routeDefinition) {

List<PredicateDefinition> predicates = routeDefinition.getPredicates();

if (predicates == null || predicates.isEmpty()) {

// this is a very rare case, but possible, just match all

return AsyncPredicate.from(exchange -> true);

}

// 创建第一个Predicate

AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,

predicates.get(0));

// 把剩余的进行合并

for (PredicateDefinition andPredicate : predicates.subList(1,

predicates.size())) {

AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,

andPredicate);

predicate = predicate.and(found);

}

return predicate;

}

RouteDefinitionRouteLocator#lookup

这个方法里,首先先通过配置的名称,获取对应的RoutePredicateFactory,然后组装config信息,最后通过factory.applyAsync创建Predicate。

private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,

PredicateDefinition predicate) {

RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());

if (factory == null) {

throw new IllegalArgumentException(

"Unable to find RoutePredicateFactory with name "

+ predicate.getName());

}

if (logger.isDebugEnabled()) {

logger.debug("RouteDefinition " + route.getId() + " applying "

+ predicate.getArgs() + " to " + predicate.getName());

}

// @formatter:off

Object config = this.configurationService.with(factory)

.name(predicate.getName())

.properties(predicate.getArgs())

.eventFunction((bound, properties) -> new PredicateArgsEvent(

RouteDefinitionRouteLocator.this, route.getId(), properties))

.bind();

// @formatter:on

return factory.applyAsync(config);

}

RouteDefinitionRouteLocator#getFilters

这个是处理Filter的地方,他主要是通过loadGatewayFilters方法来获取对应的Filter。

private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {

List<GatewayFilter> filters = new ArrayList<>();

// TODO: support option to apply defaults after route specific filters?

if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {

filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,

new ArrayList<>(this.gatewayProperties.getDefaultFilters())));

}

if (!routeDefinition.getFilters().isEmpty()) {

filters.addAll(loadGatewayFilters(routeDefinition.getId(),

new ArrayList<>(routeDefinition.getFilters())));

}

AnnotationAwareOrderComparator.sort(filters);

return filters;

}

RouteDefinitionRouteLocator#loadGatewayFilters

这个设计方式跟上面类似,也是通过配置信息的名称,获取对应的GatewayFilterFactory,然后封装configuration信息,通过factory.apply创建一个Filter。

List<GatewayFilter> loadGatewayFilters(String id,

List<FilterDefinition> filterDefinitions) {

ArrayList<GatewayFilter> ordered = new ArrayList<>(filterDefinitions.size());

for (int i = 0; i < filterDefinitions.size(); i++) {

FilterDefinition definition = filterDefinitions.get(i);

GatewayFilterFactory factory = this.gatewayFilterFactories

.get(definition.getName());

if (factory == null) {

throw new IllegalArgumentException(

"Unable to find GatewayFilterFactory with name "

+ definition.getName());

}

if (logger.isDebugEnabled()) {

logger.debug("RouteDefinition " + id + " applying filter "

+ definition.getArgs() + " to " + definition.getName());

}

// @formatter:off

Object configuration = this.configurationService.with(factory)

.name(definition.getName())

.properties(definition.getArgs())

.eventFunction((bound, properties) -> new FilterArgsEvent(

// TODO: why explicit cast needed or java compile fails

RouteDefinitionRouteLocator.this, id, (Map<String, Object>) properties))

.bind();

// @formatter:on

// some filters require routeId

// TODO: is there a better place to apply this?

if (configuration instanceof HasRouteId) {

HasRouteId hasRouteId = (HasRouteId) configuration;

hasRouteId.setRouteId(id);

}

GatewayFilter gatewayFilter = factory.apply(configuration);

if (gatewayFilter instanceof Ordered) {

ordered.add(gatewayFilter);

}

else {

ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));

}

}

return ordered;

}

整体流程

【Java】gateway - 启动

以上是 【Java】gateway - 启动 的全部内容, 来源链接: utcz.com/a/97899.html

回到顶部