A look at rocketmq's MessageQueueSelector

This post takes a look at rocketmq's MessageQueueSelector.

MessageQueueSelector

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/MessageQueueSelector.java

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

  • The MessageQueueSelector interface defines a single select method that returns a MessageQueue; it has several implementations, namely SelectMessageQueueByHash, SelectMessageQueueByRandom and SelectMessageQueueByMachineRoom
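To make the contract concrete, here is a minimal sketch of a custom selection strategy. So that it compiles without the client jar, it uses stand-in types (DemoQueue, DemoMessage, DemoQueueSelector) that only mimic the shape of MessageQueue, Message and MessageQueueSelector; these names, and the SelectByOrderId strategy itself, are assumptions made for illustration and are not part of rocketmq.

```java
import java.util.List;

// Stand-in for org.apache.rocketmq.common.message.MessageQueue (hypothetical, reduced to two fields).
final class DemoQueue {
    final String brokerName;
    final int queueId;

    DemoQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Stand-in for org.apache.rocketmq.common.message.Message (hypothetical).
final class DemoMessage {
    final String body;

    DemoMessage(String body) {
        this.body = body;
    }
}

// Same shape as MessageQueueSelector.select(mqs, msg, arg), but over the stand-in types.
interface DemoQueueSelector {
    DemoQueue select(List<DemoQueue> mqs, DemoMessage msg, Object arg);
}

// Hypothetical strategy: the caller passes a numeric order id as arg,
// and every message of the same order goes to the same queue.
final class SelectByOrderId implements DemoQueueSelector {
    @Override
    public DemoQueue select(List<DemoQueue> mqs, DemoMessage msg, Object arg) {
        long orderId = ((Number) arg).longValue();
        // floorMod keeps the index in [0, mqs.size()) even for negative ids
        int index = (int) Math.floorMod(orderId, (long) mqs.size());
        return mqs.get(index);
    }
}
```

With the real client, an implementation of MessageQueueSelector would be handed to the producer together with the arg, for example through DefaultMQProducer's send(msg, selector, arg) overload; the stand-ins above only exist to keep the sketch self-contained.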

SelectMessageQueueByHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

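  • SelectMessageQueueByHash implements the MessageQueueSelector interface; its select method takes the absolute value of arg's hashCode and then the remainder modulo mqs.size(), which gives the index of the target queue in mqs, so the same arg maps to the same queue as long as mqs does not change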
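The index arithmetic above can be tried in isolation. The sketch below reproduces it on plain ints and also shows one edge case worth knowing: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so for that single hash code the remainder is negative whenever the queue count is not a power of two. The class HashIndexDemo and both method names are assumptions for illustration; indexFloorMod is an alternative shown for comparison, not what the 4.5.2 source does.

```java
// Hypothetical helper that isolates the index computation on plain ints.
final class HashIndexDemo {

    // Same steps as the quoted select method: abs of negative hashes, then remainder.
    static int indexLikeSource(int hash, int queueCount) {
        int value = hash;
        if (value < 0) {
            value = Math.abs(value); // stays negative for Integer.MIN_VALUE
        }
        return value % queueCount;
    }

    // Alternative for comparison: floorMod always yields a value in [0, queueCount).
    static int indexFloorMod(int hash, int queueCount) {
        return Math.floorMod(hash, queueCount);
    }
}
```

For ordinary hash codes both methods return a valid index, although they may pick different queues for negative hashes. Only indexLikeSource can go out of range, and only for Integer.MIN_VALUE with a queue count such as 3.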

SelectMessageQueueByRandom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandom.java

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

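  • SelectMessageQueueByRandom implements the MessageQueueSelector interface; its select method ignores msg and arg and simply draws a random value below mqs.size() as the index of the target queue in mqs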
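As a rough illustration of what random selection gives you, the sketch below draws indexes the same way (Random.nextInt with the list size as bound) and counts how often each one comes up. RandomPickDemo, pick and histogram are hypothetical names, and the fixed seed is an assumption made only to keep the run repeatable; the quoted class seeds from the current time instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical demo of index selection via Random.nextInt(bound).
final class RandomPickDemo {
    private final Random random;

    RandomPickDemo(long seed) {
        this.random = new Random(seed);
    }

    // nextInt(n) returns a value in [0, n), so the index is always valid for a non-empty list.
    <T> T pick(List<T> items) {
        return items.get(random.nextInt(items.size()));
    }

    // Counts how many of `draws` picks land on each of `queueCount` indexes.
    static int[] histogram(int queueCount, int draws, long seed) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            indexes.add(i);
        }
        RandomPickDemo picker = new RandomPickDemo(seed);
        int[] counts = new int[queueCount];
        for (int i = 0; i < draws; i++) {
            int index = picker.pick(indexes);
            counts[index]++;
        }
        return counts;
    }
}
```

Over many draws the counts come out roughly even, which spreads load across queues but gives no ordering guarantee for related messages. Note also that nextInt throws IllegalArgumentException for a bound of 0, so an empty queue list would fail.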

SelectMessageQueueByMachineRoom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

  • SelectMessageQueueByMachineRoom implements the MessageQueueSelector interface; in this version its select method is a stub that just returns null

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer producer;

    private ObjectMapper objectMapper;

    private String charset = "UTF-8";

    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

    public DefaultMQProducer getProducer() {
        return producer;
    }

    public void setProducer(DefaultMQProducer producer) {
        this.producer = producer;
    }

    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public MessageQueueSelector getMessageQueueSelector() {
        return messageQueueSelector;
    }

    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
        this.messageQueueSelector = messageQueueSelector;
    }

    //......
}

  • RocketMQTemplate creates a SelectMessageQueueByHash as its default MessageQueueSelector; a different one can be supplied through setMessageQueueSelector

Summary

The MessageQueueSelector interface defines a select method that returns a MessageQueue. It has several implementations: SelectMessageQueueByHash, SelectMessageQueueByRandom and SelectMessageQueueByMachineRoom. RocketMQTemplate creates a SelectMessageQueueByHash as its default MessageQueueSelector.
