聊聊rocketmq的updateConsumeOffsetToBroker

编程

本文主要研究一下rocketmq的updateConsumeOffsetToBroker

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java

public class RemoteBrokerOffsetStore implements OffsetStore {

//......

/**

* Update the Consumer Offset synchronously, once the Master is off, updated to Slave,

* here need to be optimized.

*/

@Override

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,

MQBrokerException, InterruptedException, MQClientException {

FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());

if (null == findBrokerResult) {

this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());

findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());

}

if (findBrokerResult != null) {

UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();

requestHeader.setTopic(mq.getTopic());

requestHeader.setConsumerGroup(this.groupName);

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setCommitOffset(offset);

if (isOneway) {

this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(

findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);

} else {

this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(

findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);

}

} else {

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);

}

}

//......

}

  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通过mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
  • 若返回null,则执行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然后再执行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
  • 之后对于findBrokerResult不为null的情况构建UpdateConsumerOffsetRequestHeader,然后执行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

//......

public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {

String brokerAddr = null;

boolean slave = false;

boolean found = false;

HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);

if (map != null && !map.isEmpty()) {

for (Map.Entry<Long, String> entry : map.entrySet()) {

Long id = entry.getKey();

brokerAddr = entry.getValue();

if (brokerAddr != null) {

found = true;

if (MixAll.MASTER_ID == id) {

slave = false;

} else {

slave = true;

}

break;

}

} // end of for

}

if (found) {

return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));

}

return null;

}

//......

}

  • findBrokerAddressInAdmin方法首先从brokerAddrTable获取指定brokerName的brokerId及address的map,然后遍历map,对于brokerAddr不为null的标记found为true,标记brokerId为MixAll.MASTER_ID的slave为false,否则为true,最后跳出循环;若found为true则构造FindBrokerResult返回,否则返回null

小结

  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通过mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
  • 若返回null,则执行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然后再执行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())获取findBrokerResult
  • 之后对于findBrokerResult不为null的情况构建UpdateConsumerOffsetRequestHeader,然后执行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

doc

  • RemoteBrokerOffsetStore

以上是 聊聊rocketmq的updateConsumeOffsetToBroker 的全部内容, 来源链接: utcz.com/z/511459.html

回到顶部