A look at RocketMQ's consumeMessageBatchMaxSize


This article takes a look at RocketMQ's consumeMessageBatchMaxSize setting.

consumeMessageBatchMaxSize

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Batch consumption size
     */
    private int consumeMessageBatchMaxSize = 1;

    public int getConsumeMessageBatchMaxSize() {
        return consumeMessageBatchMaxSize;
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    //......
}

  • DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, with a default value of 1

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    //......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // consumeMessageBatchMaxSize
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException(
                "consumeMessageBatchMaxSize Out of range [1, 1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        //......
    }

    //......
}

  • The checkConfig method validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize(): the value must be at least 1 and at most 1024, otherwise an MQClientException is thrown
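The range check above can be sketched in isolation. This is a minimal stand-alone illustration, not RocketMQ code: the class name BatchSizeValidator and the method requireValid are hypothetical, and IllegalArgumentException stands in for MQClientException so the snippet runs without the client library.

```java
// Hypothetical stand-alone sketch of the [1, 1024] check; not a RocketMQ class.
final class BatchSizeValidator {
    static final int MIN = 1;     // lower bound used by checkConfig
    static final int MAX = 1024;  // upper bound used by checkConfig

    // Returns the value unchanged when it is in range, otherwise throws.
    // IllegalArgumentException is an assumption made for this sketch;
    // the real client throws MQClientException.
    static int requireValid(int consumeMessageBatchMaxSize) {
        if (consumeMessageBatchMaxSize < MIN || consumeMessageBatchMaxSize > MAX) {
            throw new IllegalArgumentException(
                "consumeMessageBatchMaxSize Out of range [" + MIN + ", " + MAX + "]: "
                    + consumeMessageBatchMaxSize);
        }
        return consumeMessageBatchMaxSize;
    }

    public static void main(String[] args) {
        System.out.println(requireValid(1));     // lowest accepted value
        System.out.println(requireValid(1024));  // highest accepted value
        try {
            requireValid(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Both bounds are inclusive, so 1 and 1024 pass while 0 and 1025 are rejected.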

submitConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

    //......

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

    //......
}

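  • When msgs.size() is less than or equal to consumeBatchSize, submitConsumeRequest creates a single ConsumeRequest and submits it to consumeExecutor; if a RejectedExecutionException is thrown, it calls submitConsumeRequestLater instead. When msgs.size() is greater than consumeBatchSize, it creates ConsumeRequests in batches of consumeBatchSize and submits each to consumeExecutor; if a RejectedExecutionException is thrown, it adds all remaining messages to msgThis and then calls submitConsumeRequestLater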
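The batching and rejection handling described above can be tried out without the client library. The following is a rough sketch under stated assumptions: SubmitSketch, its accepted and deferred lists, and the tryExecute predicate are all hypothetical names; the predicate stands in for consumeExecutor.submit, with a false result playing the role of a RejectedExecutionException, and the deferred list stands in for submitConsumeRequestLater. The separate fast path for msgs.size() <= consumeBatchSize is omitted, since for a non-empty list the loop yields the same single batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; none of these names are RocketMQ classes.
final class SubmitSketch {
    final List<List<Integer>> accepted = new ArrayList<>();  // batches the "executor" took
    final List<List<Integer>> deferred = new ArrayList<>();  // batches put aside for later

    // tryExecute stands in for consumeExecutor.submit; returning false
    // plays the role of a RejectedExecutionException.
    void submit(List<Integer> msgs, int batchSize, Predicate<List<Integer>> tryExecute) {
        for (int total = 0; total < msgs.size(); ) {
            // Collect up to batchSize messages for this request.
            List<Integer> msgThis = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }
            if (tryExecute.test(msgThis)) {
                accepted.add(msgThis);
            } else {
                // On rejection, fold every remaining message into this batch
                // and defer it; total now equals msgs.size(), so the loop ends.
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                deferred.add(msgThis);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> msgs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

        SubmitSketch idle = new SubmitSketch();
        idle.submit(msgs, 3, batch -> true);
        System.out.println(idle.accepted);  // [[1, 2, 3], [4, 5, 6], [7]]

        SubmitSketch busy = new SubmitSketch();
        // Accept only the first batch, then reject.
        busy.submit(msgs, 3, batch -> busy.accepted.isEmpty());
        System.out.println(busy.accepted);  // [[1, 2, 3]]
        System.out.println(busy.deferred);  // [[4, 5, 6, 7]]
    }
}
```

In the second run the deferred batch holds four messages even though the batch size is 3: as in the quoted source, a rejected request absorbs everything not yet submitted, so a deferred batch can be larger than consumeBatchSize.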

Summary

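DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, with a default value of 1. The checkConfig method of DefaultMQPushConsumerImpl validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize() and requires the value to be at least 1 and at most 1024. The submitConsumeRequest method of ConsumeMessageConcurrentlyService creates a single ConsumeRequest when msgs.size() is less than or equal to consumeBatchSize and submits it to consumeExecutor, falling back to submitConsumeRequestLater if a RejectedExecutionException is thrown. When msgs.size() is greater than consumeBatchSize, it creates ConsumeRequests in batches of consumeBatchSize and submits each to consumeExecutor; if a RejectedExecutionException is thrown, it adds the remaining messages to msgThis and then calls submitConsumeRequestLater.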

doc

  • DefaultMQPushConsumer

