聊聊rocketmq的suggestPullingFromSlave

编程

本文主要研究一下rocketmq的suggestPullingFromSlave

suggestPullingFromSlave

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java

public class GetMessageResult {

private final List<SelectMappedBufferResult> messageMapedList =

new ArrayList<SelectMappedBufferResult>(100);

private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);

private GetMessageStatus status;

private long nextBeginOffset;

private long minOffset;

private long maxOffset;

private int bufferTotalSize = 0;

private boolean suggestPullingFromSlave = false;

private int msgCount4Commercial = 0;

//......

public boolean isSuggestPullingFromSlave() {

return suggestPullingFromSlave;

}

public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {

this.suggestPullingFromSlave = suggestPullingFromSlave;

}

//......

}

  • GetMessageResult定义了suggestPullingFromSlave属性,默认为false

getMessage

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {

//......

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,

final int maxMsgNums,

final MessageFilter messageFilter) {

if (this.shutdown) {

log.warn("message store has shutdown, so getMessage is forbidden");

return null;

}

if (!this.runningFlags.isReadable()) {

log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());

return null;

}

long beginTime = this.getSystemClock().now();

GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;

long nextBeginOffset = offset;

long minOffset = 0;

long maxOffset = 0;

GetMessageResult getResult = new GetMessageResult();

final long maxOffsetPy = this.commitLog.getMaxOffset();

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

if (consumeQueue != null) {

minOffset = consumeQueue.getMinOffsetInQueue();

maxOffset = consumeQueue.getMaxOffsetInQueue();

if (maxOffset == 0) {

status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;

nextBeginOffset = nextOffsetCorrection(offset, 0);

} else if (offset < minOffset) {

status = GetMessageStatus.OFFSET_TOO_SMALL;

nextBeginOffset = nextOffsetCorrection(offset, minOffset);

} else if (offset == maxOffset) {

status = GetMessageStatus.OFFSET_OVERFLOW_ONE;

nextBeginOffset = nextOffsetCorrection(offset, offset);

} else if (offset > maxOffset) {

status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;

if (0 == minOffset) {

nextBeginOffset = nextOffsetCorrection(offset, minOffset);

} else {

nextBeginOffset = nextOffsetCorrection(offset, maxOffset);

}

} else {

SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);

if (bufferConsumeQueue != null) {

try {

status = GetMessageStatus.NO_MATCHED_MESSAGE;

long nextPhyFileStartOffset = Long.MIN_VALUE;

long maxPhyOffsetPulling = 0;

int i = 0;

final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);

final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();

ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();

int sizePy = bufferConsumeQueue.getByteBuffer().getInt();

long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

maxPhyOffsetPulling = offsetPy;

//......

this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();

getResult.addMessage(selectResult);

status = GetMessageStatus.FOUND;

nextPhyFileStartOffset = Long.MIN_VALUE;

}

//......

long diff = maxOffsetPy - maxPhyOffsetPulling;

long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE

* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));

getResult.setSuggestPullingFromSlave(diff > memory);

} finally {

bufferConsumeQueue.release();

}

} else {

status = GetMessageStatus.OFFSET_FOUND_NULL;

nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));

log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "

+ maxOffset + ", but access logic queue failed.");

}

}

} else {

status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;

nextBeginOffset = nextOffsetCorrection(offset, 0);

}

if (GetMessageStatus.FOUND == status) {

this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();

} else {

this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();

}

long elapsedTime = this.getSystemClock().now() - beginTime;

this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

getResult.setStatus(status);

getResult.setNextBeginOffset(nextBeginOffset);

getResult.setMaxOffset(maxOffset);

getResult.setMinOffset(minOffset);

return getResult;

}

//......

}

  • DefaultMessageStore的getMessage会计算diff(maxOffsetPy - maxPhyOffsetPulling),然后同memory(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0))对比,大于则设置getResult的suggestPullingFromSlave为true

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

public class PullMessageProcessor implements NettyRequestProcessor {

//......

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

throws RemotingCommandException {

RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);

final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();

final PullMessageRequestHeader requestHeader =

(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

response.setOpaque(request.getOpaque());

//......

final GetMessageResult getMessageResult =

this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),

requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

if (getMessageResult != null) {

response.setRemark(getMessageResult.getStatus().name());

responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());

responseHeader.setMinOffset(getMessageResult.getMinOffset());

responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

} else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {

case ASYNC_MASTER:

case SYNC_MASTER:

break;

case SLAVE:

if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {

response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

break;

}

if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {

// consume too slow ,redirect to another machine

if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

}

// consume ok

else {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());

}

} else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

//......

} else {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("store getMessage return null");

}

boolean storeOffsetEnable = brokerAllowSuspend;

storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;

storeOffsetEnable = storeOffsetEnable

&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;

if (storeOffsetEnable) {

this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),

requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());

}

return response;

}

//......

}

  • processRequest方法会通过brokerController.getMessageStore().getMessage方法获取getMessageResult,之后判断getMessageResult.isSuggestPullingFromSlave()是否为true,是的话则执行responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()),否则执行responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID)

小结

DefaultMessageStore的getMessage会计算diff(maxOffsetPy - maxPhyOffsetPulling),然后同memory(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0))对比,大于则设置getResult的suggestPullingFromSlave为true

doc

  • GetMessageResult

以上是 聊聊rocketmq的suggestPullingFromSlave 的全部内容, 来源链接: utcz.com/z/511528.html

回到顶部