聊聊rocketmq的maxReconsumeTimes

编程

本文主要研究一下rocketmq的maxReconsumeTimes

maxReconsumeTimes

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private final InternalLogger log = ClientLogger.getLog();

//......

/**

* Max re-consume times. -1 means 16 times.

* </p>

*

* If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it"s be directed to a deletion

* queue waiting.

*/

private int maxReconsumeTimes = -1;

//......

public int getMaxReconsumeTimes() {

return maxReconsumeTimes;

}

public void setMaxReconsumeTimes(final int maxReconsumeTimes) {

this.maxReconsumeTimes = maxReconsumeTimes;

}

//......

}

  • DefaultMQPushConsumer定义了maxReconsumeTimes属性,默认为-1

sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)

throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

try {

String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)

: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,

this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());

} catch (Exception e) {

log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);

MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());

MessageAccessor.setProperties(newMsg, msg.getProperties());

MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));

MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);

} finally {

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));

}

}

private int getMaxReconsumeTimes() {

// default reconsume times: 16

if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {

return 16;

} else {

return this.defaultMQPushConsumer.getMaxReconsumeTimes();

}

}

//......

}

  • DefaultMQPushConsumerImpl的sendMessageBack方法会对mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack进行异常捕获,出现异常时会使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及调用getMaxReconsumeTimes方法设置newMsg的maxReconsumeTimes,最后使用mQClientFactory.getDefaultMQProducer().send(newMsg)发送消息

handleRetryAndDLQ

rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

//......

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,

RemotingCommand request,

MessageExt msg, TopicConfig topicConfig) {

String newTopic = requestHeader.getTopic();

if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());

SubscriptionGroupConfig subscriptionGroupConfig =

this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);

if (null == subscriptionGroupConfig) {

response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);

response.setRemark(

"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));

return false;

}

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {

maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();

}

int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();

if (reconsumeTimes >= maxReconsumeTimes) {

newTopic = MixAll.getDLQTopic(groupName);

int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,

DLQ_NUMS_PER_GROUP,

PermName.PERM_WRITE, 0

);

msg.setTopic(newTopic);

msg.setQueueId(queueIdInt);

if (null == topicConfig) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("topic[" + newTopic + "] not exist");

return false;

}

}

}

int sysFlag = requestHeader.getSysFlag();

if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {

sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;

}

msg.setSysFlag(sysFlag);

return true;

}

//......

}

  • SendMessageProcessor的handleRetryAndDLQ方法会判断如果topic是RETRY_GROUP_TOPIC_PREFIX(%RETRY%)开头的,会先从subscriptionGroupConfig.getRetryMaxTimes()获取maxReconsumeTimes,对于mq版本大于等于MQVersion.Version.V3_4_9.ordinal()的则会从request的header中读取maxReconsumeTimes;之后从request的header读取reconsumeTimes,如果该值大于等于maxReconsumeTimes则更新newTopic为MixAll.getDLQTopic(groupName)

小结

DefaultMQPushConsumer定义了maxReconsumeTimes属性,默认为-1;DefaultMQPushConsumerImpl的sendMessageBack方法会对mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack进行异常捕获,出现异常时会使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及调用getMaxReconsumeTimes方法设置newMsg的maxReconsumeTimes,最后使用mQClientFactory.getDefaultMQProducer().send(newMsg)发送消息;broker端会判断reconsumeTimes如果大于等于maxReconsumeTimes则会将其topic改为MixAll.getDLQTopic(groupName)

doc

  • DefaultMQPushConsumer

以上是 聊聊rocketmq的maxReconsumeTimes 的全部内容, 来源链接: utcz.com/z/510995.html

回到顶部