聊聊rocketmq的consumeTimeout

编程

本文主要研究一下rocketmq的consumeTimeout

consumeTimeout

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private final InternalLogger log = ClientLogger.getLog();

//......

/**

* Maximum amount of time in minutes a message may block the consuming thread.

*/

private long consumeTimeout = 15;

public long getConsumeTimeout() {

return consumeTimeout;

}

public void setConsumeTimeout(final long consumeTimeout) {

this.consumeTimeout = consumeTimeout;

}

public TraceDispatcher getTraceDispatcher() {

return traceDispatcher;

}

}

  • DefaultMQPushConsumer定义了consumeTimeout属性,默认为15,单位是分钟

ConsumeMessageConcurrentlyService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

private static final InternalLogger log = ClientLogger.getLog();

private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

private final DefaultMQPushConsumer defaultMQPushConsumer;

private final MessageListenerConcurrently messageListener;

private final BlockingQueue<Runnable> consumeRequestQueue;

private final ThreadPoolExecutor consumeExecutor;

private final String consumerGroup;

private final ScheduledExecutorService scheduledExecutorService;

private final ScheduledExecutorService cleanExpireMsgExecutors;

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,

MessageListenerConcurrently messageListener) {

this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;

this.messageListener = messageListener;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();

this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(

this.defaultMQPushConsumer.getConsumeThreadMin(),

this.defaultMQPushConsumer.getConsumeThreadMax(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.consumeRequestQueue,

new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));

}

public void start() {

this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

cleanExpireMsg();

}

}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);

}

private void cleanExpireMsg() {

Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =

this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();

while (it.hasNext()) {

Map.Entry<MessageQueue, ProcessQueue> next = it.next();

ProcessQueue pq = next.getValue();

pq.cleanExpiredMsg(this.defaultMQPushConsumer);

}

}

//......

}

  • ConsumeMessageConcurrentlyService的start方法注册了一个定时任务,每隔defaultMQPushConsumer.getConsumeTimeout()执行一次cleanExpireMsg方法;cleanExpireMsg方法会遍历processQueueTable,然后挨个执行ProcessQueue的cleanExpiredMsg方法

ProcessQueue.cleanExpiredMsg

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {

//......

public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {

if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {

return;

}

int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;

for (int i = 0; i < loop; i++) {

MessageExt msg = null;

try {

this.lockTreeMap.readLock().lockInterruptibly();

try {

if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {

msg = msgTreeMap.firstEntry().getValue();

} else {

break;

}

} finally {

this.lockTreeMap.readLock().unlock();

}

} catch (InterruptedException e) {

log.error("getExpiredMsg exception", e);

}

try {

pushConsumer.sendMessageBack(msg, 3);

log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());

try {

this.lockTreeMap.writeLock().lockInterruptibly();

try {

if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {

try {

removeMessage(Collections.singletonList(msg));

} catch (Exception e) {

log.error("send expired msg exception", e);

}

}

} finally {

this.lockTreeMap.writeLock().unlock();

}

} catch (InterruptedException e) {

log.error("getExpiredMsg exception", e);

}

} catch (Exception e) {

log.error("send expired msg exception", e);

}

}

}

public long removeMessage(final List<MessageExt> msgs) {

long result = -1;

final long now = System.currentTimeMillis();

try {

this.lockTreeMap.writeLock().lockInterruptibly();

this.lastConsumeTimestamp = now;

try {

if (!msgTreeMap.isEmpty()) {

result = this.queueOffsetMax + 1;

int removedCnt = 0;

for (MessageExt msg : msgs) {

MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());

if (prev != null) {

removedCnt--;

msgSize.addAndGet(0 - msg.getBody().length);

}

}

msgCount.addAndGet(removedCnt);

if (!msgTreeMap.isEmpty()) {

result = msgTreeMap.firstKey();

}

}

} finally {

this.lockTreeMap.writeLock().unlock();

}

} catch (Throwable t) {

log.error("removeMessage exception", t);

}

return result;

}

//......

}

  • ProcessQueue的cleanExpiredMsg方法对于consumeOrderly的直接返回,之后根据msgTreeMap.size()确定loop,最大值为16;然后循环判断msgTreeMap.firstEntry().getValue()是否超过consumeTimeout,不是则直接跳出循环;是的话执行pushConsumer.sendMessageBack(msg, 3),然后调用removeMessage移除该msg;removeMessage方法则根据msg.getQueueOffset()移除消息

小结

ConsumeMessageConcurrentlyService的start方法注册了一个定时任务,每隔defaultMQPushConsumer.getConsumeTimeout()执行一次cleanExpireMsg方法;cleanExpireMsg方法会遍历processQueueTable,然后挨个执行ProcessQueue的cleanExpiredMsg方法

doc

  • DefaultMQPushConsumer

以上是 聊聊rocketmq的consumeTimeout 的全部内容, 来源链接: utcz.com/z/510811.html

回到顶部