聊聊rocketmq的RECONSUME_LATER

编程

本文主要研究一下rocketmq的RECONSUME_LATER

ConsumeConcurrentlyStatus

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java

/**
 * Outcome that a concurrent message listener reports after handling a batch.
 */
public enum ConsumeConcurrentlyStatus {
    /** The batch was consumed successfully. */
    CONSUME_SUCCESS,

    /** Consumption failed; the batch should be consumed again later. */
    RECONSUME_LATER
}

  • ConsumeConcurrentlyStatus定义了两个枚举值,分别为CONSUME_SUCCESS、RECONSUME_LATER

MessageListenerConcurrently

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java

/**
 * Listener through which messages are handed to the application for concurrent consumption.
 */
public interface MessageListenerConcurrently extends MessageListener {

    /**
     * Consumes one batch of messages.
     *
     * <p>Throwing an exception is not recommended; when consumption fails, return
     * {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} instead.
     *
     * @param msgs    the batch to consume, {@code msgs.size() >= 1}; the batch size follows
     *                {@code DefaultMQPushConsumer.consumeMessageBatchMaxSize} (1 unless modified)
     * @param context the consume context supplied together with the batch
     * @return the consume status
     */
    ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
}

  • MessageListenerConcurrently接口定义了consumeMessage方法

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    /**
     * Concurrent listener that forwards every received message to {@code rocketMQListener}.
     */
    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        /**
         * Delivers the messages one after another. The first message whose delivery throws stops
         * the loop: the configured delay level is stored on the context and the whole batch is
         * reported as {@code RECONSUME_LATER}.
         *
         * @param msgs    the batch to deliver
         * @param context the consume context that receives the retry delay level on failure
         * @return {@code CONSUME_SUCCESS} when every message was delivered without an exception,
         *         otherwise {@code RECONSUME_LATER}
         */
        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt received : msgs) {
                log.debug("received msg: {}", received);
                if (!deliver(received)) {
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        /**
         * Converts a single message, passes it to the listener and logs the elapsed time.
         *
         * @param received the message to deliver
         * @return {@code true} on success, {@code false} when an exception was caught and logged
         */
        @SuppressWarnings("unchecked")
        private boolean deliver(MessageExt received) {
            try {
                final long startedAt = System.currentTimeMillis();
                rocketMQListener.onMessage(doConvertMessage(received));
                final long elapsedMillis = System.currentTimeMillis() - startedAt;
                log.debug("consume {} cost: {} ms", received.getMsgId(), elapsedMillis);
                return true;
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", received, e);
                return false;
            }
        }
    }

  • DefaultRocketMQListenerContainer的内部类DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口,其consumeMessage方法会循环调用rocketMQListener.onMessage,出现异常会设置delayLevelWhenNextConsume,然后立即返回ConsumeConcurrentlyStatus.RECONSUME_LATER

ConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

    class ConsumeRequest implements Runnable {

private final List<MessageExt> msgs;

private final ProcessQueue processQueue;

private final MessageQueue messageQueue;

public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {

this.msgs = msgs;

this.processQueue = processQueue;

this.messageQueue = messageQueue;

}

public List<MessageExt> getMsgs() {

return msgs;

}

public ProcessQueue getProcessQueue() {

return processQueue;

}

@Override

public void run() {

if (this.processQueue.isDropped()) {

log.info("the message queue not be able to consume, because it"s dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);

return;

}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);

ConsumeConcurrentlyStatus status = null;

defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

ConsumeMessageContext consumeMessageContext = null;

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext = new ConsumeMessageContext();

consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());

consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());

consumeMessageContext.setProps(new HashMap<String, String>());

consumeMessageContext.setMq(messageQueue);

consumeMessageContext.setMsgList(msgs);

consumeMessageContext.setSuccess(false);

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

}

long beginTimestamp = System.currentTimeMillis();

boolean hasException = false;

ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

try {

if (msgs != null && !msgs.isEmpty()) {

for (MessageExt msg : msgs) {

MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));

}

}

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {

log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",

RemotingHelper.exceptionSimpleDesc(e),

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

messageQueue);

hasException = true;

}

long consumeRT = System.currentTimeMillis() - beginTimestamp;

if (null == status) {

if (hasException) {

returnType = ConsumeReturnType.EXCEPTION;

} else {

returnType = ConsumeReturnType.RETURNNULL;

}

} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {

returnType = ConsumeReturnType.TIME_OUT;

} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {

returnType = ConsumeReturnType.FAILED;

} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {

returnType = ConsumeReturnType.SUCCESS;

}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());

}

if (null == status) {

log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

messageQueue);

status = ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.setStatus(status.toString());

consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);

}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()

.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

if (!processQueue.isDropped()) {

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else {

log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);

}

}

public MessageQueue getMessageQueue() {

return messageQueue;

}

}

  • ConsumeRequest实现了Runnable接口,其run方法会执行listener.consumeMessage(Collections.unmodifiableList(msgs), context);在processQueue.isDropped()为false的情况下会执行processConsumeResult方法

processConsumeResult

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

/**
 * Excerpt of the concurrent consume service: the fields it holds and the method that
 * post-processes the status returned by the listener.
 */
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerConcurrently messageListener;
    private final BlockingQueue<Runnable> consumeRequestQueue;
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService cleanExpireMsgExecutors;

    //......

    /**
     * Handles the outcome of one consume request: updates the OK/failed statistics, deals with
     * the messages positioned after the ack index according to the message model, then removes
     * the request's messages from the process queue and updates the consume offset.
     *
     * @param status         the status reported for the batch
     * @param context        the consume context; supplies the ack index (and is passed on to
     *                       {@code sendMessageBack})
     * @param consumeRequest the request whose messages were consumed
     */
    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        // Nothing to account for when the request carries no messages.
        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                // Clamp the ack index to the last message of the batch.
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                // Messages up to and including ackIndex count as OK, the rest as failed.
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                // -1 makes the loops below start at index 0, i.e. the whole batch is treated as failed.
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                // Failed messages are only logged here; no send-back is attempted.
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                // Try to send every failed message back; collect the ones whose send-back failed.
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    // Take them out of this request's list (so removeMessage below does not receive
                    // them) and schedule them for another consume attempt.
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        // Remove the remaining messages from the process queue and record the returned offset,
        // but only when it is non-negative and the queue has not been dropped.
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

    //......
}

  • processConsumeResult方法首先根据status更新ackIndex,如果是RECONSUME_LATER则ackIndex为-1;然后会根据consumer的MessageModel做不同处理,如果是BROADCASTING则打印warn日志,如果是CLUSTERING则会从ackIndex+1的位置起,挨个遍历consumeRequest.getMsgs(),执行sendMessageBack
  • sendMessageBack执行失败的会更新reconsumeTimes,然后添加到msgBackFailed列表,然后从consumeRequest.getMsgs()移除
  • 如果msgBackFailed列表不为空则会执行submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue())方法

sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    //......

    /**
     * Sends a message back through {@code defaultMQPushConsumerImpl}, using the delay level
     * stored on the context and the broker name of the context's message queue.
     *
     * @param msg     the message to send back; its topic is replaced by the namespaced topic first
     * @param context the consume context supplying the delay level and the message queue
     * @return {@code true} when the send-back completed, {@code false} when an exception was
     *         caught and logged
     */
    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        final int nextDelayLevel = context.getDelayLevelWhenNextConsume();

        // The topic has to carry the namespace before the message is sent back.
        final String namespacedTopic = this.defaultMQPushConsumer.withNamespace(msg.getTopic());
        msg.setTopic(namespacedTopic);

        boolean sentBack;
        try {
            final String brokerName = context.getMessageQueue().getBrokerName();
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, nextDelayLevel, brokerName);
            sentBack = true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
            sentBack = false;
        }
        return sentBack;
    }

    //......
}

  • sendMessageBack方法从context获取delayLevel,然后执行defaultMQPushConsumerImpl.sendMessageBack

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    //......

    /**
     * Schedules the given messages to be submitted for consumption again after 5000 milliseconds.
     *
     * @param msgs         the messages to resubmit
     * @param processQueue the process queue the messages belong to
     * @param messageQueue the message queue the messages were taken from
     */
    private void submitConsumeRequestLater(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue
    ) {
        final Runnable resubmitTask = new Runnable() {
            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
            }
        };
        this.scheduledExecutorService.schedule(resubmitTask, 5000, TimeUnit.MILLISECONDS);
    }

    //......
}

  • submitConsumeRequestLater方法则延时5000毫秒定时执行submitConsumeRequest方法

submitConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    //......

    /**
     * Submits messages to the consume executor, splitting them into batches of at most
     * {@code consumeMessageBatchMaxSize} messages. When the executor rejects a submission the
     * request is handed to {@code submitConsumeRequestLater} instead.
     *
     * @param msgs              the messages to consume
     * @param processQueue      the process queue the messages belong to
     * @param messageQueue      the message queue the messages were taken from
     * @param dispatchToConsume not read by this method
     */
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int batchLimit = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

        if (msgs.size() > batchLimit) {
            int cursor = 0;
            while (cursor < msgs.size()) {
                // Fill the next batch with up to batchLimit messages.
                final List<MessageExt> batch = new ArrayList<MessageExt>(batchLimit);
                for (int taken = 0; taken < batchLimit && cursor < msgs.size(); taken++, cursor++) {
                    batch.add(msgs.get(cursor));
                }

                final ConsumeRequest batchRequest = new ConsumeRequest(batch, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(batchRequest);
                } catch (RejectedExecutionException e) {
                    // On rejection, append all not-yet-dispatched messages to the current batch
                    // (which ends the outer loop) and retry that request later.
                    while (cursor < msgs.size()) {
                        batch.add(msgs.get(cursor));
                        cursor++;
                    }
                    this.submitConsumeRequestLater(batchRequest);
                }
            }
            return;
        }

        // Everything fits into a single request.
        final ConsumeRequest wholeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(wholeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(wholeRequest);
        }
    }

    //......
}

  • submitConsumeRequest方法会执行consumeExecutor.submit(consumeRequest),如果出现RejectedExecutionException异常则执行submitConsumeRequestLater

DefaultMQPushConsumerImpl.sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

/**
 * Excerpt of the push-consumer implementation: its delay/timeout constants and the method used
 * to send a message back for a later consume attempt.
 */
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    /**
     * Delay (milliseconds, judging by the constant name) applied when an exception occurs.
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval.
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    /**
     * Delay applied when the pull service is suspended.
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;

    //......

    /**
     * Sends a message back so that it can be consumed again later.
     *
     * <p>First calls {@code consumerSendMessageBack} on the client API. If anything in that path
     * throws, a new message is built from the original and sent through the default producer
     * instead. In every case the namespace is stripped from {@code msg}'s topic before returning.
     *
     * @param msg        the message to send back
     * @param delayLevel the delay level passed to {@code consumerSendMessageBack}
     * @param brokerName the broker to address; when {@code null}, the address is parsed from
     *                   {@code msg.getStoreHost()}
     * @throws RemotingException    declared; may be raised by the fallback producer send (TODO confirm)
     * @throws MQBrokerException    declared; may be raised by the fallback producer send (TODO confirm)
     * @throws InterruptedException declared; may be raised by the fallback producer send (TODO confirm)
     * @throws MQClientException    declared; may be raised by the fallback producer send (TODO confirm)
     */
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Resolve the broker address by name when one is given, otherwise from the store host.
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            // 5000 is passed in the timeoutMillis position of consumerSendMessageBack.
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

            // Fallback: build a new message for the topic returned by MixAll.getRetryTopic(group),
            // carrying the original body.
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

            // Keep the existing origin message id if there is one, otherwise use this message's id.
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
            // The delay level of the fallback message grows with the number of reconsume attempts.
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        } finally {
            // Restore the topic without the namespace, whichever path was taken.
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }

    //......
}

  • DefaultMQPushConsumerImpl的sendMessageBack方法会执行mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack,出现异常的时候则构造一条新消息(newMsg),通过mQClientFactory.getDefaultMQProducer().send(newMsg)发送;最后在finally里将msg的topic还原为不带namespace的topic

consumerSendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

/**
 * Excerpt of the client API implementation: its fields and the request used to send a consumed
 * message back to a broker.
 */
public class MQClientAPIImpl {

    private static final InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }

    private final RemotingClient remotingClient;
    private final TopAddressing topAddressing;
    private final ClientRemotingProcessor clientRemotingProcessor;
    private String nameSrvAddr = null;
    private ClientConfig clientConfig;

    //......

    /**
     * Synchronously sends a {@code CONSUMER_SEND_MSG_BACK} request for the given message.
     *
     * @param addr                 the broker address (passed through {@code MixAll.brokerVIPChannel} first)
     * @param msg                  the message to send back; its topic, commit log offset and id go into the header
     * @param consumerGroup        the consumer group set on the request header
     * @param delayLevel           the delay level set on the request header
     * @param timeoutMillis        the timeout handed to the synchronous invocation
     * @param maxConsumeRetryTimes the max reconsume times set on the request header
     * @throws RemotingException    declared by this method; presumably raised by the remoting call
     * @throws MQBrokerException    when the response code is not {@code ResponseCode.SUCCESS}
     * @throws InterruptedException declared by this method; presumably raised by the remoting call
     */
    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        final ConsumerSendMsgBackRequestHeader header = new ConsumerSendMsgBackRequestHeader();
        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, header);

        header.setGroup(consumerGroup);
        header.setOriginTopic(msg.getTopic());
        header.setOffset(msg.getCommitLogOffset());
        header.setDelayLevel(delayLevel);
        header.setOriginMsgId(msg.getMsgId());
        header.setMaxReconsumeTimes(maxConsumeRetryTimes);

        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
        assert response != null;

        // Only SUCCESS returns normally; every other code is surfaced as a broker exception.
        if (response.getCode() == ResponseCode.SUCCESS) {
            return;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    //......
}

  • MQClientAPIImpl的consumerSendMessageBack方法内部使用的是remotingClient.invokeSync来发送消息

小结

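DefaultRocketMQListenerContainer的内部类DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口,它会循环调用rocketMQListener.onMessage,出现异常会设置delayLevelWhenNextConsume,然后立即返回ConsumeConcurrentlyStatus.RECONSUME_LATER。ConsumeRequest的run方法拿到该status之后会调用processConsumeResult:对于RECONSUME_LATER,ackIndex被置为-1,在CLUSTERING模式下整批消息都会执行sendMessageBack(内部通过consumerSendMessageBack发回broker,出现异常时则改用producer发送一条新消息);sendMessageBack返回false的消息会通过submitConsumeRequestLater延时5000毫秒后在本地重新提交消费。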

doc

  • ConsumeConcurrentlyStatus

以上是 聊聊rocketmq的RECONSUME_LATER 的全部内容, 来源链接: utcz.com/z/510748.html

回到顶部