【Java】Nacos - 服务端处理注册请求

Nacos - 客户端注册已经讲过了,那这里讲一下服务端是怎么处理请求的。
处理客户端请求的逻辑在InstanceController里,我们先看看register方法。

InstanceController#register

这里主要是封装Instance,并调用serviceManager的registerInstance方法进行服务注册。

/**
 * Handles an instance registration request.
 *
 * <p>Reads the namespace id (with {@code Constants.DEFAULT_NAMESPACE_ID} passed as the
 * fallback argument) and the required service name from the request, checks the
 * service-name format, builds an {@code Instance} from the request and passes all three
 * to {@code serviceManager.registerInstance}.
 *
 * @param request the incoming HTTP request carrying the registration parameters
 * @return the literal string {@code "ok"} after the registration call returns
 * @throws Exception if reading the parameters, validating them or registering fails
 */
public String register(HttpServletRequest request) throws Exception {
    // Namespace is optional; the default namespace constant is supplied as the fallback.
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    // Service name is required and must pass the format check before the instance is parsed.
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    // Build the instance from the request and register it in one step.
    serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    return "ok";
}

ServiceManager#registerInstance

判断是否已经注册过,如果没有注册,则创建一个Service并注册,然后再添加实例。

/**
 * Registers an instance under the given namespace and service.
 *
 * <p>First makes sure a service entry exists (creating an empty one when needed), then
 * verifies that it can be looked up, and finally adds the instance to it.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service
 * @param instance    instance to register; its ephemeral flag is forwarded to both the
 *                    service creation and the instance addition
 * @throws NacosException with {@code INVALID_PARAM} if the service cannot be found after
 *                        the create step, or if a delegated call fails
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the service entry if this is the first registration for it.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // The service must be retrievable now; otherwise the request is rejected.
    if (getService(namespaceId, serviceName) == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // Attach the instance to the service.
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

ServiceManager#createEmptyService

直接调用createServiceIfAbsent方法。

/**
 * Creates the service if it does not exist yet, without attaching any cluster.
 *
 * <p>Pure delegation to {@code createServiceIfAbsent} with a {@code null} cluster.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service
 * @param local       forwarded unchanged to {@code createServiceIfAbsent}
 * @throws NacosException if the delegated call fails
 */
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {

// null cluster: the service is created with no cluster attached at this point.
createServiceIfAbsent(namespaceId, serviceName, local, null);

}

ServiceManager#createServiceIfAbsent

/**
 * Creates and registers a service when none exists for the given namespace and name.
 *
 * <p>If {@code getService} already returns a service, nothing happens. Otherwise a new
 * {@code Service} is populated, optionally linked to {@code cluster}, validated, stored via
 * {@code putServiceAndInit}, and - when {@code local} is false - also passed to
 * {@code addOrReplaceService}.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service; the group name is derived from it
 * @param local       when {@code false}, the new service is additionally handed to
 *                    {@code addOrReplaceService}
 * @param cluster     cluster to attach to the new service, or {@code null} for none
 * @throws NacosException if validation or any delegated call fails
 */
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Nothing to do when the service is already known.
    if (getService(namespaceId, serviceName) != null) {
        return;
    }

    Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);

    Service created = new Service();
    created.setName(serviceName);
    created.setNamespaceId(namespaceId);
    created.setGroupName(NamingUtils.getGroupName(serviceName));
    created.setLastModifiedMillis(System.currentTimeMillis());
    // Refresh the checksum after the fields above have been set.
    created.recalculateChecksum();

    if (cluster != null) {
        // Link cluster and service in both directions.
        cluster.setService(created);
        created.getClusterMap().put(cluster.getName(), cluster);
    }

    // Validate the service now; an exception is thrown if validation fails.
    created.validate();

    // Store the service and run its initialisation.
    putServiceAndInit(created);

    if (!local) {
        // Non-local services are additionally passed to addOrReplaceService.
        addOrReplaceService(created);
    }
}

ServiceManager#putServiceAndInit

将Service存入serviceMap缓存,并开启每5秒一次的健康检查。

/**
 * Stores the service, initialises it and subscribes it to its instance-list keys.
 *
 * <p>The service is registered as a listener on two keys built by
 * {@code KeyBuilder.buildInstanceListKey}: one with the ephemeral flag {@code true} and
 * one with it {@code false}.
 *
 * @param service the service to store and initialise
 * @throws NacosException if a delegated call fails
 */
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into the local store.
    putService(service);

    // Run the service's own initialisation (the accompanying article describes this as
    // starting the periodic health check for the service and its clusters).
    service.init();

    // Listen on the ephemeral instance list first, then on the persistent one.
    String ephemeralKey = KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true);
    consistencyService.listen(ephemeralKey, service);

    String persistentKey = KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false);
    consistencyService.listen(persistentKey, service);

    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

ServiceManager#getService

从serviceMap缓存中取值。

/**
 * Looks up a service by namespace and name.
 *
 * @param namespaceId namespace to search in
 * @param serviceName name of the service
 * @return the matching service, or {@code null} when either the namespace or the service
 *         is unknown
 */
public Service getService(String namespaceId, String serviceName) {
    // Read the namespace map exactly once. Checking for null with one lookup and then
    // dereferencing the result of a second lookup can throw a NullPointerException if the
    // namespace entry disappears between the two reads.
    Map<String, Service> servicesInNamespace = chooseServiceMap(namespaceId);
    if (servicesInNamespace == null) {
        return null;
    }
    return servicesInNamespace.get(serviceName);
}

/**
 * Returns the name-to-service map stored for a namespace.
 *
 * @param namespaceId namespace whose services are requested
 * @return whatever {@code serviceMap.get(namespaceId)} yields; callers in this file treat
 *         a {@code null} result as "namespace unknown"
 */
public Map<String, Service> chooseServiceMap(String namespaceId) {

return serviceMap.get(namespaceId);

}

ServiceManager#addInstance

保存实例

/**
 * Adds one or more instances to a service and publishes the resulting instance list.
 *
 * <p>While holding the monitor of the {@code Service} object, the full instance list is
 * recomputed through {@code addIpAddresses} and written with {@code consistencyService.put}
 * under the instance-list key for the given ephemeral flag.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service
 * @param ephemeral   selects which instance-list key is written
 * @param ips         instances to add
 * @throws NacosException if computing or publishing the list fails
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Key under which the instance list of this service is stored.
    String listKey = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service target = getService(namespaceId, serviceName);

    // Recompute and publish under the service's monitor so both steps happen together.
    synchronized (target) {
        List<Instance> merged = addIpAddresses(target, ephemeral, ips);

        Instances payload = new Instances();
        payload.setInstanceList(merged);

        // Hand the new list to the consistency service.
        consistencyService.put(listKey, payload);
    }
}

ServiceManager#addIpAddresses

直接调用updateIpAddresses方法

/**
 * Computes the instance list of a service after adding the given instances.
 *
 * <p>Pure delegation to {@code updateIpAddresses} with the
 * {@code UPDATE_INSTANCE_ACTION_ADD} action.
 *
 * @param service   service whose instance list is being updated
 * @param ephemeral forwarded unchanged to {@code updateIpAddresses}
 * @param ips       instances to add
 * @return the list returned by {@code updateIpAddresses}
 * @throws NacosException if the delegated call fails
 */
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {

return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);

}

ServiceManager#updateIpAddresses

获取service的所有实例,这里会更新老数据

/**
 * Computes the new instance list of a service after applying an add or remove action.
 *
 * <p>Steps:
 * <ol>
 *   <li>Load the stored datum for the service's instance-list key.</li>
 *   <li>Index the service's current instances by {@code toIpAddr()} and collect their ids.</li>
 *   <li>Start from the stored list (passed through {@code setValid} together with the
 *       current instances) or from an empty map when nothing is stored.</li>
 *   <li>For each given instance, create and initialise its cluster if the service does not
 *       have it yet, then remove or insert the instance by its datum key.</li>
 * </ol>
 *
 * @param service   service whose instance list is being updated
 * @param action    {@code UPDATE_INSTANCE_ACTION_REMOVE} removes the instances; any other
 *                  value inserts them
 * @param ephemeral selects the instance-list key and which current instances are read
 * @param ips       instances to add or remove
 * @return a new list holding every instance left in the resulting map
 * @throws NacosException           if a delegated call fails
 * @throws IllegalArgumentException if the action is add and the resulting map is empty
 */
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Previously stored instance list, if any.
    Datum stored = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    // Instances the service currently holds for this ephemeral flag.
    List<Instance> liveInstances = service.allIPs(ephemeral);
    Map<String, Instance> liveByAddress = new HashMap<>(liveInstances.size());
    Set<String> knownIds = Sets.newHashSet();
    for (Instance live : liveInstances) {
        liveByAddress.put(live.toIpAddr(), live);
        knownIds.add(live.getInstanceId());
    }

    // Working map keyed by datum key.
    Map<String, Instance> result;
    if (stored == null || stored.value == null) {
        // Nothing stored yet: start empty.
        result = new HashMap<>(ips.length);
    } else {
        // Stored list exists: let setValid combine it with the live instances (the
        // accompanying article says this refreshes health status and heartbeat time).
        result = setValid(((Instances) stored.value).getInstanceList(), liveByAddress);
    }

    for (Instance candidate : ips) {
        // Unknown cluster: create it with defaults, initialise it and attach it to the service.
        if (!service.getClusterMap().containsKey(candidate.getClusterName())) {
            Cluster created = new Cluster(candidate.getClusterName(), service);
            created.init();
            service.getClusterMap().put(candidate.getClusterName(), created);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            candidate.getClusterName(), candidate.toJson());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            result.remove(candidate.getDatumKey());
        } else {
            // Assign an instance id generated against the ids already in use, then store it.
            candidate.setInstanceId(candidate.generateInstanceId(knownIds));
            result.put(candidate.getDatumKey(), candidate);
        }
    }

    // An add action must never produce an empty list.
    if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action) && result.isEmpty()) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(result.values()));
    }

    return new ArrayList<>(result.values());
}

DistroConsistencyServiceImpl#put

如果是临时,则加入缓存,并放入阻塞队列。

/**
 * Stores a record locally and triggers synchronisation through the distro protocol.
 *
 * @param key   key the record is stored under
 * @param value record to store
 * @throws NacosException if the operation fails
 */
public void put(String key, Record value) throws NacosException {
    // Local handling first: store the datum and queue the listener notification.
    onPut(key, value);

    // Then request a sync of the change, passing half the configured dispatch period.
    DistroKey distroKey = new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    long halfDispatchPeriod = globalConfig.getTaskDispatchPeriod() / 2;
    distroProtocol.sync(distroKey, DataOperation.CHANGE, halfDispatchPeriod);
}

/**
 * Handles a put locally: stores ephemeral instance lists and queues a change notification.
 *
 * <p>When the key matches the ephemeral instance-list pattern, a new {@code Datum} wrapping
 * the value is written to {@code dataStore}. A {@code CHANGE} task is then added to the
 * notifier, but only if a listener is registered for the key.
 *
 * @param key   key the record is stored under
 * @param value record to store; cast to {@code Instances} for ephemeral keys
 */
public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // Wrap the instance list in a datum and bump its timestamp before storing it.
        Datum<Instances> entry = new Datum<>();
        entry.value = (Instances) value;
        entry.key = key;
        entry.timestamp.incrementAndGet();
        dataStore.put(key, entry);
    }

    // Only keys with a registered listener produce a notification task.
    if (listeners.containsKey(key)) {
        notifier.addTask(key, DataOperation.CHANGE);
    }
}

DistroConsistencyServiceImpl.Notifier#run

调用handle方法

/**
 * Notifier loop: takes queued tasks one at a time and passes each to {@code handle}.
 *
 * <p>The loop never exits on its own; anything thrown while taking or handling a task is
 * logged and the loop continues with the next task.
 */
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    while (true) {
        try {
            // take() waits for the next task before handing it over.
            handle(tasks.take());
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

DistroConsistencyServiceImpl.Notifier#handle

对事件进行通知

/**
 * Delivers one queued notification to every listener registered for its key.
 *
 * <p>The key is removed from {@code services} first. If no listener is registered for the
 * key the method returns. Otherwise each listener receives {@code onChange} (with the
 * value currently held in {@code dataStore}) or {@code onDelete}, depending on the action.
 * A failure in one listener is logged and does not stop delivery to the others.
 *
 * @param pair the datum key (value0) and the operation to notify about (value1)
 */
private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;

        // No listener registered for this key: nothing to notify.
        if (!listeners.containsKey(datumKey)) {
            return;
        }

        for (RecordListener listener : listeners.get(datumKey)) {
            count++;

            try {
                if (action == DataOperation.CHANGE) {
                    // The datum may be missing from the store by the time this task runs.
                    // Dereferencing it unconditionally would throw a NullPointerException
                    // that gets reported as a listener failure, so skip the callback instead.
                    Datum datum = dataStore.get(datumKey);
                    if (datum != null) {
                        listener.onChange(datumKey, datum.value);
                    } else {
                        Loggers.DISTRO.info("[NACOS-DISTRO] datum is null, key: {}", datumKey);
                    }
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                // One failing listener must not block the remaining ones.
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

总结

  1. 服务、集群如果不存在,则创建,并注册监听事件。
  2. 开启服务、集群的健康检查。
  3. 如果有旧服务数据,则更新健康状态和心跳时间。
  4. 节点的数据一致性。
  5. 调用监听。

【Java】Nacos - 服务端处理注册请求

以上是 【Java】Nacos - 服务端处理注册请求 的全部内容, 来源链接: utcz.com/a/96524.html

回到顶部