A look at RocketMQ's adjustThreadPoolNumsThreshold


This article examines RocketMQ's adjustThreadPoolNumsThreshold setting: where it is defined, how it is used, and when the adjustment runs.

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Threshold for dynamic adjustment of the number of thread pool
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    public long getAdjustThreadPoolNumsThreshold() {
        return adjustThreadPoolNumsThreshold;
    }

    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }

    //......
}

  • DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000, along with its getter and setter

DefaultMQPushConsumerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    //......

    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();

        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);

        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }

        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue value = next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }

        return msgAccTotal;
    }

    //......
}

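  • The adjustThreadPool method first computes computeAccTotal, the sum of getMsgAccCnt() over every ProcessQueue in processQueueTable. It then uses adjustThreadPoolNumsThreshold * 1.0 as incThreshold and adjustThreadPoolNumsThreshold * 0.8 as decThreshold. If computeAccTotal is greater than or equal to incThreshold, it calls consumeMessageService.incCorePoolSize(); if computeAccTotal is less than decThreshold, it calls consumeMessageService.decCorePoolSize()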
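To make the arithmetic concrete, here is a minimal stand-alone sketch of the two comparisons using the default threshold of 100000. It is not RocketMQ code: the class ThresholdSketch, the enum Action, and the method decide are hypothetical names introduced only for illustration. It assumes a positive threshold, in which case the two conditions cannot both hold and a single result is enough.

```java
// Stand-alone sketch of the threshold arithmetic described above.
// ThresholdSketch, Action and decide are hypothetical names, not RocketMQ APIs.
final class ThresholdSketch {

    enum Action { INCREASE, DECREASE, NONE }

    // Mirrors the two comparisons in DefaultMQPushConsumerImpl.adjustThreadPool().
    // Assumes threshold > 0, so at most one branch can match.
    static Action decide(long accumulated, long threshold) {
        long incThreshold = (long) (threshold * 1.0);
        long decThreshold = (long) (threshold * 0.8);
        if (accumulated >= incThreshold) {
            return Action.INCREASE;
        }
        if (accumulated < decThreshold) {
            return Action.DECREASE;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        long threshold = 100000L; // default value from DefaultMQPushConsumer
        System.out.println(decide(120000L, threshold)); // INCREASE
        System.out.println(decide(90000L, threshold));  // NONE
        System.out.println(decide(50000L, threshold));  // DECREASE
    }
}
```

With the default value, a backlog from 80000 up to 99999 falls between the two thresholds and triggers neither call.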

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

    //......

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    private void startScheduledTask() {

        //......

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void adjustThreadPool() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    if (impl instanceof DefaultMQPushConsumerImpl) {
                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
                        dmq.adjustThreadPool();
                    }
                } catch (Exception e) {
                }
            }
        }
    }

    //......
}

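  • For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute, after an initial delay of one minute. The adjustThreadPool method iterates over the MQConsumerInner entries in consumerTable and calls adjustThreadPool on each one that is a DefaultMQPushConsumerImpl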
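The scheduling pattern above can be sketched with the JDK alone. The following is a simplified stand-in, not RocketMQ code: AdjustSweepSketch, ConsumerInner, PushConsumer and PullConsumer are hypothetical types that play the roles of MQClientInstance, MQConsumerInner, DefaultMQPushConsumerImpl and any other consumer implementation. The delay and period are parameters here only so the sketch is easy to experiment with.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone sketch of the periodic sweep; every type name here is a
// hypothetical stand-in, not a RocketMQ class.
final class AdjustSweepSketch {

    // Plays the role of MQConsumerInner.
    interface ConsumerInner { }

    // Plays the role of DefaultMQPushConsumerImpl: the only type that is adjusted.
    static final class PushConsumer implements ConsumerInner {
        final AtomicInteger adjustCalls = new AtomicInteger();
        void adjustThreadPool() { adjustCalls.incrementAndGet(); }
    }

    // Any other consumer type, which the sweep skips.
    static final class PullConsumer implements ConsumerInner { }

    final Map<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    // Same shape as MQClientInstance.adjustThreadPool(): visit every entry
    // and adjust only the push consumers.
    void adjustThreadPool() {
        for (ConsumerInner impl : consumerTable.values()) {
            if (impl instanceof PushConsumer) {
                ((PushConsumer) impl).adjustThreadPool();
            }
        }
    }

    // Registers the sweep at a fixed rate with the given delay and period.
    ScheduledExecutorService startScheduledTask(long delay, long period, TimeUnit unit) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> {
            try {
                adjustThreadPool();
            } catch (Exception e) {
                // An exception escaping the task would suppress all later runs.
                System.err.println("adjustThreadPool failed: " + e);
            }
        }, delay, period, unit);
        return ses;
    }

    public static void main(String[] args) {
        AdjustSweepSketch client = new AdjustSweepSketch();
        client.consumerTable.put("push-group", new PushConsumer());
        client.consumerTable.put("pull-group", new PullConsumer());
        // Same arguments as in the article's code: first run after 1 minute, then every minute.
        ScheduledExecutorService ses = client.startScheduledTask(1, 1, TimeUnit.MINUTES);
        ses.shutdownNow(); // stop immediately so the demo exits
    }
}
```

The try/catch inside the task matters because scheduleAtFixedRate suppresses subsequent executions once a run throws, which is presumably why the RocketMQ task wraps its call the same way.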

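Summary

DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000. For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute. That method iterates over the MQConsumerInner entries in consumerTable and calls adjustThreadPool on each DefaultMQPushConsumerImpl, which compares the accumulated message count against the threshold (and against 0.8 times the threshold) to decide whether to call incCorePoolSize() or decCorePoolSize().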


Source: utcz.com/z/510871.html
