Dubbo进阶:Directory实现
首先我们依然再看一遍Dubbo调用的流程
本文我们主要分析Directory。
首先通过UML来看下Directory的层级结构
从这个UML图上可以看到,依然使用了模板模式。AbstractDirectory是一个抽象类,封装了通用逻辑,最重要的有list
方法,用于返回所有可用的list,它会调用doList
方法,doList
是一个抽象方法,在不同的子类会有不同的实现。
public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
returndoList(invocation);
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
StaticDirectory即静态服务目录,顾名思义,它内部存放的 Invoker 是不会变动的。通过StaticDirectory
的构造函数可以看到将传入的invoker的list封装成静态的Directory
对象,里面的列表不会变。
private final List<Invoker<T>> invokers;public StaticDirectory(List<Invoker<T>> invokers) {
this(null, invokers, null);
}
public StaticDirectory(List<Invoker<T>> invokers, RouterChain<T> routerChain) {
this(null, invokers, routerChain);
}
public StaticDirectory(URL url, List<Invoker<T>> invokers) {
this(url, invokers, null);
}
public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
super(url == null && CollectionUtils.isNotEmpty(invokers) ? invokers.get(0).getUrl() : url, routerChain);
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
this.invokers = invokers;
}
RegistryDirectory是一种动态服务目录,实现了 NotifyListener 接口。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表,接下来通过源码来分析一下RegistryDirectory。
- 列举invoker的list
public List<Invoker<T>> doList(Invocation invocation) {if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
return invokers == null ? Collections.emptyList() : invokers;
}
- 接收服务变更通知,RegistryDirectory 是一个动态服务目录,它需要接受注册中心配置进行动态调整。因此 RegistryDirectory 实现了 NotifyListener 接口,通过这个接口获取注册中心变更通知,
notify
这个方法主要监听配置中心对应URL的变化,然后更新本地的参数配置并做provider list的整合。
public synchronized void notify(List<URL> urls) {Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}
- 刷新Invoker列表
private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");
// invokerUrls的数量和协议头判断是否禁用所有的服务,如果禁用,则将 forbidden 设为 true,并销毁所有的 Invoker
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
// 若不禁用,则将url转成Invoker
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 销毁无用的Invoker,避免服务消费者调用已下线的服务的服务
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
// 获取服务消费端配置的协议
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
// 检测服务提供者协议是否被服务消费者所支持
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
// 若服务消费者协议头不被消费者所支持,则忽略当前 providerUrl
continue;
}
}
// 忽略 empty 协议
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 通过 SPI 检测服务端协议是否被消费端支持
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// 合并 url
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// 本地 Invoker 缓存列表
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
/ / 缓存未命中
if (invoker == null) {
try {
boolean enabled = true;
// 获取 disable 配置,并修改 enable 变量
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
// 调用 refer 获取 Invoker
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) {
// 缓存 Invoker 实例
newUrlInvokerMap.put(key, invoker);
}
} else {
// 缓存命中,将 invoker 存储到 newUrlInvokerMap 中
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
也就是注册中心有变化,则更新urlInvokerMap
和cachedInvokerUrls
的值,这就是官网提到的它的值可能是动态变化的,比如注册中心推送变更的原因所在。
以上是 Dubbo进阶:Directory实现 的全部内容, 来源链接: utcz.com/a/25398.html