A Look at RocketMQ's pullThresholdForTopic

This article takes a look at RocketMQ's pullThresholdForTopic.

pullThresholdForTopic

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    //......

    /**
     * Flow control threshold on topic level, default value is -1(Unlimited)
     * <p>
     * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
     * {@code pullThresholdForTopic} if it isn't unlimited
     * <p>
     * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
     * then pullThresholdForQueue will be set to 100
     */
    private int pullThresholdForTopic = -1;

    /**
     * Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
     * <p>
     * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
     * {@code pullThresholdSizeForTopic} if it isn't unlimited
     * <p>
     * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
     * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
     */
    private int pullThresholdSizeForTopic = -1;

    //......

    public int getPullThresholdForTopic() {
        return pullThresholdForTopic;
    }

    public void setPullThresholdForTopic(final int pullThresholdForTopic) {
        this.pullThresholdForTopic = pullThresholdForTopic;
    }

    public int getPullThresholdSizeForTopic() {
        return pullThresholdSizeForTopic;
    }

    public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
        this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
    }

    //......
}

  • DefaultMQPushConsumer defines two properties, pullThresholdForTopic (default -1) and pullThresholdSizeForTopic (default -1, in MiB); -1 means unlimited
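As a rough illustration of how these two properties might be set on a consumer, here is a minimal sketch. It assumes rocketmq-client 4.5.2 is on the classpath; the consumer group, name server address, topic name, and threshold values are all hypothetical placeholders chosen for the example.

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

public class TopicThresholdConsumerExample {
    public static void main(String[] args) throws MQClientException {
        // Hypothetical group name, name server address, and topic.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("ExampleTopic", "*");

        // Topic-level flow control: at most 1000 cached messages in total.
        consumer.setPullThresholdForTopic(1000);
        // Topic-level cache size limit, in MiB.
        consumer.setPullThresholdSizeForTopic(100);

        // Trivial listener that acknowledges every batch.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);

        consumer.start();
    }
}
```

Leaving both properties at -1 keeps the per-queue thresholds at whatever pullThresholdForQueue and pullThresholdSizeForQueue are already set to.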

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    //......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // pullThresholdForTopic
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
            if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
                throw new MQClientException(
                    "pullThresholdForTopic Out of range [1, 6553500]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
            // pullThresholdSizeForTopic
            if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
                throw new MQClientException(
                    "pullThresholdSizeForTopic Out of range [1, 102400]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                    null);
            }
        }

        //......
    }

    //......
}

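  • checkConfig requires that defaultMQPushConsumer.getPullThresholdForTopic(), when it is not -1, be in the range [1, 6553500], and that defaultMQPushConsumer.getPullThresholdSizeForTopic(), when it is not -1, be in the range [1, 102400]; otherwise it throws MQClientException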
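The range rule above can be sketched in isolation. The class and method names below are hypothetical and not part of RocketMQ; only the bounds and the -1 sentinel are taken from the checkConfig code shown above.

```java
// Hypothetical stand-alone mirror of the range checks in checkConfig; not RocketMQ code.
class TopicThresholdRange {
    static final int UNLIMITED = -1;
    static final int MAX_PULL_THRESHOLD_FOR_TOPIC = 6553500;
    static final int MAX_PULL_THRESHOLD_SIZE_FOR_TOPIC = 102400; // MiB

    // A value passes if it is the "unlimited" sentinel or lies within [1, max].
    static boolean isAccepted(int value, int max) {
        return value == UNLIMITED || (value >= 1 && value <= max);
    }

    public static void main(String[] args) {
        int[] samples = {-1, 0, 1, 6553500, 6553501};
        for (int v : samples) {
            System.out.println(v + " -> " + isAccepted(v, MAX_PULL_THRESHOLD_FOR_TOPIC));
        }
    }
}
```

Note that 0 and other negative values such as -2 are rejected; only -1 is treated as unlimited.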

messageQueueChanged

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

public class RebalancePushImpl extends RebalanceImpl {
    private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        this(null, null, null, null, defaultMQPushConsumerImpl);
    }

    public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    }

    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        /**
         * When rebalance result changed, should update subscription's version to notify broker.
         * Fix: inconsistency subscription may lead to consumer miss messages.
         */
        SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
        long newVersion = System.currentTimeMillis();
        log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
        subscriptionData.setSubVersion(newVersion);

        int currentQueueCount = this.processQueueTable.size();
        if (currentQueueCount != 0) {
            int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
            if (pullThresholdForTopic != -1) {
                int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
                log.info("The pullThresholdForQueue is changed from {} to {}",
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
            }

            int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
            if (pullThresholdSizeForTopic != -1) {
                int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
                log.info("The pullThresholdSizeForQueue is changed from {} to {}",
                    this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
            }
        }

        // notify broker
        this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
    }

    //......
}

  • When currentQueueCount (the size of processQueueTable) is not 0, messageQueueChanged resets pullThresholdForQueue if pullThresholdForTopic is not -1, and resets pullThresholdSizeForQueue if pullThresholdSizeForTopic is not -1. The rule is: pullThresholdForQueue becomes Math.max(1, pullThresholdForTopic / currentQueueCount), and pullThresholdSizeForQueue becomes Math.max(1, pullThresholdSizeForTopic / currentQueueCount)
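The arithmetic of this rule can be tried out on its own. The sketch below uses a hypothetical helper (not a RocketMQ class) that applies the same Math.max(1, topicThreshold / queueCount) expression, assuming the topic-level value is not -1; unlike messageQueueChanged, which simply skips the update when the queue count is 0, the helper rejects non-positive counts.

```java
// Hypothetical helper reproducing the per-queue calculation from messageQueueChanged.
class PerQueueThreshold {
    // Assumes topicThreshold is a configured value (not -1).
    static int perQueue(int topicThreshold, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // Integer division truncates; the result never drops below 1.
        return Math.max(1, topicThreshold / queueCount);
    }

    public static void main(String[] args) {
        System.out.println(perQueue(1000, 10)); // 100, matching the Javadoc example
        System.out.println(perQueue(1000, 3));  // 333, the remainder is discarded
        System.out.println(perQueue(5, 10));    // 1, because of the lower bound
    }
}
```

Because of the lower bound of 1, the sum of the per-queue thresholds can exceed the topic-level value when there are more queues than the configured threshold, as in the last case (10 queues with a per-queue threshold of 1 against a topic threshold of 5).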

Summary

  • DefaultMQPushConsumer defines two properties, pullThresholdForTopic (default -1) and pullThresholdSizeForTopic (default -1, in MiB); -1 means unlimited
  • checkConfig requires that defaultMQPushConsumer.getPullThresholdForTopic(), when it is not -1, be in the range [1, 6553500], and that defaultMQPushConsumer.getPullThresholdSizeForTopic(), when it is not -1, be in the range [1, 102400]; otherwise it throws MQClientException
  • When currentQueueCount (the size of processQueueTable) is not 0, messageQueueChanged resets pullThresholdForQueue if pullThresholdForTopic is not -1, and resets pullThresholdSizeForQueue if pullThresholdSizeForTopic is not -1. The rule is: pullThresholdForQueue becomes Math.max(1, pullThresholdForTopic / currentQueueCount), and pullThresholdSizeForQueue becomes Math.max(1, pullThresholdSizeForTopic / currentQueueCount)

doc

  • DefaultMQPushConsumer
