如何在Spring Boot中实现循环队列使用者

我在Spring构建了一个消息驱动的服务,该服务将在集群中运行,并且需要以循环方式从RabbitMQ队列中提取消息。该实现当前正在以先到先得的方式将消息从队列中拉出,从而导致某些服务器被备份而其他服务器则处于空闲状态。

当前的QueueConsumerConfiguration.java如下所示:

@Configuration

public class QueueConsumerConfiguration extends RabbitMqConfiguration {

private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);

private static final int DEFAULT_CONSUMERS=2;

@Value("${eventservice.inbound}")

protected String inboudEventQueue;

@Value("${eventservice.consumers}")

protected int queueConsumers;

@Autowired

private EventHandler eventtHandler;

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(connectionFactory());

template.setRoutingKey(this.inboudEventQueue);

template.setQueue(this.inboudEventQueue);

template.setMessageConverter(jsonMessageConverter());

return template;

}

@Bean

public Queue inboudEventQueue() {

return new Queue(this.inboudEventQueue);

}

@Bean

public SimpleMessageListenerContainer listenerContainer() {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory());

container.setQueueNames(this.inboudEventQueue);

container.setMessageListener(messageListenerAdapter());

if (this.queueConsumers > 0) {

LOG.info("Starting queue consumers:" + this.queueConsumers );

container.setMaxConcurrentConsumers(this.queueConsumers);

container.setConcurrentConsumers(this.queueConsumers);

} else {

LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);

container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);

container.setConcurrentConsumers(DEFAULT_CONSUMERS);

}

return container;

}

@Bean

public MessageListenerAdapter messageListenerAdapter() {

return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());

}

}

只是添加一个案例

container.setChannelTransacted(true);

配置?

回答:

RabbitMQ对待所有消费者都是相同的-

它知道一个容器中的多个消费者之间没有区别。一个消费者在多个容器中(例如在不同的主机上)。从Rabbit的角度来看,每个人都是消费者。

如果要更好地控制服务器亲缘关系,则需要使用多个队列,每个容器侦听自己的队列。

然后,您可以在生产者端控制分发-例如,使用主题或直接交换和特定的路由键将消息路由到特定的队列。

这将生产者与消费者紧密地联系在一起(他必须知道有多少人)。

或者,您可以让生产者使用路由键rk.0, rk.1, ..., rk.29(反复地,当达到30时重置为0)。

然后,您可以使用多个绑定来绑定使用者队列-

消费者1将rk.0转换为rk.9,2将rk.10转换为rk.19,依此类推。

如果随后决定增加使用者的数量,则只需适当地重构绑定即可重新分配工作。

容器将按需扩展到maxConcurrentConsumers,但实际上,仅在整个容器空闲一段时间后才进行缩减。

以上是 如何在Spring Boot中实现循环队列使用者 的全部内容, 来源链接: utcz.com/qa/403972.html

回到顶部