聊聊rocketmq的pullFromWhichNodeTable

编程

本文主要研究一下rocketmq的pullFromWhichNodeTable

pullFromWhichNodeTable

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java

public class PullAPIWrapper {

private final InternalLogger log = ClientLogger.getLog();

private final MQClientInstance mQClientFactory;

private final String consumerGroup;

private final boolean unitMode;

private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =

new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

private volatile boolean connectBrokerByUser = false;

private volatile long defaultBrokerId = MixAll.MASTER_ID;

private Random random = new Random(System.currentTimeMillis());

private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

//......

/**
 * Post-processes a pull result: records the broker id suggested by the broker for this
 * queue and, when messages were found, decodes, filters and annotates them.
 *
 * @param mq               the queue the result belongs to
 * @param pullResult       the result to process; it is cast to {@code PullResultExt}
 * @param subscriptionData subscription whose tag set drives the client-side tag filter
 * @return the same {@code pullResult} instance, with its binary payload set to {@code null}
 */
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    final PullResultExt extended = (PullResultExt) pullResult;

    // Remember the broker id suggested in this response for the queue.
    this.updatePullFromWhichNode(mq, extended.getSuggestWhichBrokerId());

    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        final List<MessageExt> decoded =
            MessageDecoder.decodes(ByteBuffer.wrap(extended.getMessageBinary()));

        // Tags are re-checked on the client only when a tag set exists and class filter mode is off.
        final boolean filterByTags =
            !subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode();

        List<MessageExt> accepted = decoded;
        if (filterByTags) {
            accepted = new ArrayList<MessageExt>(decoded.size());
            for (MessageExt candidate : decoded) {
                if (candidate.getTags() != null
                    && subscriptionData.getTagsSet().contains(candidate.getTags())) {
                    accepted.add(candidate);
                }
            }
        }

        if (this.hasHook()) {
            final FilterMessageContext hookContext = new FilterMessageContext();
            hookContext.setUnitMode(unitMode);
            hookContext.setMsgList(accepted);
            this.executeHook(hookContext);
        }

        for (MessageExt current : accepted) {
            // Messages flagged as transaction-prepared get their transaction id from the unique client key.
            if (Boolean.parseBoolean(current.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED))) {
                current.setTransactionId(
                    current.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            MessageAccessor.putProperty(current, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(current, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
        }

        extended.setMsgFoundList(accepted);
    }

    // The raw bytes are cleared whether or not messages were found.
    extended.setMessageBinary(null);
    return pullResult;
}

/**
 * Records the broker id to use for the given queue in {@code pullFromWhichNodeTable}.
 *
 * <p>The first value for a queue is installed with {@code putIfAbsent}. When two threads
 * race on a queue that has no entry yet, both end up writing to the single
 * {@link AtomicLong} kept in the table instead of replacing each other's instance.
 *
 * @param mq       the message queue whose entry is updated
 * @param brokerId the broker id to remember for {@code mq}
 */
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        // Publish a fresh holder only if no other thread installed one in the meantime.
        suggest = this.pullFromWhichNodeTable.putIfAbsent(mq, new AtomicLong(brokerId));
        if (null == suggest) {
            // Our holder went in and already carries brokerId.
            return;
        }
    }
    suggest.set(brokerId);
}

/**
 * Chooses the broker id to use for the given queue.
 *
 * @param mq the message queue to look up
 * @return {@code defaultBrokerId} when {@link #isConnectBrokerByUser()} is true; otherwise
 *         the id recorded for {@code mq} in {@code pullFromWhichNodeTable}, or
 *         {@code MixAll.MASTER_ID} when nothing has been recorded for it
 */
public long recalculatePullFromWhichNode(final MessageQueue mq) {
    if (!this.isConnectBrokerByUser()) {
        final AtomicLong recorded = this.pullFromWhichNodeTable.get(mq);
        return recorded == null ? MixAll.MASTER_ID : recorded.get();
    }
    return this.defaultBrokerId;
}

/**
 * @return the current value of the {@code connectBrokerByUser} flag; when true,
 *         {@code recalculatePullFromWhichNode} returns {@code defaultBrokerId}
 */
public boolean isConnectBrokerByUser() {
    return this.connectBrokerByUser;
}

//......

}

  • PullAPIWrapper定义了pullFromWhichNodeTable,其key为MessageQueue,value为AtomicLong类型的brokerId
  • processPullResult方法会使用pullResultExt.getSuggestWhichBrokerId()来执行updatePullFromWhichNode;updatePullFromWhichNode会更新指定MessageQueue的brokerId
  • recalculatePullFromWhichNode方法在isConnectBrokerByUser为true时直接返回defaultBrokerId(其默认值为MixAll.MASTER_ID),否则从pullFromWhichNodeTable取对应的brokerId,取不到则返回MixAll.MASTER_ID

PullResultExt

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java

/**
 * A {@link PullResult} that additionally carries the broker id suggested by the broker
 * and the undecoded message bytes of the response.
 */
public class PullResultExt extends PullResult {

    /** Broker id suggested in the pull response; fixed at construction. */
    private final long suggestWhichBrokerId;

    /** Raw message bytes; replaceable through {@link #setMessageBinary(byte[])}. */
    private byte[] messageBinary;

    /**
     * Creates a result, passing the status, offsets and message list to {@link PullResult}.
     *
     * @param pullStatus           status of the pull
     * @param nextBeginOffset      forwarded to the superclass
     * @param minOffset            forwarded to the superclass
     * @param maxOffset            forwarded to the superclass
     * @param msgFoundList         forwarded to the superclass
     * @param suggestWhichBrokerId broker id suggested in the response
     * @param messageBinary        raw message bytes (stored as given, not copied)
     */
    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
        List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
        this.messageBinary = messageBinary;
        this.suggestWhichBrokerId = suggestWhichBrokerId;
    }

    /** @return the broker id suggested in the pull response */
    public long getSuggestWhichBrokerId() {
        return this.suggestWhichBrokerId;
    }

    /** @return the raw message bytes currently held, possibly {@code null} */
    public byte[] getMessageBinary() {
        return this.messageBinary;
    }

    /** @param messageBinary the raw message bytes to hold from now on */
    public void setMessageBinary(byte[] messageBinary) {
        this.messageBinary = messageBinary;
    }
}

  • PullResultExt定义了suggestWhichBrokerId属性

processPullResponse

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

private final static InternalLogger log = ClientLogger.getLog();

private static boolean sendSmartMsg =

Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

//......

/**
 * Translates a broker pull response into a {@code PullResultExt}.
 *
 * <p>Code mapping: {@code SUCCESS} to {@code FOUND}, {@code PULL_NOT_FOUND} to
 * {@code NO_NEW_MSG}, {@code PULL_RETRY_IMMEDIATELY} to {@code NO_MATCHED_MSG},
 * {@code PULL_OFFSET_MOVED} to {@code OFFSET_ILLEGAL}.
 *
 * @param response the remoting response received for a pull request
 * @return a {@code PullResultExt} built from the mapped status, the header's offsets and
 *         suggested broker id, and the response body; its message list is {@code null}
 * @throws MQBrokerException        if the response code is none of the four codes above
 * @throws RemotingCommandException declared because the custom header is decoded here
 */
private PullResult processPullResponse(
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    // Every non-throwing branch assigns the status exactly once.
    final PullStatus status;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            status = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            status = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            status = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            status = PullStatus.OFFSET_ILLEGAL;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    final PullMessageResponseHeader header =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

    // The body is handed over undecoded; the message list stays null at this stage.
    return new PullResultExt(
        status,
        header.getNextBeginOffset(),
        header.getMinOffset(),
        header.getMaxOffset(),
        null,
        header.getSuggestWhichBrokerId(),
        response.getBody());
}

//......

}

  • processPullResponse方法会使用responseHeader.getSuggestWhichBrokerId()来创建PullResultExt并返回

PullMessageResponseHeader

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java

/**
 * Custom header of a pull-message response. The fields visible in this excerpt are the
 * suggested broker id and three offsets; the rest of the class is elided.
 */
public class PullMessageResponseHeader implements CommandCustomHeader {

// Broker id the responding broker suggests to the consumer.
@CFNotNull

private Long suggestWhichBrokerId;

// Offset reported as "next begin offset" in the response.
@CFNotNull

private Long nextBeginOffset;

// Minimum offset reported in the response.
@CFNotNull

private Long minOffset;

// Maximum offset reported in the response.
@CFNotNull

private Long maxOffset;

//......

}

  • PullMessageResponseHeader定义了suggestWhichBrokerId属性

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

public class PullMessageProcessor implements NettyRequestProcessor {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private final BrokerController brokerController;

private List<ConsumeMessageHook> consumeMessageHookList;

//......

/**
 * Handles a pull request. The body is abridged in this excerpt; the visible part decides
 * which broker id is written into the response header as the suggestion for the consumer.
 * The locals {@code response}, {@code responseHeader}, {@code getMessageResult} and
 * {@code subscriptionGroupConfig} are declared in the elided part.
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

throws RemotingCommandException {

//......

// First pass: suggest whichBrokerWhenConsumeSlowly when the store result suggests pulling
// from a slave, otherwise the master.
// NOTE: the slaveReadEnable block further down assigns the id again on every path, so the
// value set here does not survive to the end of the visible code.
if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

} else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

// Role check: masters fall through unchanged; a slave with slaveReadEnable off answers
// PULL_RETRY_IMMEDIATELY and points the consumer at the master.
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {

case ASYNC_MASTER:

case SYNC_MASTER:

break;

case SLAVE:

if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {

response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

break;

}

// Final decision (overrides the assignments above): with slaveReadEnable on, use
// whichBrokerWhenConsumeSlowly or the group's configured brokerId; with it off, always
// suggest the master.
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {

// consume too slow ,redirect to another machine

if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

}

// consume ok

else {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());

}

} else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

//......

}

//......

}

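  • processRequest先根据getMessageResult.isSuggestPullingFromSlave()设置responseHeader的suggestWhichBrokerId:为true时设置为subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly(),否则设置为MixAll.MASTER_ID;接着如果当前broker角色为SLAVE且slaveReadEnable为false,则把响应码设置为PULL_RETRY_IMMEDIATELY并把suggestWhichBrokerId设置为MixAll.MASTER_ID;最后再根据slaveReadEnable重新设置一次(会覆盖前面的值):slaveReadEnable为true时,isSuggestPullingFromSlave()为true取getWhichBrokerWhenConsumeSlowly(),否则取subscriptionGroupConfig.getBrokerId();slaveReadEnable为false时一律设置为MixAll.MASTER_ID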

whichBrokerWhenConsumeSlowly

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java

/**
 * Settings of a subscription group. Only the field declarations and their default values
 * are visible in this excerpt; the rest of the class is elided.
 */
public class SubscriptionGroupConfig {

private String groupName;

private boolean consumeEnable = true;

private boolean consumeFromMinEnable = true;

private boolean consumeBroadcastEnable = true;

private int retryQueueNums = 1;

private int retryMaxTimes = 16;

// Broker id for this group; defaults to the master id.
private long brokerId = MixAll.MASTER_ID;

// Broker id to use when consumption is slow (per the field name); defaults to 1.
private long whichBrokerWhenConsumeSlowly = 1;

private boolean notifyConsumerIdsChangedEnable = true;

//......

}

  • SubscriptionGroupConfig的whichBrokerWhenConsumeSlowly默认值为1,而MixAll.MASTER_ID则为0

小结

  • PullAPIWrapper定义了pullFromWhichNodeTable,其key为MessageQueue,value为AtomicLong类型的brokerId
  • processPullResult方法会使用pullResultExt.getSuggestWhichBrokerId()来执行updatePullFromWhichNode;updatePullFromWhichNode会更新指定MessageQueue的brokerId
  • recalculatePullFromWhichNode方法在isConnectBrokerByUser为true时直接返回defaultBrokerId(其默认值为MixAll.MASTER_ID),否则从pullFromWhichNodeTable取对应的brokerId,取不到则返回MixAll.MASTER_ID

doc

  • PullAPIWrapper

以上是 聊聊rocketmq的pullFromWhichNodeTable 的全部内容, 来源链接: utcz.com/z/511492.html

回到顶部