【Java】Nacos - How the Server Handles Heartbeat Requests

The server receives heartbeat requests in the InstanceController#beat method.

InstanceController#beat

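The method first checks whether the instance already exists. If it does not, the instance is registered, and then the heartbeat is processed.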

public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Return the server's beat interval; the client directly overwrites its own beat interval with it
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    // ... (other code omitted)
    // Look up the Instance by namespaceId, serviceName, clusterName, ip and port
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // Not found: register it
    if (instance == null) {
        // clientBeat is derived from the "beat" parameter. On the first beat the parameter carries
        // the instance info, so clientBeat has been created. On later beats the instance normally
        // exists, so a null instance here means it has probably been removed.
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // ... (other code omitted)
        // Register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Get the Service from the serviceMap cache
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    // Not the first beat (no beat payload): build clientBeat from ip, port and cluster
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Process the heartbeat
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
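The branching in beat is easier to follow when it is isolated from the surrounding Nacos types. Below is a minimal, self-contained sketch of the same decision flow. It is not Nacos code. BeatRegistrySketch, BeatCode and the "ip:port" key are hypothetical, and the hasBeatInfo flag plays the role of clientBeat != null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the branching in InstanceController#beat (not Nacos code).
final class BeatRegistrySketch {

    enum BeatCode { OK, RESOURCE_NOT_FOUND }

    // Registered instances keyed by "ip:port", mapped to their last beat time (ms)
    private final Map<String, Long> lastBeatByInstance = new HashMap<>();

    // hasBeatInfo stands in for "clientBeat != null", i.e. the request carried the beat payload
    BeatCode beat(String ip, int port, boolean hasBeatInfo, long nowMillis) {
        String key = ip + ":" + port;
        if (!lastBeatByInstance.containsKey(key)) {
            if (!hasBeatInfo) {
                // Unknown instance and nothing to register it from: it may have been removed
                return BeatCode.RESOURCE_NOT_FOUND;
            }
            // Unknown instance, but the beat carries its info: register it first
            lastBeatByInstance.put(key, nowMillis);
        }
        // Process the beat: refresh the last-beat timestamp
        lastBeatByInstance.put(key, nowMillis);
        return BeatCode.OK;
    }

    boolean isRegistered(String ip, int port) {
        return lastBeatByInstance.containsKey(ip + ":" + port);
    }

    // Returns -1 when the instance is unknown
    long lastBeat(String ip, int port) {
        return lastBeatByInstance.getOrDefault(ip + ":" + port, -1L);
    }
}
```

As in the original, the "already registered" path and the "just registered" path both end by processing the beat and returning OK. Only a missing instance without beat info is rejected.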

ServiceManager#getInstance

Looks up an instance by IP and port.

public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) {
    // Get the Service from the serviceMap cache
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        return null;
    }
    List<String> clusters = new ArrayList<>();
    clusters.add(cluster);
    // Get all Instances of the given cluster
    List<Instance> ips = service.allIPs(clusters);
    if (ips == null || ips.isEmpty()) {
        return null;
    }
    // Find the instance whose IP and port both match
    for (Instance instance : ips) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            return instance;
        }
    }
    return null;
}
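The loop at the end of getInstance is a plain linear scan that matches on both IP and port. The sketch below is an equivalent that uses streams and Optional, so the not-found case is explicit instead of a null return. InstanceLookupSketch and Endpoint are hypothetical types that stand in for Nacos's Instance.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the lookup loop in ServiceManager#getInstance (not Nacos code).
final class InstanceLookupSketch {

    static final class Endpoint {
        final String ip;
        final int port;

        Endpoint(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    // First instance whose IP and port both match; empty for a null/empty list or no match
    static Optional<Endpoint> find(List<Endpoint> instances, String ip, int port) {
        if (instances == null || instances.isEmpty()) {
            return Optional.empty();
        }
        return instances.stream()
                .filter(i -> i.ip.equals(ip) && i.port == port)
                .findFirst();
    }
}
```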

Service#processClientBeat

Wraps the beat in a Runnable and hands it to a thread pool.

public void processClientBeat(final RsInfo rsInfo) {
    // Create a ClientBeatProcessor; it is a Runnable, so the thread pool will call its run method
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
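processClientBeat does no real work on the request thread. It only hands a Runnable to a pool. The sketch below shows that hand-off with a standard ScheduledExecutorService and a zero delay. It assumes that scheduleNow behaves like "schedule with delay 0", which was not checked against Nacos internals. ScheduleNowSketch and its pool are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of an immediate hand-off to a scheduled pool (not HealthCheckReactor itself).
final class ScheduleNowSketch {

    private static final ScheduledExecutorService POOL =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-processor-sketch");
                t.setDaemon(true); // the pool should not keep the JVM alive
                return t;
            });

    // Run the task on the pool as soon as possible; the caller returns immediately
    static void scheduleNow(Runnable task) {
        POOL.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    // Submits a task and waits up to timeoutMillis for it to run; returns whether it ran
    static boolean runAndWait(long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        scheduleNow(done::countDown);
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Keeping the HTTP handler this thin means a slow health update never delays the beat response.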

ClientBeatProcessor#run

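Finds the matching Instance and updates its last-beat time. If the instance is not marked and is currently unhealthy, it is set back to healthy and a change event is broadcast.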

public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Get all Instances of the cluster
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        // Match the Instance by IP and port
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Update the last beat time
            instance.setLastBeat(System.currentTimeMillis());
            // Not marked and currently unhealthy: set it to healthy
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    // Broadcast the change
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
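The state change in ClientBeatProcessor#run comes down to three rules. lastBeat is always refreshed. The health flag of a marked instance is never touched. A change is published only on an unhealthy-to-healthy transition. Below is a minimal sketch of just those rules. BeatState is a hypothetical class, and the Consumer callback stands in for getPushService().serviceChanged(service).

```java
import java.util.function.Consumer;

// Hypothetical sketch of the state update in ClientBeatProcessor#run (not Nacos code).
final class BeatStateSketch {

    static final class BeatState {
        long lastBeat;
        boolean healthy;
        final boolean marked; // in this sketch, beats never change a marked instance's health flag

        BeatState(boolean healthy, boolean marked) {
            this.healthy = healthy;
            this.marked = marked;
        }
    }

    static void onBeat(BeatState s, long nowMillis, Consumer<BeatState> onChange) {
        s.lastBeat = nowMillis; // always refreshed
        if (!s.marked && !s.healthy) {
            s.healthy = true;
            onChange.accept(s); // only an unhealthy -> healthy transition is broadcast
        }
    }
}
```

Repeated beats from an already healthy instance only move lastBeat forward, so steady-state heartbeats do not trigger any push.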

PushService#onApplicationEvent

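Once the change event is published, every listener for ServiceChangeEvent has its onApplicationEvent method called. In PushService, this method schedules a task, delayed by 1000 ms, that builds the UDP payload for each subscribed client and sends it.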

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Iterate over the PushClient collection
            for (PushClient client : clients.values()) {
                // Expired (zombie) clients are removed and skipped
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the UDP payload; data larger than 1 KB is compressed (decided in compressIfNecessary)
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                // Send the UDP packet
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
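According to the comment in the loop above, the payload is compressed only when it exceeds 1 KB, and the check lives in compressIfNecessary. The sketch below shows what such a size-gated compression step could look like. The 1024-byte threshold follows the article. The choice of GZIP and the class name CompressSketch are assumptions of this sketch, not a copy of the Nacos method.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of size-gated compression (threshold from the article, GZIP assumed).
final class CompressSketch {

    static final int THRESHOLD_BYTES = 1024;

    static byte[] compressIfNecessary(byte[] data) {
        if (data.length < THRESHOLD_BYTES) {
            return data; // small payloads are sent as-is (same array, no copy)
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(data.length);
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return out.toByteArray(); // closing the GZIP stream above flushed the trailer
    }
}
```

Skipping compression for small packets avoids paying the GZIP header overhead where it would not shrink the datagram.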

PushService#udpPush

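Sends the UDP packet. The push is retried until the entry's retry count exceeds MAX_RETRY_TIMES, and after each send a check is scheduled ACK_TIMEOUT_NANOS (10 seconds) later.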

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // Maximum retries reached without success: remove the entry from ackMap and udpSendTimeMap
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send via UDP
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        // Schedule a Retransmitter check after ACK_TIMEOUT_NANOS (10 seconds)
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}
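The bookkeeping in udpPush can be separated from the networking. The entry is recorded in ackMap, the packet is sent, the retry counter is incremented, and the entry is dropped once the counter exceeds the limit. The sketch below models that logic. AckTrackerSketch is hypothetical, a Consumer replaces udpSocket.send, and no real packets are sent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical model of udpPush's bookkeeping (not Nacos code); no real UDP is sent.
final class AckTrackerSketch {

    static final class Entry {
        final String key;
        int retryTimes; // incremented after every send, like ackEntry.increaseRetryTime()

        Entry(String key) {
            this.key = key;
        }
    }

    final Map<String, Entry> ackMap = new ConcurrentHashMap<>();
    int failedPush;
    private final int maxRetryTimes;
    private final Consumer<Entry> sender;

    AckTrackerSketch(int maxRetryTimes, Consumer<Entry> sender) {
        this.maxRetryTimes = maxRetryTimes;
        this.sender = sender;
    }

    // Returns true if the entry was sent, false if it was dropped for exceeding the limit
    boolean push(Entry entry) {
        if (entry.retryTimes > maxRetryTimes) {
            ackMap.remove(entry.key); // give up: stop tracking this push
            failedPush++;
            return false;
        }
        ackMap.put(entry.key, entry); // tracked until an ACK removes it
        sender.accept(entry);         // stand-in for udpSocket.send(ackEntry.origin)
        entry.retryTimes++;
        return true;
    }

    // Stand-in for the Receiver removing the key when an ACK arrives
    void ack(String key) {
        ackMap.remove(key);
    }
}
```

The limit check runs before the send and uses `>`. As a result, a limit of 1 sends the packet twice, the original send plus one retry, before the entry is dropped.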

Retransmitter#run

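Ten seconds after a send, it checks whether the push has been acknowledged, meaning the key has been removed from ackMap. If not, it calls udpPush again, which gives up once the retry count exceeds MAX_RETRY_TIMES.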

public void run() {
    if (ackMap.containsKey(ackEntry.key)) {
        Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
        udpPush(ackEntry);
    }
}
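Retransmitter turns a single send into a timeout loop. Each send schedules one check, and that check resends only if no ACK has removed the key in the meantime. The hedged sketch below implements this with a ScheduledExecutorService. RetransmitSketch is hypothetical, and its millisecond timeout is a shortened stand-in for ACK_TIMEOUT_NANOS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the Retransmitter pattern (not Nacos code).
final class RetransmitSketch {

    // key -> number of sends so far; presence means "not yet acknowledged"
    final Map<String, Integer> ackMap = new ConcurrentHashMap<>();
    final AtomicInteger sends = new AtomicInteger();
    private final CountDownLatch gaveUp = new CountDownLatch(1);
    private final int maxRetryTimes;
    private final long ackTimeoutMillis;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "retransmitter-sketch");
                t.setDaemon(true);
                return t;
            });

    RetransmitSketch(int maxRetryTimes, long ackTimeoutMillis) {
        this.maxRetryTimes = maxRetryTimes;
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    void push(String key) {
        int attempts = ackMap.getOrDefault(key, 0);
        if (attempts > maxRetryTimes) {
            ackMap.remove(key); // limit exceeded without an ACK: give up
            gaveUp.countDown();
            return;
        }
        ackMap.put(key, attempts + 1);
        sends.incrementAndGet(); // stand-in for the actual UDP send
        // Like Retransmitter#run: after the timeout, resend only if still unacknowledged
        scheduler.schedule(() -> {
            if (ackMap.containsKey(key)) {
                push(key);
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    // What the Receiver does when an ACK arrives
    void ack(String key) {
        ackMap.remove(key);
    }

    // Waits until push() has given up on a key; false on timeout
    boolean awaitGiveUp(long timeoutMillis) {
        try {
            return gaveUp.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

An ACK that arrives before the timer fires simply makes the scheduled check a no-op, so the entry does not need to be cancelled explicitly.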

Receiver#run

The Receiver thread is started in PushService's static initializer, that is, when the PushService class is first initialized.

static {
    // ... (other code omitted)
    Receiver receiver = new Receiver();
    Thread inThread = new Thread(receiver);
    inThread.setDaemon(true);
    inThread.setName("com.alibaba.nacos.naming.push.receiver");
    inThread.start();
    // ... (other code omitted)
}

Its run method loops in while (true). Each time an ACK packet arrives, it removes the corresponding key from ackMap.

public void run() {
    while (true) {
        // ... (other code omitted)
        String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
        AckEntry ackEntry = ackMap.remove(ackKey);
        // ... (other code omitted)
    }
}
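The Receiver side can be sketched as a daemon thread that blocks on a DatagramSocket and removes the acknowledged key from ackMap. To stay self-contained, this sketch assumes the ACK payload is simply the key as UTF-8 text. In the code above, the key is instead built with getAckKey(ip, port, ackPacket.lastRefTime) from the parsed ACK packet. AckReceiverSketch and its sendAck/awaitRemoved helpers are hypothetical and exist only for illustration and testing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an ACK receiver loop on loopback UDP (not Nacos code).
final class AckReceiverSketch implements Runnable {

    final Map<String, Object> ackMap = new ConcurrentHashMap<>();
    final DatagramSocket socket;

    AckReceiverSketch() {
        try {
            socket = new DatagramSocket(0, InetAddress.getLoopbackAddress()); // any free port
        } catch (SocketException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void run() {
        byte[] buf = new byte[1024];
        while (!socket.isClosed()) { // the original loops with while (true)
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            try {
                socket.receive(packet);
            } catch (IOException e) {
                continue; // closed socket or transient error; the loop condition re-checks
            }
            String ackKey = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
            ackMap.remove(ackKey); // ACK received: no more retransmits for this key
        }
    }

    void start() {
        Thread t = new Thread(this, "ack-receiver-sketch");
        t.setDaemon(true);
        t.start();
    }

    // Test helper: plays the client and sends an ACK datagram for key to this receiver
    void sendAck(String key) {
        byte[] data = key.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket client = new DatagramSocket()) {
            client.send(new DatagramPacket(data, data.length,
                    InetAddress.getLoopbackAddress(), socket.getLocalPort()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper: polls until key is gone from ackMap; false on timeout
    boolean awaitRemoved(String key, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (ackMap.containsKey(key)) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```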

Broadcast summary

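Each push puts an entry into ackMap, and the entry is removed again if sending fails with an exception. A UDP packet can be sent but never acknowledged, because UDP does not guarantee delivery. In that case the entry stays in ackMap. Ten seconds later the Retransmitter checks ackMap, and if the entry is still there it pushes again, until the maximum retry count is reached. In parallel, the Receiver thread listens for UDP ACKs and removes the matching entry from ackMap whenever one arrives. These UDP packets are sent to the clients. The earlier article "Nacos - Creating HostReactor" (Nacos - HostReactor的创建) explains how a client handles such a packet: it updates its own service information.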

Heartbeat summary

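In short, on receiving a heartbeat request the server updates the instance's last-beat time and health status. When an unhealthy instance becomes healthy again, it also broadcasts the change.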

This concludes 【Java】Nacos - How the Server Handles Heartbeat Requests. Source: utcz.com/a/94483.html
