【Java】Nacos - 客户端实例列表获取

实例列表获取主要是HostReactor#getServiceInfo方法。Nacos - 启动中namingService.subscribe注册监听的时候,也会调用这个方法。

getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

// 如果发生故障转移,就从文件缓存里取

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());

String key = ServiceInfo.getKey(serviceName, clusters);

if (failoverReactor.isFailoverSwitch()) {

return failoverReactor.getService(key);

}

// 从serviceInfoMap里取

ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

// serviceInfoMap没有

if (null == serviceObj) {

serviceObj = new ServiceInfo(serviceName, clusters);

serviceInfoMap.put(serviceObj.getKey(), serviceObj);

updatingMap.put(serviceName, new Object());

// 内存没有,从服务器取

updateServiceNow(serviceName, clusters);

updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {

// 如果正在更新,则wait,避免多线程同时调用服务器

if (UPDATE_HOLD_INTERVAL > 0) {

// hold a moment waiting for update finish

synchronized (serviceObj) {

try {

serviceObj.wait(UPDATE_HOLD_INTERVAL);

} catch (InterruptedException e) {

NAMING_LOGGER

.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);

}

}

}

}

// 开启定时任务更新服务列表

scheduleUpdateIfAbsent(serviceName, clusters);

// 从内存里取

return serviceInfoMap.get(serviceObj.getKey());

}

updateServiceNow

从服务器获取,NamingProxy会调用NamingProxy#reqApi,他会随机取一个server,调用NamingProxy#callServer。NamingProxy的代码就略了。

private void updateServiceNow(String serviceName, String clusters) {

try {

updateService(serviceName, clusters);

} catch (NacosException e) {

NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);

}

}

public void updateService(String serviceName, String clusters) throws NacosException {

ServiceInfo oldService = getServiceInfo0(serviceName, clusters);

try {

// 从服务器取

String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

if (StringUtils.isNotEmpty(result)) {

// 解析返回的字符串

processServiceJson(result);

}

} finally {

if (oldService != null) {

synchronized (oldService) {

oldService.notifyAll();

}

}

}

}

processServiceJson

主要是判断是否有更新,有更新发送给serviceChanged,并写入文件

public ServiceInfo processServiceJson(String json) {

ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);

ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());

if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {

return oldService;

}

boolean changed = false;

if (oldService != null) {

if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {

NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "

+ serviceInfo.getLastRefTime());

}

serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());

for (Instance host : oldService.getHosts()) {

oldHostMap.put(host.toInetAddr(), host);

}

Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());

for (Instance host : serviceInfo.getHosts()) {

newHostMap.put(host.toInetAddr(), host);

}

Set<Instance> modHosts = new HashSet<Instance>();

Set<Instance> newHosts = new HashSet<Instance>();

Set<Instance> remvHosts = new HashSet<Instance>();

// 下面是修改、新增、移除的筛选。

List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(

newHostMap.entrySet());

for (Map.Entry<String, Instance> entry : newServiceHosts) {

Instance host = entry.getValue();

String key = entry.getKey();

if (oldHostMap.containsKey(key) && !StringUtils

.equals(host.toString(), oldHostMap.get(key).toString())) {

modHosts.add(host);

continue;

}

if (!oldHostMap.containsKey(key)) {

newHosts.add(host);

}

}

for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {

Instance host = entry.getValue();

String key = entry.getKey();

if (newHostMap.containsKey(key)) {

continue;

}

if (!newHostMap.containsKey(key)) {

remvHosts.add(host);

}

}

if (newHosts.size() > 0) {

changed = true;

NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "

+ JacksonUtils.toJson(newHosts));

}

if (remvHosts.size() > 0) {

changed = true;

NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "

+ JacksonUtils.toJson(remvHosts));

}

if (modHosts.size() > 0) {

changed = true;

updateBeatInfo(modHosts);

NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "

+ JacksonUtils.toJson(modHosts));

}

serviceInfo.setJsonFromServer(json);

// 有更新发送给serviceChanged,并写入文件

if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {

eventDispatcher.serviceChanged(serviceInfo);

DiskCache.write(serviceInfo, cacheDir);

}

} else {

changed = true;

NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "

+ JacksonUtils.toJson(serviceInfo.getHosts()));

serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

// 有更新发送给serviceChanged,并写入文件

eventDispatcher.serviceChanged(serviceInfo);

serviceInfo.setJsonFromServer(json);

DiskCache.write(serviceInfo, cacheDir);

}

MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

if (changed) {

NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "

+ JacksonUtils.toJson(serviceInfo.getHosts()));

}

return serviceInfo;

}

scheduleUpdateIfAbsent

把任务加入线程池

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {

// 任务已经有了就不加了

if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {

return;

}

synchronized (futureMap) {

// // 任务已经有了就不加了

if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {

return;

}

// 把任务加入线程池

ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));

// 加入任务

futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);

}

}

UpdateTask.run

定时任务更新,默认每10秒更新,如果失败了,就每次乘以2,比如第一次1秒,第二次2秒,第三次4秒,最多是2的6次方,最大等待60秒。

public void run() {

long delayTime = DEFAULT_DELAY;

try {

ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

// serviceInfoMap没有就直接更新

if (serviceObj == null) {

updateService(serviceName, clusters);

return;

}

if (serviceObj.getLastRefTime() <= lastRefTime) {

updateService(serviceName, clusters);

serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

} else {

// if serviceName already updated by push, we should not override it

// since the push data may be different from pull through force push

refreshOnly(serviceName, clusters);

}

lastRefTime = serviceObj.getLastRefTime();

if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap

.containsKey(ServiceInfo.getKey(serviceName, clusters))) {

// abort the update task

NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);

return;

}

// 失败了就增加失败次数

if (CollectionUtils.isEmpty(serviceObj.getHosts())) {

incFailCount();

return;

}

delayTime = serviceObj.getCacheMillis();

// 成功就重置为1

resetFailCount();

} catch (Throwable e) {

incFailCount();

NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);

} finally {

// 重新到线程池

executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);

}

}

总结

结合Nacos - HostReactor的创建。

【Java】Nacos - 客户端实例列表获取

以上是 【Java】Nacos - 客户端实例列表获取 的全部内容, 来源链接: utcz.com/a/94083.html

回到顶部