聊聊rocketmq的sendHeartbeatToAllBrokerWithLock

编程

本文主要研究一下rocketmq的sendHeartbeatToAllBrokerWithLock

sendHeartbeatToAllBrokerWithLock

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

// Timeout constant in milliseconds. Its users are outside this excerpt;
// presumably it bounds lock acquisition elsewhere in the class (TODO confirm).
private final static long LOCK_TIMEOUT_MILLIS = 3000;

// Per-instance logger obtained from ClientLogger.
private final InternalLogger log = ClientLogger.getLog();

//......

/**
 * Runs one heartbeat round under {@code lockHeartbeat}.
 *
 * <p>The lock is taken with a non-blocking {@code tryLock()}. If it is not available the
 * round is skipped and only a warning is logged. Otherwise
 * {@code sendHeartbeatToAllBroker()} and {@code uploadFilterClassSource()} are invoked in
 * that order; any exception they raise is logged and not propagated, and the lock is
 * always released afterwards.
 */
public void sendHeartbeatToAllBrokerWithLock() {
    final boolean acquired = this.lockHeartbeat.tryLock();
    if (!acquired) {
        log.warn("lock heartBeat, but failed.");
        return;
    }

    try {
        this.sendHeartbeatToAllBroker();
        this.uploadFilterClassSource();
    } catch (final Exception e) {
        log.error("sendHeartbeatToAllBroker exception", e);
    } finally {
        // Release unconditionally so a failed round never blocks the next one.
        this.lockHeartbeat.unlock();
    }
}

/**
 * Builds the heartbeat payload and sends it to the broker addresses in
 * {@code brokerAddrTable}.
 *
 * <p>Nothing is sent when the payload contains neither producers nor consumers, or when
 * the broker address table is empty. When the payload has no consumers, only addresses
 * whose id equals {@code MixAll.MASTER_ID} are contacted. The version returned by each
 * successful call is stored in {@code brokerVersionTable} under the broker name and
 * address. A failure for one address is logged and does not stop the remaining sends.
 */
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (this.brokerAddrTable.isEmpty()) {
        return;
    }

    // The counter advances once per round; every 20th round also logs successes.
    final long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    final boolean logSuccess = times % 20 == 0;

    for (Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
        final String brokerName = brokerEntry.getKey();
        final HashMap<Long, String> addrById = brokerEntry.getValue();
        if (addrById == null) {
            continue;
        }

        for (Map.Entry<Long, String> addrEntry : addrById.entrySet()) {
            final Long id = addrEntry.getKey();
            final String addr = addrEntry.getValue();
            if (addr == null) {
                continue;
            }
            // With no consumers in the payload, skip every address that is not the master id.
            if (consumerEmpty && id != MixAll.MASTER_ID) {
                continue;
            }

            try {
                final int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                if (!this.brokerVersionTable.containsKey(brokerName)) {
                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                }
                this.brokerVersionTable.get(brokerName).put(addr, version);

                if (logSuccess) {
                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                    log.info(heartbeatData.toString());
                }
            } catch (Exception e) {
                // The wording differs depending on whether isBrokerInNameServer(addr) reports the address.
                if (this.isBrokerInNameServer(addr)) {
                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                } else {
                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                        id, addr, e);
                }
            }
        }
    }
}

/**
 * Uploads filter class sources for the consumers in {@code consumerTable}.
 *
 * <p>Only consumers whose consume type is {@code ConsumeType.CONSUME_PASSIVELY} are
 * considered. For each of their subscriptions that is in class-filter mode and carries a
 * non-null filter class source, {@code uploadFilterClassToAllFilterServer} is called. A
 * failure for one subscription is logged and does not stop the remaining uploads.
 */
private void uploadFilterClassSource() {
    for (Entry<String, MQConsumerInner> consumerEntry : this.consumerTable.entrySet()) {
        final MQConsumerInner consumer = consumerEntry.getValue();
        if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) {
            continue;
        }

        for (SubscriptionData sub : consumer.subscriptions()) {
            if (!sub.isClassFilterMode() || sub.getFilterClassSource() == null) {
                continue;
            }

            final String consumerGroup = consumer.groupName();
            // The subscription's sub-string is passed as the class name argument.
            final String className = sub.getSubString();
            final String topic = sub.getTopic();
            final String filterClassSource = sub.getFilterClassSource();
            try {
                this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
            } catch (Exception e) {
                log.error("uploadFilterClassToAllFilterServer Exception", e);
            }
        }
    }
}

//......

}

  • MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先会执行lockHeartbeat.tryLock();获取锁成功时依次执行sendHeartbeatToAllBroker()及uploadFilterClassSource(),期间抛出的异常只记录error日志,最后在finally中执行lockHeartbeat.unlock();获取锁失败时只打印warn日志,本次不发送心跳
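  • sendHeartbeatToAllBroker方法会通过prepareHeartbeatData构建heartbeatData,若producerDataSet与consumerDataSet均为空则打印warn日志后直接返回;否则遍历brokerAddrTable,对addr不为null的broker执行mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000)(consumerDataSet为空时只向id为MixAll.MASTER_ID的broker发送),并把返回的version记录到brokerVersionTable;单个broker发送失败只记录日志,不影响其余broker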
  • uploadFilterClassSource方法会遍历consumerTable,对于consumeType为ConsumeType.CONSUME_PASSIVELY类型的,会遍历其subscriptions,对于sub.isClassFilterMode()且sub.getFilterClassSource()不为null的执行uploadFilterClassToAllFilterServer方法

sendHearbeat

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

// Shared logger for this class, obtained from ClientLogger.
private final static InternalLogger log = ClientLogger.getLog();

// Initialized from the system property "org.apache.rocketmq.client.sendSmartMsg";
// the value is parsed as a boolean and defaults to "true" when the property is unset.
private static boolean sendSmartMsg =

Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

// On class initialization, publishes MQVersion.CURRENT_VERSION as a system property
// under the key RemotingCommand.REMOTING_VERSION_KEY.
static {

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

}

//......

/**
 * Sends a {@code HEART_BEAT} request to one broker address and waits for the reply.
 *
 * @param addr          broker address the request is sent to
 * @param heartbeatData payload; its encoded form becomes the request body
 * @param timeoutMillis timeout passed to the synchronous remoting call
 * @return the version carried by the response when its code is {@code ResponseCode.SUCCESS}
 * @throws MQBrokerException    when the response code is anything other than
 *                              {@code ResponseCode.SUCCESS}; carries the response code and remark
 * @throws RemotingException    declared by this method; presumably raised by the remoting call
 * @throws InterruptedException declared by this method; presumably raised by the remoting call
 */
public int sendHearbeat(
    final String addr,
    final HeartbeatData heartbeatData,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    request.setLanguage(clientConfig.getLanguage());
    request.setBody(heartbeatData.encode());

    final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return response.getVersion();
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

//......

}

  • MQClientAPIImpl的sendHearbeat方法首先构建RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null),之后通过remotingClient.invokeSync(addr, request, timeoutMillis)执行请求,成功的话返回response.getVersion(),否则抛出MQBrokerException(response.getCode(), response.getRemark())

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {

// Logger bound to the broker logger name.
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

// Controller through which the handlers below reach the broker's managers.
private final BrokerController brokerController;

/**
 * Creates a processor bound to the given broker controller.
 *
 * @param brokerController controller stored for use by the request handlers
 */
public ClientManageProcessor(final BrokerController brokerController) {

this.brokerController = brokerController;

}

//......

/**
 * Dispatches a client-management request to its handler based on the request code.
 *
 * @param ctx     channel context of the incoming request
 * @param request the incoming command
 * @return the handler's response for {@code HEART_BEAT}, {@code UNREGISTER_CLIENT} and
 *         {@code CHECK_CLIENT_CONFIG}; {@code null} for any other request code
 * @throws RemotingCommandException declared by this method; presumably raised by a handler
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final int code = request.getCode();

    if (code == RequestCode.HEART_BEAT) {
        return this.heartBeat(ctx, request);
    }
    if (code == RequestCode.UNREGISTER_CLIENT) {
        return this.unregisterClient(ctx, request);
    }
    if (code == RequestCode.CHECK_CLIENT_CONFIG) {
        return this.checkClientConfig(ctx, request);
    }

    // Request codes not handled here produce no response.
    return null;
}

/**
 * Handles a client heartbeat: registers every consumer group and producer group carried in
 * the request body against the sending channel.
 *
 * <p>For each consumer entry whose subscription group config exists, the group's retry
 * topic is created through {@code createTopicInSendMessageBackMethod} before the consumer
 * is registered. When no config exists, the consumer is still registered, with the
 * notify-on-change flag left at {@code true}.
 *
 * @param ctx     channel context of the client that sent the heartbeat
 * @param request heartbeat command; its body is decoded as {@code HeartbeatData}
 * @return a response whose code is {@code ResponseCode.SUCCESS} and whose remark is {@code null}
 */
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    final ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
        final SubscriptionGroupConfig groupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                consumerData.getGroupName());

        // Stays true when the group has no subscription config.
        boolean notifyConsumerIdsChanged = true;
        if (groupConfig != null) {
            notifyConsumerIdsChanged = groupConfig.isNotifyConsumerIdsChangedEnable();

            final int topicSysFlag = consumerData.isUnitMode() ? TopicSysFlag.buildSysFlag(false, true) : 0;
            final String retryTopic = MixAll.getRetryTopic(consumerData.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                retryTopic,
                groupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        final boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            consumerData.getGroupName(),
            clientChannelInfo,
            consumerData.getConsumeType(),
            consumerData.getMessageModel(),
            consumerData.getConsumeFromWhere(),
            consumerData.getSubscriptionDataSet(),
            notifyConsumerIdsChanged
        );
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                consumerData.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(producerData.getGroupName(),
            clientChannelInfo);
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

//......

}

  • processRequest方法对于RequestCode.HEART_BEAT会执行heartBeat(ctx, request)方法,该方法会解析body为heartbeatData,并用ctx.channel()、clientID、language及version构建clientChannelInfo,之后遍历heartbeatData的consumerData及producerData做不同处理,最后返回code为ResponseCode.SUCCESS的response
  • 对于consumerData,会先取出subscriptionGroupConfig,对于config不为null的会执行brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod方法;最后执行brokerController.getConsumerManager().registerConsumer方法
  • 对于producerData,则执行brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo)方法

小结

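  • MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先会执行lockHeartbeat.tryLock();获取锁成功时依次执行sendHeartbeatToAllBroker()及uploadFilterClassSource(),期间抛出的异常只记录error日志,最后在finally中执行lockHeartbeat.unlock();获取锁失败时只打印warn日志,本次不发送心跳
  • sendHeartbeatToAllBroker方法会通过prepareHeartbeatData构建heartbeatData,若producerDataSet与consumerDataSet均为空则打印warn日志后直接返回;否则遍历brokerAddrTable,对addr不为null的broker执行mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000)(consumerDataSet为空时只向id为MixAll.MASTER_ID的broker发送),并把返回的version记录到brokerVersionTable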
  • uploadFilterClassSource方法会遍历consumerTable,对于consumeType为ConsumeType.CONSUME_PASSIVELY类型的,会遍历其subscriptions,对于sub.isClassFilterMode()且sub.getFilterClassSource()不为null的执行uploadFilterClassToAllFilterServer方法

doc

  • MQClientInstance

以上是 聊聊rocketmq的sendHeartbeatToAllBrokerWithLock 的全部内容, 来源链接: utcz.com/z/511597.html

回到顶部