聊聊rocketmq的updateTopicRouteInfoFromNameServer

编程

本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer

updateTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

// Maximum time, in milliseconds, to wait when acquiring a lock with tryLock
// (used for lockNamesrv in updateTopicRouteInfoFromNameServer).
private final static long LOCK_TIMEOUT_MILLIS = 3000;

// Logger obtained from ClientLogger; used for route-change and failure messages.
private final InternalLogger log = ClientLogger.getLog();

// Assigned outside this excerpt (no initializer here).
private final ClientConfig clientConfig;

// Assigned outside this excerpt (no initializer here).
private final int instanceIndex;

// Assigned outside this excerpt (no initializer here).
private final String clientId;

// Wall-clock time captured when this object's field initializers run.
private final long bootTimestamp = System.currentTimeMillis();

// Producers keyed by group; iterated to collect publish topics and to push new publish info.
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

// Consumers keyed by group; iterated to collect subscribed topics and to push new subscribe info.
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

//......

/**
 * Refreshes the route info of every topic referenced by this client.
 *
 * <p>The topic set is the union of the topics subscribed by each entry of
 * {@code consumerTable} and the publish topics of each entry of
 * {@code producerTable}. Each topic is then refreshed through
 * {@link #updateTopicRouteInfoFromNameServer(String)}; the per-topic result
 * is ignored here.
 */
public void updateTopicRouteInfoFromNameServer() {
    Set<String> topicList = new HashSet<String>();

    // Consumer: collect every subscribed topic.
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        Set<SubscriptionData> subList = consumer.subscriptions();
        if (subList == null) {
            continue;
        }
        for (SubscriptionData subData : subList) {
            topicList.add(subData.getTopic());
        }
    }

    // Producer: collect every publish topic.
    for (MQProducerInner producer : this.producerTable.values()) {
        if (producer == null) {
            continue;
        }
        Set<String> publishTopics = producer.getPublishTopicList();
        // Guard against a null list, mirroring the consumer branch; addAll(null) would throw.
        if (publishTopics != null) {
            topicList.addAll(publishTopics);
        }
    }

    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

/**
 * Refreshes the route info of a single topic by querying that topic directly,
 * i.e. without the default-topic path and without a producer.
 *
 * @param topic topic whose route should be refreshed
 * @return the result of the three-argument overload
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    final boolean useDefaultTopicRoute = false;
    final DefaultMQProducer noProducer = null;
    return updateTopicRouteInfoFromNameServer(topic, useDefaultTopicRoute, noProducer);
}

/**
 * Fetches the route of {@code topic} from the name server and, if it differs
 * from the cached one (or {@code isNeedUpdateTopicRouteInfo} says an update is
 * needed), propagates it to {@code brokerAddrTable}, every producer, every
 * consumer and finally {@code topicRouteTable}.
 *
 * <p>The whole operation runs while holding {@code lockNamesrv}, acquired with
 * a {@code LOCK_TIMEOUT_MILLIS} timeout.
 *
 * @param topic             topic whose route should be refreshed
 * @param isDefault         when {@code true} and {@code defaultMQProducer} is non-null, the
 *                          route of the producer's create-topic key is fetched instead and its
 *                          read/write queue counts are capped by the producer's default queue count
 * @param defaultMQProducer producer supplying the create-topic key and default queue count; may be null
 * @return {@code true} only when the tables were updated; {@code false} when nothing changed,
 *         the route was null, the lock timed out, an exception occurred or the thread was interrupted
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        // Cap both queue counts at the producer's default queue count.
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }

                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        // Route data is equal; ask the secondary check whether an update is still needed.
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        // Clone before the fetched data is handed to producers/consumers;
                        // the clone is what gets cached.
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }

                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                // This broad catch also receives InterruptedException from the name-server call;
                // restore the flag so the interruption is not lost.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Failures for retry-group topics and the auto-create key topic are not logged.
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        // tryLock cleared the interrupt status when it threw; re-assert it for callers.
        Thread.currentThread().interrupt();
    }

    return false;
}

//......

}

  • updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

getTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

//......

/**
 * Fetches the route of {@code topic} from the name server, delegating to the
 * three-argument overload with {@code allowTopicNotExist} set to {@code true}.
 *
 * @param topic         topic to look up
 * @param timeoutMillis timeout passed through to the remote call
 * @return the route data returned by the three-argument overload
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
    throws RemotingException, MQClientException, InterruptedException {
    final boolean allowTopicNotExist = true;
    return getTopicRouteInfoFromNameServer(topic, timeoutMillis, allowTopicNotExist);
}

/**
 * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request (no explicit address is passed
 * to {@code invokeSync}) and decodes the response body into a {@link TopicRouteData}.
 *
 * @param topic              topic to look up
 * @param timeoutMillis      timeout for the synchronous call
 * @param allowTopicNotExist when {@code true}, a TOPIC_NOT_EXIST response is logged as a
 *                           warning (except for the auto-create key topic) before the exception is thrown
 * @return the decoded route data; only returned for a SUCCESS response with a non-null body
 * @throws MQClientException for every other outcome, carrying the response code and remark
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
    header.setTopic(topic);
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);

    final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;

    final int code = response.getCode();
    if (code == ResponseCode.TOPIC_NOT_EXIST) {
        final boolean isAutoCreateKey = topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
        if (allowTopicNotExist && !isAutoCreateKey) {
            log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
        }
    } else if (code == ResponseCode.SUCCESS) {
        final byte[] payload = response.getBody();
        if (payload != null) {
            return TopicRouteData.decode(payload, TopicRouteData.class);
        }
        // A SUCCESS response without a body is treated as a failure below.
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

//......

}

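  • getTopicRouteInfoFromNameServer方法构造一个RequestCode.GET_ROUTEINTO_BY_TOPIC类型的请求并通过remotingClient.invokeSync同步发送;若response.getCode为ResponseCode.SUCCESS且body不为null,则使用TopicRouteData.decode(body, TopicRouteData.class)解析为TopicRouteData并返回;若为ResponseCode.TOPIC_NOT_EXIST、其他code,或者SUCCESS但body为null,则最终抛出MQClientException;这里remotingClient.invokeSync的addr参数为null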

invokeSync

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

//......

// Holder for the list of name server addresses; read via get() when a name server has to be chosen.
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();

// Holder for the address of the most recently chosen name server (initially empty).
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();

// Counter incremented for each name-server selection attempt; starts at the value from initValueIndex().
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());

//......

/**
 * Sends {@code request} synchronously and returns the response.
 *
 * <p>The channel is obtained through {@code getAndCreateChannel(addr)}. Time spent
 * before the actual send is subtracted from {@code timeoutMillis}.
 *
 * @param addr          target address; passed as-is to {@code getAndCreateChannel}
 * @param request       command to send
 * @param timeoutMillis overall time budget in milliseconds
 * @return the response command
 * @throws RemotingConnectException     if no active channel is available
 * @throws RemotingSendRequestException if sending fails (the channel is closed first)
 * @throws RemotingTimeoutException     if the budget is exhausted before sending or while waiting
 */
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final long startedAt = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);

    // Guard: without an active channel, clean up and fail fast.
    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        doBeforeRpcHooks(addr, request);

        final long elapsed = System.currentTimeMillis() - startedAt;
        if (timeoutMillis < elapsed) {
            throw new RemotingTimeoutException("invokeSync call timeout");
        }

        final long remaining = timeoutMillis - elapsed;
        final RemotingCommand reply = this.invokeSyncImpl(channel, request, remaining);
        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, reply);
        return reply;
    } catch (RemotingSendRequestException sendFailure) {
        log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw sendFailure;
    } catch (RemotingTimeoutException timedOut) {
        // Closing the socket on timeout is opt-in via client config.
        if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
            this.closeChannel(addr, channel);
            log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
        }
        log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
        throw timedOut;
    }
}

/**
 * Resolves a channel for {@code addr}.
 *
 * <p>A null address is routed to {@code getAndCreateNameserverChannel()}. Otherwise the
 * entry in {@code channelTables} is reused when it reports OK, and a new channel is
 * created via {@code createChannel(addr)} when it does not.
 *
 * @param addr target address, or null for the name server
 * @return the channel produced by one of the paths above
 */
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
    if (addr == null) {
        return getAndCreateNameserverChannel();
    }

    final ChannelWrapper cached = this.channelTables.get(addr);
    final boolean reusable = cached != null && cached.isOK();
    return reusable ? cached.getChannel() : this.createChannel(addr);
}

/**
 * Returns a channel to a name server.
 *
 * <p>If the currently chosen address has an OK entry in {@code channelTables}, that
 * channel is returned without locking. Otherwise {@code lockNamesrvChannel} is acquired
 * (with a {@code LOCK_TIMEOUT_MILLIS} timeout), the check is repeated, and then up to
 * {@code addrList.size()} selection attempts are made: each attempt advances
 * {@code namesrvIndex}, records the selected address in {@code namesrvAddrChoosed} and
 * tries {@code createChannel}.
 *
 * @return a channel, or {@code null} if the lock timed out, the address list is
 *         null/empty, every attempt returned null, or an exception was caught
 * @throws InterruptedException if interrupted while waiting for the lock
 */
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Re-check under the lock: the chosen address or its channel may have changed meanwhile.
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    // Reduce modulo size BEFORE taking the absolute value: Math.abs(Integer.MIN_VALUE)
                    // is still negative, which would yield a negative list index once the counter wraps.
                    // For every other value this equals Math.abs(index) % size.
                    index = Math.abs(index % addrList.size());
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

/**
 * Produces a random starting value in the range {@code [0, 998]}.
 *
 * @return a non-negative int smaller than 999
 */
private static int initValueIndex() {
    final int raw = new Random().nextInt();
    // raw % 999 lies in (-999, 999), so its absolute value is already within [0, 998].
    final int remainder = raw % 999;
    return Math.abs(remainder);
}

//......

}

  • invokeSync首先通过getAndCreateChannel获取channel,而getAndCreateChannel方法在addr为null时执行的是getAndCreateNameserverChannel;这里先取namesrvAddrChoosed.get(),若不为null且channelTables中对应的ChannelWrapper可用(cw.isOK())则直接返回该channel;否则先获取lockNamesrvChannel锁并再检查一次,仍不可用时遍历addrList,每次通过namesrvIndex.incrementAndGet()获取index,经取绝对值和对addrList.size()取余后得到下标,以此选中namesrv的地址并更新到namesrvAddrChoosed,然后调用createChannel创建channel,创建成功即返回;若获取锁超时、addrList为空或全部创建失败则返回null;namesrvIndex的初始值为initValueIndex,它通过Random算出一个0到998之间的随机初始值

小结

  • MQClientInstance的updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

doc

  • MQClientInstance

以上是 聊聊rocketmq的updateTopicRouteInfoFromNameServer 的全部内容, 来源链接: utcz.com/z/511408.html

回到顶部