【Java】Nacos - 事件的注册、取消与监听(EventDispatcher)

我们在Nacos - NacosNamingService初始化提过,NacosNamingService对象创建的时候,会创建一个EventDispatcher对象。EventDispatcher的构造方法如下,创建一个线程池,然后放入Notifier任务。

public EventDispatcher() {

this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");

thread.setDaemon(true);

return thread;

}

});

this.executor.execute(new Notifier());

}

Notifier的Runnable的类,所以放入线程池的时候,会执行run方法。他主要是从阻塞队列changedServices取出ServiceInfo,然后根据ServiceInfo的key取出他对应的EventListener集合,再执行EventListener的onEvent方法。

@Override

public void run() {

while (!closed) {

ServiceInfo serviceInfo = null;

try {

// changedServices是LinkedBlockingQueue,从阻塞队列取值

serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);

} catch (Exception ignore) {

}

// 没取值,重新从阻塞队列取

if (serviceInfo == null) {

continue;

}

try {

// 从observerMap取到EventListener集合

List<EventListener> listeners = observerMap.get(serviceInfo.getKey());

if (!CollectionUtils.isEmpty(listeners)) {

for (EventListener listener : listeners) {

// 执行onEvent方法

List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());

listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),

serviceInfo.getClusters(), hosts));

}

}

} catch (Exception e) {

NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "

+ serviceInfo.getClusters(), e);

}

}

}

在Nacos - 启动中,提到NacosWatch实例化的时候,就会调用namingService.subscribe,他会调用EventDispatcher#addListener方法,在这里会把监听放入observerMap的map里,然后调用serviceChanged方法。

public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");

List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());

observers.add(listener);

observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);

if (observers != null) {

observers.add(listener);

}

serviceChanged(serviceInfo);

}

当serviceChanged被调用的时候,就会往阻塞队列存入ServiceInfo。上面已经知道了有个循环任务一直从阻塞队列changedServices取值,这个值就是这么来的。

public void serviceChanged(ServiceInfo serviceInfo) {

if (serviceInfo == null) {

return;

}

changedServices.add(serviceInfo);

}

总结

这个类有两个比较重要的成员,一个是observerMap,他的key是serviceInfo.getKey(),value是EventListener集合。一个是changedServices,存放serviceInfo的LinkedBlockingQueue阻塞队列。这两个成员的关联关系通过serviceInfo.getKey()维持。
当调用addListener的时候,就会把serviceInfo存入到changedServices,以及serviceInfo.getKey()和EventListener集合存入到observerMap。
while(true)中,由于阻塞队列changedServices有值,就会从中拿到serviceInfo,再通过serviceInfo.getKey()拿到observerMap对应的EventListener集合,然后执行EventListener集合的EventListener.onEvent方法。
【Java】Nacos - 事件的注册、取消与监听(EventDispatcher)

以上是 【Java】Nacos - 事件的注册、取消与监听(EventDispatcher) 的全部内容, 来源链接: utcz.com/a/93992.html

回到顶部