聊聊rocketmq的pullInterval

编程

本文主要研究一下rocketmq的pullInterval

pullInterval

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private final InternalLogger log = ClientLogger.getLog();

//......

/**

* Message pull Interval

*/

private long pullInterval = 0;

public long getPullInterval() {

return pullInterval;

}

public void setPullInterval(long pullInterval) {

this.pullInterval = pullInterval;

}

//......

}

  • DefaultMQPushConsumer定义了pullInterval属性,默认值为0

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

private void checkConfig() throws MQClientException {

Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

//......

// pullInterval

if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {

throw new MQClientException(

"pullInterval Out of range [0, 65535]"

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),

null);

}

//......

}

//......

}

  • checkConfig方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535

pullCallback

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

PullCallback pullCallback = new PullCallback() {

@Override

public void onSuccess(PullResult pullResult) {

if (pullResult != null) {

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,

subscriptionData);

switch (pullResult.getPullStatus()) {

case FOUND:

long prevRequestOffset = pullRequest.getNextOffset();

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

long pullRT = System.currentTimeMillis() - beginTimestamp;

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),

pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

} else {

firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),

pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(

pullResult.getMsgFoundList(),

processQueue,

pullRequest.getMessageQueue(),

dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,

DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());

} else {

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

}

}

if (pullResult.getNextBeginOffset() < prevRequestOffset

|| firstMsgOffset < prevRequestOffset) {

log.warn(

"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",

pullResult.getNextBeginOffset(),

firstMsgOffset,

prevRequestOffset);

}

break;

case NO_NEW_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

break;

case NO_MATCHED_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

break;

case OFFSET_ILLEGAL:

log.warn("the pull request offset illegal, {} {}",

pullRequest.toString(), pullResult.toString());

pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);

DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override

public void run() {

try {

DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),

pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);

} catch (Throwable e) {

log.error("executeTaskLater Exception", e);

}

}

}, 10000);

break;

default:

break;

}

}

}

@Override

public void onException(Throwable e) {

if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

log.warn("execute the pull request exception", e);

}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);

}

};

//......

private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {

this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);

}

//......

}

  • pullCallback的onSuccess方法首先通过pullAPIWrapper.processPullResult处理pullResult,之后对于pullResult.getPullStatus()为FOUND类型的会判断pullResult.getMsgFoundList()是否为空,不为空则会执行consumeMessageService.submitConsumeRequest,然后再判断defaultMQPushConsumer.getPullInterval()是否大于0,大于0的话则执行executePullRequestLater(pullRequest,

    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()),否则这执行executePullRequestImmediately(pullRequest)

小结

DefaultMQPushConsumer定义了pullInterval属性,默认值为0;checkConfig方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535;pullCallback的onSuccess方法判断defaultMQPushConsumer.getPullInterval()是否大于0,大于0的话则执行executePullRequestLater(pullRequest,

DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()),否则这执行executePullRequestImmediately(pullRequest)

doc

  • DefaultMQPushConsumer

以上是 聊聊rocketmq的pullInterval 的全部内容, 来源链接: utcz.com/z/511088.html

回到顶部