java开发RocketMQ之NameServer路由管理源码分析

1.前言

NameServer主要作用是为消息消费者和消息生产者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基本信息,还要管理Broker节点,包括路由注册、路由删除等。

2.路由元信息

路由元信息主要由RouteInfoManager来进行管理,这个类在上一篇文章中已经介绍过,这里再做一下简单的介绍。

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BROKER_CHANNEL_EXPIRED_TIME:NameServer与Broker空闲连接时长,在2minNameServer之内没有收到Broker的心跳包,则NameServer会关闭与该Broker的连接并删除Broker的路由信息。

lock:读写锁,用来保护以下用于存储关键信息的非线程安全容器HashMap。

topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡

brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable:Broker集群信息,存储集群中所有Broker名称

brokerLiveTable:Broker状态信息,NameServer每次收到心跳包会对该表进行信息维护。

filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。

RocketMQ基于定于发布机制,一个Topic拥有多个消息队列,一个Broker为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

3.路由注册

以下是路由注册的时序图:

在这里插入图片描述

RocketMQ的路由注册功能是通过Broker与NameServer的心跳机制实现的。Broker启动时向NameServer集群发送心跳信息,每隔30s向集群所有NameServer发送心跳包,NameServer接收到之后会对brokerLiveTable中的BrokerLiveInfo的lastUpdateTimeStamp属性值进行更新。此外NameServer会每隔10s扫描brokerLiveTable,如果检测到有Broker连续120s没有发送心跳包,则NameServer将移除Broker的路由信息同时关闭Socket连接。

源码

3.1Broker路由注册

BrokerController#start

//...进行检查

if (!messageStoreConfig.isEnableDLegerCommitLog()) {

startProcessorByHa(messageStoreConfig.getBrokerRole());

handleSlaveSynchronize(messageStoreConfig.getBrokerRole());

//注册Broker信息

this.registerBrokerAll(true, false, true);

}

//创建定时任务线程池

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

//每隔三十秒注册Broker信息

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

} catch (Throwable e) {

log.error("registerBrokerAll Exception", e);

}

}

}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

if (this.brokerStatsManager != null) {

this.brokerStatsManager.start();

}

if (this.brokerFastFailure != null) {

this.brokerFastFailure.start();

}

}

BrokerController#registerBrokerAll

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),

this.getBrokerAddr(),

this.brokerConfig.getBrokerName(),

this.brokerConfig.getBrokerId(),

this.brokerConfig.getRegisterBrokerTimeoutMills())) {

//开始注册

doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);

}

BrokerController#doRegisterBrokerAll

//进入brokerOuterAPI#registerBrokerAll

List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(

this.brokerConfig.getBrokerClusterName(),

this.getBrokerAddr(),

this.brokerConfig.getBrokerName(),

this.brokerConfig.getBrokerId(),

this.getHAServerAddr(),

topicConfigWrapper,

this.filterServerManager.buildNewFilterServerList(),

oneway,

this.brokerConfig.getRegisterBrokerTimeoutMills(),

this.brokerConfig.isCompressedRegister());

BrokerOuterAPI#registerBrokerAll

final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();

//获取nameServer地址信息

List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

//进行请求头的封装

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();

requestHeader.setBrokerAddr(brokerAddr);

requestHeader.setBrokerId(brokerId);

requestHeader.setBrokerName(brokerName);

requestHeader.setClusterName(clusterName);

requestHeader.setHaServerAddr(haServerAddr);

requestHeader.setCompressed(compressed);

//封装注册请求体

RegisterBrokerBody requestBody = new RegisterBrokerBody();

requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);

requestBody.setFilterServerList(filterServerList);

final byte[] body = requestBody.encode(compressed);

final int bodyCrc32 = UtilAll.crc32(body);

requestHeader.setBodyCrc32(bodyCrc32);

final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());

//遍历nameServer地址 ---> 分别向各个nameServer注册Broker

for (final String namesrvAddr : nameServerAddressList) {

brokerOuterExecutor.execute(new Runnable() {

@Override

public void run() {

try {

//向nameServer注册Broker并返回注册结果

RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

if (result != null) {

registerBrokerResultList.add(result);

}

log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);

} catch (Exception e) {

log.warn("registerBroker Exception, {}", namesrvAddr, e);

} finally {

countDownLatch.countDown();

}

}

});

}

try {

countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

}

}

return registerBrokerResultList;

BrokerOuterAPI#registerBroker

//如果是oneway发送 无需关注返回结果 直接返回null

if (oneway) {

try {

this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);

} catch (RemotingTooMuchRequestException e) {

// Ignore

log.error("RemotingTooMuchRequestException!!!");

}

return null;

}

//返回远程调用的响应结果

RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

3.2NameServer处理路由注册

以下是NameServer处理路由注册的时序图:

在这里插入图片描述

请求发送至NameServer,会由默认的网络处理解析类DefaultRequestProcessor进行处理解析,如果请求类型是REGISTER_BROKER则会将请求转发至RouteInfoManager进行路由的注册。

源码

DefaultRequestProcessor#processRequest

switch (request.getCode()) {

case RequestCode.REGISTER_BROKER:

Version brokerVersion = MQVersion.value2Version(request.getVersion());

//高版本进入这个分支

if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

return this.registerBrokerWithFilterServer(ctx, request);

} else {

return this.registerBroker(ctx, request);

}

}

DefaultRequestProcessor#registerBrokerWithFilterServer

//转发到NameServerController中的RouteInfoManager进行路由注册

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(

requestHeader.getClusterName(),

requestHeader.getBrokerAddr(),

requestHeader.getBrokerName(),

requestHeader.getBrokerId(),

requestHeader.getHaServerAddr(),

registerBrokerBody.getTopicConfigSerializeWrapper(),

registerBrokerBody.getFilterServerList(),

ctx.channel());

RouteInfoManager#registerBroker

//加入写锁-保护HashTable线程安全

this.lock.writeLock().lockInterruptibly();

//维护clusterAddrTable

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);

if (null == brokerNames) {

brokerNames = new HashSet<String>();

this.clusterAddrTable.put(clusterName, brokerNames);

}

brokerNames.add(brokerName);

//维护brokerAddrTable

//是否是第一次注册 默认为false

boolean registerFirst = false;

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

//如果在brokerAddrTable没有该Broker信息

if (null == brokerData) {

//设置为第一次注册

registerFirst = true;

//创建对应的BrokerData信息

brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());

this.brokerAddrTable.put(brokerName, brokerData);

}

//不是第一次注册-对Broker信息进行更新

Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>

//The same IP:PORT must only have one record in brokerAddrTable

Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();

while (it.hasNext()) {

Entry<Long, String> item = it.next();

if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {

it.remove();

}

}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

registerFirst = registerFirst || (null == oldAddr);

//维护topicQueueTable

if (null != topicConfigWrapper

&& MixAll.MASTER_ID == brokerId) {

if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())

|| registerFirst) {

ConcurrentMap<String, TopicConfig> tcTable =

topicConfigWrapper.getTopicConfigTable();

if (tcTable != null) {

for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {

//具体细节在这个方法中

this.createAndUpdateQueueData(brokerName, entry.getValue());

}

}

}

}

//维护brokerLiveTable

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,

new BrokerLiveInfo(

System.currentTimeMillis(),

topicConfigWrapper.getDataVersion(),

channel,

haServerAddr));

if (null == prevBrokerLiveInfo) {

log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);

}

//维护filterServerTable

if (filterServerList != null) {

if (filterServerList.isEmpty()) {

this.filterServerTable.remove(brokerAddr);

} else {

this.filterServerTable.put(brokerAddr, filterServerList);

}

}

if (MixAll.MASTER_ID != brokerId) {

String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);

if (masterAddr != null) {

BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);

if (brokerLiveInfo != null) {

result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());

result.setMasterAddr(masterAddr);

}

}

}

RouteInfoManager#createAndUpdateQueueData

//创建并封装新的队列信息

QueueData queueData = new QueueData();

//Broker名称

queueData.setBrokerName(brokerName);

//读写队列个数

queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());

queueData.setReadQueueNums(topicConfig.getReadQueueNums());

//操作权限

queueData.setPerm(topicConfig.getPerm());

//同步复制还是异步复制的标识

queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

//根据Topic从topicQueueTable中获取队列集合

List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());

//如果没有获取到

if (null == queueDataList) {

//创建并将新的queueData添加到集合中

queueDataList = new LinkedList<QueueData>();

queueDataList.add(queueData);

//放入topicQueueTable

this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);

log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);

}

//queueDataList不为空

else {

boolean addNewOne = true;

Iterator<QueueData> it = queueDataList.iterator();

//对该主题已有队列信息进行遍历

while (it.hasNext()) {

QueueData qd = it.next();

//如果找到相同主题

if (qd.getBrokerName().equals(brokerName)) {

//已经存在

if (qd.equals(queueData)) {

addNewOne = false;

} else {

//进行更改-先删除后新增

log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,

queueData);

it.remove();

}

}

}

//添加一个新的队列信息

if (addNewOne) {

queueDataList.add(queueData);

}

}

3.3路由删除

NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLivelastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTablebrokerAddrTablebrokerLiveTablefilterServerTable

删除路由信息的两个触发条件:

  • NameServer定期扫描brokerLiveTable检测上次心跳包发送时间与当前系统的时间差,如果时间超过120s,则需要移除该Broker路由信息
  • Broker正常关闭,执行unregisterBroker

3.3.1Broker异常关闭

在这里插入图片描述

在这里插入图片描述

NamesrvController#initialize

//定时任务线程池--->每隔十秒扫描活跃状态异常的Broker信息

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

/**

* 对Not Active Broker 进行扫描

*/

@Override

public void run() {

//在routeInfoManager中进行路由删除

NamesrvController.this.routeInfoManager.scanNotActiveBroker();

}

}, 5, 10, TimeUnit.SECONDS);

RouteInfoManager#scanNotActiveBroker

//遍历brokerLiveTable 查看是否有broker状态长时间没有更新

Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

//遍历brokerLiveTable

while (it.hasNext()) {

Entry<String, BrokerLiveInfo> next = it.next();

long last = next.getValue().getLastUpdateTimestamp();

//如果收到心跳包的时间间距超过120s

if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {

//关闭连接

RemotingUtil.closeChannel(next.getValue().getChannel());

//移除broker

it.remove();

log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);

//维护路由表

this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

}

}

RouteInfoManager#onChannelDestroy

//待删除的BrokerAddress

String brokerAddrFound = null;

if (channel != null) {

try {

try {

//加入读锁

this.lock.readLock().lockInterruptibly();

Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =

this.brokerLiveTable.entrySet().iterator();

//遍历查找待删除的Broker

while (itBrokerLiveTable.hasNext()) {

Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();

if (entry.getValue().getChannel() == channel) {

brokerAddrFound = entry.getKey();

break;

}

}

} finally {

//释放锁

this.lock.readLock().unlock();

}

} catch (Exception e) {

log.error("onChannelDestroy Exception", e);

}

}

//加入写锁

//根据brokerAddrFound从brokerLiveTable和filterServerTable移除对应的Broker信息

this.lock.writeLock().lockInterruptibly();

this.brokerLiveTable.remove(brokerAddrFound);

this.filterServerTable.remove(brokerAddrFound);

//维护brokerAddrTable

String brokerNameFound = null;

boolean removeBrokerName = false;

Iterator<Entry<String, BrokerData>> itBrokerAddrTable =

this.brokerAddrTable.entrySet().iterator();

//遍历brokerAddrTable

while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {

BrokerData brokerData = itBrokerAddrTable.next().getValue();

//遍历brokerAddress

Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();

while (it.hasNext()) {

Entry<Long, String> entry = it.next();

Long brokerId = entry.getKey();

String brokerAddr = entry.getValue();

//遍历到匹配的broker

if (brokerAddr.equals(brokerAddrFound)) {

brokerNameFound = brokerData.getBrokerName();

//根据broker地址移除broker

it.remove();

log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",

brokerId, brokerAddr);

break;

}

}

//如果当前BrokerData中的Broker列表为空

if (brokerData.getBrokerAddrs().isEmpty()) {

removeBrokerName = true;

//从BrokerAddrTable移除该BrokerData

itBrokerAddrTable.remove();

log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",

brokerData.getBrokerName());

}

}

//维护clusterAddrTable

if (brokerNameFound != null && removeBrokerName) {

Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();

//遍历clusterAddrTable

while (it.hasNext()) {

Entry<String, Set<String>> entry = it.next();

String clusterName = entry.getKey();

Set<String> brokerNames = entry.getValue();

//根据brokerName删除cluster集群中的broker并返回是否删除成功

boolean removed = brokerNames.remove(brokerNameFound);

if (removed) {

log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",

brokerNameFound, clusterName);

//如果cluster已经不包含任何broker则移除该集群

if (brokerNames.isEmpty()) {

log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",

clusterName);

it.remove();

}

break;

}

}

}

//维护TopicQueueTable

if (removeBrokerName) {

Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =

this.topicQueueTable.entrySet().iterator();

//遍历TopicQueueTable

while (itTopicQueueTable.hasNext()) {

Entry<String, List<QueueData>> entry = itTopicQueueTable.next();

//主题

String topic = entry.getKey();

//主题对应的Broker信息

List<QueueData> queueDataList = entry.getValue();

Iterator<QueueData> itQueueData = queueDataList.iterator();

//遍历Broker

while (itQueueData.hasNext()) {

QueueData queueData = itQueueData.next();

//通过brokerNameFound在queueData中查找对应的Broker信息

if (queueData.getBrokerName().equals(brokerNameFound)) {

//移除该Broker

itQueueData.remove();

log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",

topic, queueData);

}

}

//如果当前主题已经没有任何Broker,则移除该主题Topic

if (queueDataList.isEmpty()) {

itTopicQueueTable.remove();

log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",

topic);

}

}

}

//释放写锁

finally {

this.lock.writeLock().unlock();

}

3.3.2Broker正常关闭

在这里插入图片描述

关闭入口-BrokerController

//BrokerController#shutdown

this.unregisterBrokerAll();

//BrokerController#unregisterBrokerAll

this.brokerOuterAPI.unregisterBrokerAll(

this.brokerConfig.getBrokerClusterName(),

this.getBrokerAddr(),

this.brokerConfig.getBrokerName(),

this.brokerConfig.getBrokerId());

BrokerOuterAPI#unregisterBrokerAll

//获取NameServerAddress列表

List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

if (nameServerAddressList != null) {

//遍历NameServer

for (String namesrvAddr : nameServerAddressList) {

try {

//注销指定Broker

this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);

log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);

} catch (Exception e) {

log.warn("unregisterBroker Exception, {}", namesrvAddr, e);

}

}

}

BrokerOuterAPI#unregisterBroker

//封装注销Broker请求头

UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();

requestHeader.setBrokerAddr(brokerAddr);

requestHeader.setBrokerId(brokerId);

requestHeader.setBrokerName(brokerName);

requestHeader.setClusterName(clusterName);

//封装注销请求

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);

//远程调用并发送请求-返回响应结果

RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

return;

}

default:

break;

}

throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);

忽略Netty远程调用过程

DefaultRequestProcess#processRequest

//根据请求状态码-进行不同的处理

switch (request.getCode()) {

case RequestCode.UNREGISTER_BROKER:

return this.unregisterBroker(ctx, request);

}

DefaultRequestProcess#unregisterBroker

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

//解码注销Broker的请求头

final UnRegisterBrokerRequestHeader requestHeader =

(UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);

//转发至RouteInfoManager注销Broker

this.namesrvController.getRouteInfoManager().unregisterBroker(

requestHeader.getClusterName(),

requestHeader.getBrokerAddr(),

requestHeader.getBrokerName(),

requestHeader.getBrokerId());

//设置响应状态码

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

//返回响应结果

return response;

RouteInfoManager#unregisterBroker

try {

try {

//加入写锁-保护HashMap线程安全

this.lock.writeLock().lockInterruptibly();

//由于是从BrokerController远程调用来的请求-故不对BrokerAddr进行检查-直接进行移除

//移除brokerLiveTable中的Broker信息

BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);

log.info("unregisterBroker, remove from brokerLiveTable {}, {}",

brokerLiveInfo != null ? "OK" : "Failed",

brokerAddr

);

//维护filterServerTable

this.filterServerTable.remove(brokerAddr);

boolean removeBrokerName = false;

//根据brokername获取对应的BrokerData信息

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null != brokerData) {

//删除对应Broker信息

String addr = brokerData.getBrokerAddrs().remove(brokerId);

log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",

addr != null ? "OK" : "Failed",

brokerAddr

);

//如果当前BrokerData中已经没有其他Broker

if (brokerData.getBrokerAddrs().isEmpty()) {

//从BrokerAddrTable移除相应的Broker

this.brokerAddrTable.remove(brokerName);

log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",

brokerName

);

removeBrokerName = true;

}

}

if (removeBrokerName) {

//在clusterAddrTable中通过集群名称获取对应的Broker集合

Set<String> nameSet = this.clusterAddrTable.get(clusterName);

if (nameSet != null) {

//移除该集合中的Broker

boolean removed = nameSet.remove(brokerName);

log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",

removed ? "OK" : "Failed",

brokerName);

//如果该集群中的Broker集合为空-移除该集群

if (nameSet.isEmpty()) {

this.clusterAddrTable.remove(clusterName);

log.info("unregisterBroker, remove cluster from clusterAddrTable {}",

clusterName

);

}

}

//移除该Broker下的所有主题Topic

this.removeTopicByBrokerName(brokerName);

}

} finally {

//释放写锁

this.lock.writeLock().unlock();

}

} catch (Exception e) {

log.error("unregisterBroker Exception", e);

}

3.4路由发现

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。

DefaultRequestProcessor#processRequest

switch (request.getCode()) {

case RequestCode.GET_ROUTEINFO_BY_TOPIC:

return this.getRouteInfoByTopic(ctx, request);

}

DefaultRequestProcessor#getRouteInfoByTopic

//远程调用返回的响应结果

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

//解析获取路由信息的请求头

final GetRouteInfoRequestHeader requestHeader =

(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

//调用RouteInfoManager中的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中

//分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer

TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

//如果找到主题对应你的路由信息并且该主题为顺序消息

//则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息

if (topicRouteData != null) {

if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {

String orderTopicConf =

this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,

requestHeader.getTopic());

topicRouteData.setOrderTopicConf(orderTopicConf);

}

byte[] content = topicRouteData.encode();

//填充响应体和状态码

response.setBody(content);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

//设置响应码-主题不存在

response.setCode(ResponseCode.TOPIC_NOT_EXIST);

response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()

+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));

return response;

RouteInfoManager#pickupTopicRouteData

//创建主题路由信息对象

TopicRouteData topicRouteData = new TopicRouteData();

//是否找到对应队列的标识

boolean foundQueueData = false;

//是否找到对应Broker的标识

boolean foundBrokerData = false;

Set<String> brokerNameSet = new HashSet<String>();

List<BrokerData> brokerDataList = new LinkedList<BrokerData>();

topicRouteData.setBrokerDatas(brokerDataList);

HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();

topicRouteData.setFilterServerTable(filterServerMap);

try {

try {

//加入读锁

this.lock.readLock().lockInterruptibly();

//从topicQueueTable中通过主题topic查找Broker列表

List<QueueData> queueDataList = this.topicQueueTable.get(topic);

if (queueDataList != null) {

//填充Broker信息到topicRouteData.QueueDatas

topicRouteData.setQueueDatas(queueDataList);

foundQueueData = true;

Iterator<QueueData> it = queueDataList.iterator();

while (it.hasNext()) {

QueueData qd = it.next();

brokerNameSet.add(qd.getBrokerName());

}

//遍历Broker集合

for (String brokerName : brokerNameSet) {

//从brokerAddrTable中通过BrokerName进行查找

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null != brokerData) {

BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());

//填充BrokerData信息到brokerDatas

brokerDataList.add(brokerDataClone);

foundBrokerData = true;

for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {

List<String> filterServerList = this.filterServerTable.get(brokerAddr);

filterServerMap.put(brokerAddr, filterServerList);

}

}

}

}

} finally {

//释放读锁

this.lock.readLock().unlock();

}

} catch (Exception e) {

log.error("pickupTopicRouteData Exception", e);

}

log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

if (foundBrokerData && foundQueueData) {

return topicRouteData;

}

return null;

3.5总结

在这里插入图片描述

路由注册:Broker启动之后会将自己的信息注册到NameServer,NameServer收到Broker心跳包之后会更新BrokerLiveTable中对应的Broker的相关信息,特别记录lastUpdateTimestamp

心跳机制:Broker会每隔30s向NameServer发送心跳包,告诉NameServer自己的运行状态良好。

路由发现:NameServer中的主题Topic信息更新之后,NameServer并不会实时将数据推送给订阅方Producer,而需要Producer自行通过Topic来查询路由信息。

路由删除:

  • Broker异常宕机:NameServer每隔10s扫描BrokerLiveTable,检查表中各个Broker最新的发送心跳包时间,如果这个时间与当前系统时间差超过120s,则该Broker会被视作已掉线,NameServer会将这个Broker从表中删除,并且更新各个表中与该Broker有关的所有信息。
  • Broker正常下线:NameServer接受到BrokerController发送来的UNREGISTER_BROKER请求之后,会执行上述步骤,对路由信息进行更新。

本文仅作为个人学习使用,如有不足或者错误请指正!

以上就是java开发RocketMQ之NameServer路由管理源码分析的详细内容,更多关于RocketMQ之NameServer路由管理的资料请关注其它相关文章!

以上是 java开发RocketMQ之NameServer路由管理源码分析 的全部内容, 来源链接: utcz.com/p/250716.html

回到顶部