消息队列04SpringBoot搭建Rabbitmq与Topic匹配模式
gradle
compile "org.springframework.cloud:spring-cloud-starter-bus-amqp"
application.properties
# RabbitMQ broker connection settings
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
# NOTE(review): this does not look like a standard spring.rabbitmq.* key - presumably read by custom code; confirm
spring.rabbitmq.prod.port=5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# Publisher confirms
spring.rabbitmq.publisher-confirms=true
# Publisher returns (send callback)
spring.rabbitmq.publisher-returns=true
# Consumers acknowledge messages manually
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# Minimum number of consumers
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages handled per request; should be greater than or equal to the transaction size (the maximum number of unacked messages)
spring.rabbitmq.listener.simple.prefetch=1
代码
消费者
/**
 * Consumer for the topic-exchange demo.
 *
 * <p>Declares two queues, {@code Topic-A} and {@code Topic-B}, and binds both to the
 * exchange {@code TopicExchange}. Both bindings declare the exchange with the same
 * type ({@code "topic"}): wildcard routing patterns are only interpreted by a topic
 * exchange, and declaring one exchange name with two different types is rejected by
 * the broker.
 *
 * <p>Each listener prints the received message and acknowledges it explicitly on the
 * channel.
 */
@Component
public class TopicReceiver {

    /**
     * Handles messages delivered to queue {@code Topic-A}.
     *
     * @param message the received AMQP message
     * @param channel the channel the message arrived on, used to acknowledge it
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-A"), // declare the queue
            exchange = @Exchange(value = "TopicExchange", type = "topic"), // declare the exchange
            key = "Topic.*" // "*" matches exactly one word, e.g. Topic.a
    ))
    @RabbitHandler
    public void TopicReceiverA(Message message, Channel channel) throws Exception {
        printMessage("TopicReceiverA:", message);
        // Acknowledge this single delivery (multiple = false).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Handles messages delivered to queue {@code Topic-B}.
     *
     * @param message the received AMQP message
     * @param channel the channel the message arrived on, used to acknowledge it
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-B"), // declare the queue
            // Must be "topic", matching the Topic-A binding: the "#" wildcard below is
            // only honoured by a topic exchange.
            exchange = @Exchange(value = "TopicExchange", type = "topic"),
            key = "Topic.#" // "#" matches any number of words, e.g. Topic.a or Topic.a.b
    ))
    @RabbitHandler
    public void TopicReceiverB(Message message, Channel channel) throws Exception {
        printMessage("TopicReceiverB:", message);
        // Acknowledge this single delivery (multiple = false).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Prints the message and selected message properties to standard output,
     * one line each, every line starting with {@code prefix}.
     */
    private static void printMessage(String prefix, Message message) {
        System.out.println(prefix + message.toString());
        System.out.println(prefix + message.getMessageProperties().getAppId());
        System.out.println(prefix + message.getMessageProperties().getMessageId());
        System.out.println(prefix + message.getMessageProperties().getReceivedExchange());
        System.out.println(prefix + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println(prefix + message.getMessageProperties().getDeliveryTag());
        System.out.println(prefix + message.getMessageProperties().getHeaders());
    }
}
生产者
/**
 * Producer for the topic-exchange demo: publishes sample messages to the exchange
 * {@code TopicExchange} through the injected {@link RabbitTemplate}.
 */
@Component
public class TopicSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends two text messages to {@code TopicExchange}: one with the two-word routing
     * key {@code Topic.B} and one with the three-word routing key {@code Topic.B.A}.
     *
     * @throws Exception declared for callers; kept to preserve the existing signature
     */
    public void send() throws Exception {
        String context = "hi, i am TopicSender Topic.B message ";
        this.rabbitTemplate.convertAndSend("TopicExchange", "Topic.B", context);

        context = "hi, i am TopicSender Topic.B.A message ";
        this.rabbitTemplate.convertAndSend("TopicExchange", "Topic.B.A", context);
    }
}
交换机Exchange
队列Queues
调用生产者代码
结果:Routing key 为 Topic.* 的队列只能消费 Topic.B 的消息;Routing key 为 Topic.# 的队列能消费 Topic.B 和 Topic.B.A 的消息。
以上是 消息队列04SpringBoot搭建Rabbitmq与Topic匹配模式 的全部内容, 来源链接: utcz.com/z/516546.html