消息负载
public void run() { log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
遍历已经注册的消费者
public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
RebalanceImpl中实现rebalance ,对于每一个消费实例 ,都可以对消息做负载均衡
public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
负载策略
1 ) AllocateMessageQueueAveragely :平均分配,推荐指数为5 颗星。
举例来说,如果现在有8 个消息消费队列ql , q2 , q3 , 俐, q5 , 币,q7 ,币,有3 个消费者
cl,c2 , c3 ,那么根据该负载算法,消息队列分配如下:
c 1: q l ,q2,q3
c2:q4 ,q5,q6
c3:q7 ,q8
2 ) AllocateMessageQueueAveragelyByCircle :平均轮询分配,推荐指数为5 颗星。
举例来说,如果现在有8 个消息消费队列咐,q2 ,币, 俐, q5 , 币,q7 ,币, 有3 个消费者cl , c2,
c3 ,那么根据该负载算法,消息队列分配如下:
cl : ql,q4, q7
c2 : q2,q5,q8
c3: q3,q6
3 ) AllocateMessageQueueConsistentHash : 一致性hash 。不推荐使用,因为消息队列负载信息不容易跟
4 ) AllocateMessageQueueByConfig :根据配置,为每一个消费者配置固定的消息队列。
5 ) AllocateMessageQueueByMachineRoom :根据Broker 部署机房名,对每个消费者负
责不同的Broker 上的队列。
问题l : PullRequest 对象在什么时候创建并加入到pullR巳questQueue 中以便唤醒
PullMessageService 线程。
RebalanceService 线程每隔2 0s 对消费者订阅的主题进行一次队列重新分配, 每一次分配都会获取主题的所有队列、从Broker 服务器
实时查询当前该主题该消费组内消费者列
表, 对新分配的消息队列会创建对应的PullRequest 对象。在一个JVM 进程中,同一个消
费组同一个队列只会存在一个PullRequest 对象。
问题2 : 集群内多个消费者是如何负载主题下的多个消费队列,并且如果有新的消费者
加入时,消息队列又会如何重新分布。
由于
每次进行队列重新负载时会从Broker 实时查询出当前消费组内所有消费者,并且
对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消
费队列从而消费消息。
如果由于由新的消费者加入或原先的消费者出现若机导致原先分给消费者的队列
在负载之后分配给别的消费者,那么在应用程序的角度来看的话,消息会被重复消费。
消息确认
retryQueueNums : 重试队列个数,默认为1 ,每一个Broker 上一个重试队列。
以上是 消息负载 的全部内容, 来源链接: utcz.com/z/515491.html