聊聊rocketmq的suspendCurrentQueueTimeMillis

编程

本文主要研究一下rocketmq的suspendCurrentQueueTimeMillis

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

//......

/**

* Suspending pulling time for cases requiring slow pulling like flow-control scenario.

*/

private long suspendCurrentQueueTimeMillis = 1000;

public long getSuspendCurrentQueueTimeMillis() {

return suspendCurrentQueueTimeMillis;

}

public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {

this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;

}

//......

}

  • DefaultMQPushConsumer定义了suspendCurrentQueueTimeMillis属性,默认值为1000

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

//......

private void submitConsumeRequestLater(

final ProcessQueue processQueue,

final MessageQueue messageQueue,

final long suspendTimeMillis

) {

long timeMillis = suspendTimeMillis;

if (timeMillis == -1) {

timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();

}

if (timeMillis < 10) {

timeMillis = 10;

} else if (timeMillis > 30000) {

timeMillis = 30000;

}

this.scheduledExecutorService.schedule(new Runnable() {

@Override

public void run() {

ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);

}

}, timeMillis, TimeUnit.MILLISECONDS);

}

//......

}

  • submitConsumeRequestLater方法在timeMillis为-1时会读取defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()的值,如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法

小结

DefaultMQPushConsumer定义了suspendCurrentQueueTimeMillis属性,默认值为1000;ConsumeMessageOrderlyService的submitConsumeRequestLater方法在timeMillis为-1时会读取defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()的值,如果该值小于10则重置为10,如果该值大于30000则重置为30000;然后使用scheduledExecutorService延时timeMillis执行submitConsumeRequest方法

doc

  • DefaultMQPushConsumer

以上是 聊聊rocketmq的suspendCurrentQueueTimeMillis 的全部内容, 来源链接: utcz.com/z/511124.html

回到顶部