聊聊rocketmq的ConsumerManageProcessor

编程

本文主要研究一下rocketmq的ConsumerManageProcessor

NettyRequestProcessor

rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java

public interface NettyRequestProcessor {

RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)

throws Exception;

boolean rejectRequest();

}

  • NettyRequestProcessor接口定义了processRequest、rejectRequest方法

ConsumerManageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private final BrokerController brokerController;

public ConsumerManageProcessor(final BrokerController brokerController) {

this.brokerController = brokerController;

}

@Override

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

switch (request.getCode()) {

case RequestCode.GET_CONSUMER_LIST_BY_GROUP:

return this.getConsumerListByGroup(ctx, request);

case RequestCode.UPDATE_CONSUMER_OFFSET:

return this.updateConsumerOffset(ctx, request);

case RequestCode.QUERY_CONSUMER_OFFSET:

return this.queryConsumerOffset(ctx, request);

default:

break;

}

return null;

}

@Override

public boolean rejectRequest() {

return false;

}

//......

}

  • ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false

getConsumerListByGroup

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {

//......

public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response =

RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);

final GetConsumerListByGroupRequestHeader requestHeader =

(GetConsumerListByGroupRequestHeader) request

.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

ConsumerGroupInfo consumerGroupInfo =

this.brokerController.getConsumerManager().getConsumerGroupInfo(

requestHeader.getConsumerGroup());

if (consumerGroupInfo != null) {

List<String> clientIds = consumerGroupInfo.getAllClientId();

if (!clientIds.isEmpty()) {

GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();

body.setConsumerIdList(clientIds);

response.setBody(body.encode());

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

} else {

log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),

RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

}

} else {

log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),

RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

}

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());

return response;

}

//......

}

  • getConsumerListByGroup方法通过brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup())获取consumerGroupInfo

updateConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {

//......

private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response =

RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);

final UpdateConsumerOffsetRequestHeader requestHeader =

(UpdateConsumerOffsetRequestHeader) request

.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);

this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),

requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

//......

}

  • updateConsumerOffset方法主要是执行brokerController.getConsumerOffsetManager().commitOffset

queryConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {

//......

private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response =

RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);

final QueryConsumerOffsetResponseHeader responseHeader =

(QueryConsumerOffsetResponseHeader) response.readCustomHeader();

final QueryConsumerOffsetRequestHeader requestHeader =

(QueryConsumerOffsetRequestHeader) request

.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

long offset =

this.brokerController.getConsumerOffsetManager().queryOffset(

requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

if (offset >= 0) {

responseHeader.setOffset(offset);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

} else {

long minOffset =

this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),

requestHeader.getQueueId());

if (minOffset <= 0

&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(

requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {

responseHeader.setOffset(0L);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

} else {

response.setCode(ResponseCode.QUERY_NOT_FOUND);

response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");

}

}

return response;

}

//......

}

  • queryConsumerOffset方法主要是通过brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId())获取指定consumerGroup、topic及queueId的offset

小结

ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false

doc

  • ConsumerManageProcessor

以上是 聊聊rocketmq的ConsumerManageProcessor 的全部内容, 来源链接: utcz.com/z/512118.html

回到顶部