Dubbo进阶(九):服务消费原理

之前在服务暴露原理中讲解了Dubbo是如何暴露服务的,这篇博客针对Dubbo消费原理一探究竟,主要从以下几个方面进行分析:

  • 注册中心的暴露;
  • 通过注册中心进行服务消费通知;
  • 直连服务进行消费。

服务消费端的详细过程

在这里插入图片描述

服务消费端启动时,将自身的信息注册到注册中心的目录,同时还订阅服务提供方的目录,当服务提供方的 URL 发生更改时,实时获取新的数据。

上图中可以看到,服务消费的流程与服务暴露的流程有点类似逆向的。从整体来看,Dubbo做服务消费也分为两大部分,第一步通过持有远程服务实例生成Invoker,这个Invoker再客户端是核心的远程代理对象。第二步会把Invoker通过动态代理转换成实现用户接口的动态代理引用。这个Invoker承载了网络连接、服务调用和重试等功能。

ReferenceConfig

在这里插入图片描述
org.apache.dubbo.config.ReferenceConfig类是ReferenceBean的父类,与生产端服务的ServiceBean一样,存放着解析出来的 XML 和注解信息。

服务初始化中转换的入口

当我们消费端调用本地接口就能实现远程服务的调用,这是怎么实现的呢?根据上面的流程图,来分析消费原理。

在消费端进行初始化时ReferenceConfig#init,会执行ReferenceConfig#createProxy来完成这一系列操作。以下为ReferenceConfig#createProxy主要的代码部分:

private T createProxy(Map<String, String> map) {

// 判断是否为 Jvm 本地引用

if (shouldJvmRefer(map)) {

// 通过 injvm 协议,获取本地服务

URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);

invoker = REF_PROTOCOL.refer(interfaceClass, url);

} else {

urls.clear();

// 判断是否有自定义的直连地址,或注册中心地址

if (url != null && url.length() > 0) {

String[] us = SEMICOLON_SPLIT_PATTERN.split(url);

if (us != null && us.length > 0) {

for (String u : us) {

URL url = URL.valueOf(u);

if (StringUtils.isEmpty(url.getPath())) {

url = url.setPath(interfaceName);

}

if (UrlUtils.isRegistry(url)) {

// 如果是注册中心Protocol类型,则向地址中添加 refer 服务消费元数据

urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));

} else {

// 直连服务提供端

urls.add(ClusterUtils.mergeUrl(url, map));

}

}

}

} else {

// 组装注册中心的配置

if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {

// 检查配置中心

checkRegistry();

List<URL> us = ConfigValidationUtils.loadRegistries(this, false);

if (CollectionUtils.isNotEmpty(us)) {

for (URL u : us) {

URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);

if (monitorUrl != null) {

// 监控上报信息

map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));

}

// 注册中心地址添加 refer 服务消费元数据

urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));

}

}

}

}

// 只有一条注册中心数据,即单注册中心

if (urls.size() == 1) {

// 将远程服务转化成 Invoker

invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));

} else {

// 因为多注册中心就会存在多个 Invoker,这里用保存在 List 中

List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();

URL registryURL = null;

for (URL url : urls) {

// 将每个注册中心转换成 Invoker 数据

invokers.add(REF_PROTOCOL.refer(interfaceClass, url));

if (UrlUtils.isRegistry(url)) {

// 会覆盖前遍历的注册中心,使用最后一条注册中心数据

registryURL = url;

}

}

if (registryURL != null) {

// 默认使用 zone-aware 策略来处理多个订阅

URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);

// 将转换后的多个 Invoker 合并成一个

invoker = CLUSTER.join(new StaticDirectory(u, invokers));

} else {

invoker = CLUSTER.join(new StaticDirectory(invokers));

}

}

}

// 利用动态代理,将 Invoker 转换成本地接口代理

return (T) PROXY_FACTORY.getProxy(invoker);

}

上面转换的过程中,主要可概括为:先分为本地引用和远程引用两类。本地就是以 inJvm 协议的获取本地服务,这不做过多说明;远程引用分为直连服务和通过注册中心。注册中心分为单注册中心和多注册中心的情况,单注册中心好解决,直接使用即可,多注册中心时,将转换后的 Invoker 合并成一个 Invoker。最后通过动态代理将 Invoker 转换成本地接口代理。

获取 Invoker 实例

由于本地服务时直接从缓存中获取,这里就注册中心的消费进行分析,上面代码片段中使用的是REF_PROTOCOL.refer进行转换,该方法代码:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

// 获取服务的注册中心url,里面会设置注册中心的协议和移除 registry 的参数

url = getRegistryUrl(url);

// 获取注册中心实例

Registry registry = registryFactory.getRegistry(url);

if (RegistryService.class.equals(type)) {

return proxyFactory.getInvoker((T) registry, type, url);

}

// 获取服务消费元数据

Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));

// 从服务消费元数据中获取分组信息

String group = qs.get(GROUP_KEY);

if (group != null && group.length() > 0) {

if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {

// 执行 Invoker 转换工作

returndoRefer(getMergeableCluster(), registry, type, url);

}

}

// 执行 Invoker 转换工作

returndoRefer(cluster, registry, type, url);

}

上面主要是获取服务消费的注册中心实例和进行服务分组,最后调用doRefer方法进行转换工作,以下为doRefer的代码:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

// 创建 RegistryDirectory 对象

RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

// 设置注册中心

directory.setRegistry(registry);

// 设置协议

directory.setProtocol(protocol);

// directory.getConsumerUrl().getParameters() 是服务消费元数据

Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());

URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);

if (directory.isShouldRegister()) {

directory.setRegisteredConsumerUrl(subscribeUrl);

// 消费消息注册到注册中心

registry.register(directory.getRegisteredConsumerUrl());

}

directory.buildRouterChain(subscribeUrl);

// 服务消费者订阅:服务提供端,动态配置,路由的通知

directory.subscribe(toSubscribeUrl(subscribeUrl));

// 多个Invoker合并为一个

Invoker<T> invoker = cluster.join(directory);

List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);

if (CollectionUtils.isEmpty(listeners)) {

return invoker;

}

RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);

for (RegistryProtocolListener listener : listeners) {

listener.onRefer(this, registryInvokerWrapper);

}

return registryInvokerWrapper;

}

上面实现主要是完成创建 RegistryDirectory 对象,将消费服务元数据注册到注册中心,通过 RegistryDirectory 对象里的信息,实现服务提供端,动态配置及路由的订阅相关功能。

RegistryDirectory 这个类实现了 NotifyListener 这个通知监听接口,当订阅的服务,配置或路由发生变化时,会接收到通知,进行相应改变:

public synchronized void notify(List<URL> urls) {

// 将服务提供方配置,路由配置,服务提供方的服务分别以不同的 key 保存在 Map 中

Map<String, List<URL>> categoryUrls = urls.stream()

.filter(Objects::nonNull)

.filter(this::isValidCategory)

.filter(this::isNotCompatibleFor26x)

.collect(Collectors.groupingBy(this::judgeCategory));

// 更新服务提供方配置

List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());

this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

// 更新路由配置

List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());

toRouters(routerURLs).ifPresent(this::addRouters);

// 加载服务提供方的服务信息

List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

/**

* 3.x added for extend URL address

*/

ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);

List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);

if (supportedListeners != null && !supportedListeners.isEmpty()) {

for (AddressListener addressListener : supportedListeners) {

providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);

}

}

// 重新加载 Invoker 实例

refreshOverrideAndInvoker(providerURLs);

}

RegistryDirectory#notify里面最后会刷新 Invoker 进行重新加载,下面是核心代码的实现:

private void refreshOverrideAndInvoker(List<URL> urls) {

// mock zookeeper://xxx?mock=return null

overrideDirectoryUrl();

// 刷新 invoker

refreshInvoker(urls);

}

private void refreshInvoker(List<URL> invokerUrls) {

Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1

&& invokerUrls.get(0) != null

&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

this.forbidden = true; // Forbid to access

this.invokers = Collections.emptyList();

routerChain.setInvokers(this.invokers);

destroyAllInvokers(); // Close all invokers

} else {

// 刷新之前的 Invoker

this.forbidden = false; // Allow to access

Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

if (invokerUrls == Collections.<URL>emptyList()) {

invokerUrls = new ArrayList<>();

}

if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {

invokerUrls.addAll(this.cachedInvokerUrls);

} else {

this.cachedInvokerUrls = new HashSet<>();

this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison

}

if (invokerUrls.isEmpty()) {

return;

}

// 加载新的 Invoker Map

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

/**

* If the calculation is wrong, it is not processed.

*

* 1. The protocol configured by the client is inconsistent with the protocol of the server.

* eg: consumer protocol = dubbo, provider only has other protocol services(rest).

* 2. The registration center is not robust and pushes illegal specification data.

*

*/

if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {

logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls

.toString()));

return;

}

// 获取新的 Invokers

List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));

// 缓存新的 Invokers

routerChain.setInvokers(newInvokers);

this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;

this.urlInvokerMap = newUrlInvokerMap;

try {

// 通过新旧 Invokers 对比,销毁无用的 Invokers

destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

} catch (Exception e) {

logger.warn("destroyUnusedInvokers error. ", e);

}

}

}

获取刷新前后的 Invokers,将新的 Invokers 重新缓存起来,通过对比,销毁无用的 Invoker。

上面将 URL 转换 Invoker 是在RegistryDirectory#toInvokers中进行。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();

if (urls == null || urls.isEmpty()) {

return newUrlInvokerMap;

}

Set<String> keys = new HashSet<>();

String queryProtocols = this.queryMap.get(PROTOCOL_KEY);

for (URL providerUrl : urls) {

// If protocol is configured at the reference side, only the matching protocol is selected

if (queryProtocols != null && queryProtocols.length() > 0) {

boolean accept = false;

String[] acceptProtocols = queryProtocols.split(",");

for (String acceptProtocol : acceptProtocols) {

if (providerUrl.getProtocol().equals(acceptProtocol)) {

accept = true;

break;

}

}

if (!accept) {

continue;

}

}

if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {

continue;

}

if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {

logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +

" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +

" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +

ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));

continue;

}

// 合并服务提供端配置数据

URL url = mergeUrl(providerUrl);

// 过滤重复的服务提供端配置数据

String key = url.toFullString(); // The parameter urls are sorted

if (keys.contains(key)) { // Repeated url

continue;

}

keys.add(key);

// 缓存键是不与使用者端参数合并的url,无论使用者如何合并参数,如果服务器url更改,则再次引用

Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference

Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

// 缓存无对应 invoker,再次调用 protocol#refer 是否有数据

if (invoker == null) {

try {

boolean enabled = true;

if (url.hasParameter(DISABLED_KEY)) {

enabled = !url.getParameter(DISABLED_KEY, false);

} else {

enabled = url.getParameter(ENABLED_KEY, true);

}

if (enabled) {

invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);

}

} catch (Throwable t) {

logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);

}

// 将新的 Invoker 缓存起来

if (invoker != null) { // Put new invoker in cache

newUrlInvokerMap.put(key, invoker);

}

} else {

// 缓存里有数据,则进行重新覆盖

newUrlInvokerMap.put(key, invoker);

}

}

keys.clear();

return newUrlInvokerMap;

}

以上是 Dubbo进阶(九):服务消费原理 的全部内容, 来源链接: utcz.com/a/25318.html

回到顶部