聊聊rocketmq broker的CONSUMER_SEND_MSG_BACK

编程

本文主要研究一下rocketmq broker的CONSUMER_SEND_MSG_BACK

CONSUMER_SEND_MSG_BACK

rocketmq/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java

public class RequestCode {

//......

// Request code that SendMessageProcessor.processRequest routes to consumerSendMsgBack.
public static final int CONSUMER_SEND_MSG_BACK = 36;

//......

  • RequestCode定义了CONSUMER_SEND_MSG_BACK常量,值为36

processRequest

rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

private List<ConsumeMessageHook> consumeMessageHookList;

public SendMessageProcessor(final BrokerController brokerController) {

super(brokerController);

}

/**
 * Dispatches an incoming request by its request code.
 *
 * <p>A {@code RequestCode.CONSUMER_SEND_MSG_BACK} request is handed to
 * {@code consumerSendMsgBack}. Every other code is treated as a message send:
 * the send header is parsed, the send-message hooks are run before and after,
 * and the message is stored through either the batch or the single-message path.
 *
 * @param ctx the Netty channel context the request arrived on
 * @param request the remoting command to process
 * @return the response command, or {@code null} when the send header could not be parsed
 * @throws RemotingCommandException declared by this method; presumably raised while
 *         decoding the request header - confirm against the callee implementations
 */
@Override

public RemotingCommand processRequest(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

SendMessageContext mqtraceContext;

switch (request.getCode()) {

case RequestCode.CONSUMER_SEND_MSG_BACK:

// Consumer is returning a message to the broker; handled separately from normal sends.
return this.consumerSendMsgBack(ctx, request);

default:

SendMessageRequestHeader requestHeader = parseRequestHeader(request);

// No usable header: return null instead of building a response.
if (requestHeader == null) {

return null;

}

mqtraceContext = buildMsgContext(ctx, requestHeader);

this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

RemotingCommand response;

// The header flag selects the batch or the single-message store path.
if (requestHeader.isBatch()) {

response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);

} else {

response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);

}

this.executeSendMessageHookAfter(response, mqtraceContext);

return response;

}

}

//......

}

  • SendMessageProcessor对于request.getCode()为RequestCode.CONSUMER_SEND_MSG_BACK会执行consumerSendMsgBack方法

consumerSendMsgBack

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

private List<ConsumeMessageHook> consumeMessageHookList;

public SendMessageProcessor(final BrokerController brokerController) {

super(brokerController);

}

//......

/**
 * Handles a {@code CONSUMER_SEND_MSG_BACK} request: looks up the original message by
 * commit-log offset and re-stores a copy of it into the consumer group's retry topic,
 * or into the group's DLQ topic once the reconsume limit is reached or the requested
 * delay level is negative.
 *
 * <p>The method returns early with an error (or a bare success) response when the
 * subscription group is unknown, the broker or target topic is not writeable, the
 * group has no retry queues, the target topic cannot be created, or the original
 * message cannot be found.
 *
 * @param ctx the Netty channel context the request arrived on (not read in this method body)
 * @param request the remoting command carrying a {@code ConsumerSendMsgBackRequestHeader}
 * @return a response whose code is {@code SUCCESS}, {@code SUBSCRIPTION_GROUP_NOT_EXIST},
 *         {@code NO_PERMISSION} or {@code SYSTEM_ERROR}
 * @throws RemotingCommandException declared by this method; presumably raised when the
 *         custom header cannot be decoded - confirm against decodeCommandCustomHeader
 */
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final ConsumerSendMsgBackRequestHeader requestHeader =

(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());

// Run the consume-message "after" hooks only when hooks are registered and the
// request carries an origin message id.
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

ConsumeMessageContext context = new ConsumeMessageContext();

context.setNamespace(namespace);

context.setConsumerGroup(requestHeader.getGroup());

context.setTopic(requestHeader.getOriginTopic());

context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);

context.setCommercialRcvTimes(1);

context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

this.executeConsumeMessageHookAfter(context);

}

SubscriptionGroupConfig subscriptionGroupConfig =

this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());

// Unknown consumer group: reject.
if (null == subscriptionGroupConfig) {

response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);

response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "

+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));

return response;

}

// Broker-level permission must allow writes.
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {

response.setCode(ResponseCode.NO_PERMISSION);

response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");

return response;

}

// Group configured without retry queues: report success without storing anything.
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

// Default target is the group's retry topic, with a randomly chosen queue id
// in [0, retryQueueNums).
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;

if (requestHeader.isUnitMode()) {

topicSysFlag = TopicSysFlag.buildSysFlag(false, true);

}

// Obtain the retry topic config with read+write permission (the method name
// suggests it is created on demand - confirm in TopicConfigManager).
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(

newTopic,

subscriptionGroupConfig.getRetryQueueNums(),

PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);

if (null == topicConfig) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("topic[" + newTopic + "] not exist");

return response;

}

if (!PermName.isWriteable(topicConfig.getPerm())) {

response.setCode(ResponseCode.NO_PERMISSION);

response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));

return response;

}

// Load the original message from the store using the offset supplied by the consumer.
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

if (null == msgExt) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("look message by offset failed, " + requestHeader.getOffset());

return response;

}

// If the message has no PROPERTY_RETRY_TOPIC yet, record its current topic there
// so the value is carried along when the properties are copied below.
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

if (null == retryTopic) {

MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());

}

msgExt.setWaitStoreMsgOK(false);

int delayLevel = requestHeader.getDelayLevel();

// The reconsume limit defaults to the group's configured value; requests whose
// version is at least V3_4_9 supply their own limit in the header.
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {

maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();

}

// Limit reached or negative delay level: redirect to the group's DLQ topic
// (write-only permission, DLQ_NUMS_PER_GROUP queues) instead of the retry topic.
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes

|| delayLevel < 0) {

newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,

DLQ_NUMS_PER_GROUP,

PermName.PERM_WRITE, 0

);

if (null == topicConfig) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("topic[" + newTopic + "] not exist");

return response;

}

} else {

// A delay level of 0 means the consumer did not choose one: derive it from the
// number of reconsume attempts so far.
if (0 == delayLevel) {

delayLevel = 3 + msgExt.getReconsumeTimes();

}

msgExt.setDelayTimeLevel(delayLevel);

}

// Build the message to store: a copy of the original addressed to newTopic/queueIdInt,
// with the reconsume counter incremented by one.
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

msgInner.setTopic(newTopic);

msgInner.setBody(msgExt.getBody());

msgInner.setFlag(msgExt.getFlag());

MessageAccessor.setProperties(msgInner, msgExt.getProperties());

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);

msgInner.setSysFlag(msgExt.getSysFlag());

msgInner.setBornTimestamp(msgExt.getBornTimestamp());

msgInner.setBornHost(msgExt.getBornHost());

msgInner.setStoreHost(this.getStoreHost());

msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

// Keep the existing origin message id if there is one; otherwise use this message's id.
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);

MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

if (putMessageResult != null) {

switch (putMessageResult.getPutMessageStatus()) {

case PUT_OK:

// Count the send-back against the topic recorded in PROPERTY_RETRY_TOPIC when
// present, falling back to the looked-up message's own topic.
String backTopic = msgExt.getTopic();

String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

if (correctTopic != null) {

backTopic = correctTopic;

}

this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

default:

break;

}

// Any status other than PUT_OK is reported as a system error carrying the status name.
response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(putMessageResult.getPutMessageStatus().name());

return response;

}

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("putMessageResult is null");

return response;

}

//......

}

  • consumerSendMsgBack方法会通过brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig方法查询该请求的consumerGroup对应的subscriptionGroupConfig,如果subscriptionGroupConfig为null则提前返回,如果subscriptionGroupConfig的retryQueueNums小于等于0,也提前返回;然后通过MixAll.getRetryTopic(requestHeader.getGroup())方法获取该consumerGroup对应的retryTopic,并计算queueIdInt;之后确定maxReconsumeTimes(默认取subscriptionGroupConfig的retryMaxTimes,若请求的version大于等于V3_4_9则取requestHeader.getMaxReconsumeTimes()),如果消息的reconsumeTimes大于等于该值或者delayLevel小于0,则将该消息发往MixAll.getDLQTopic(requestHeader.getGroup())对应的DLQ topic,否则设置消息的delayLevel(delayLevel为0时取3 + reconsumeTimes);最后通过brokerController.getMessageStore().putMessage(msgInner)将该消息放入对应的newTopic

小结

SendMessageProcessor对于request.getCode()为RequestCode.CONSUMER_SEND_MSG_BACK会执行consumerSendMsgBack方法

doc

  • RequestCode

以上是 聊聊rocketmqbroker的CONSUMER_SEND_MSG_BACK 的全部内容, 来源链接: utcz.com/z/510802.html

回到顶部