消息队列03SpringBoot搭建Rabbitmq与Direct直接投放模式

编程

gradle

compile "org.springframework.cloud:spring-cloud-starter-bus-amqp"

application.properties


# Broker connection settings (host, port, credentials, virtual host).
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
# NOTE(review): "prod.port" does not look like a standard Spring Boot RabbitMQ key;
# presumably it is read by custom code elsewhere - confirm or remove.
spring.rabbitmq.prod.port=5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# Publisher confirms
spring.rabbitmq.publisher-confirms=true
# Publisher returns (send callback)
spring.rabbitmq.publisher-returns=true
# Manual consumer acknowledgement (the listeners call channel.basicAck themselves)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# Minimum number of consumers
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages handled per request; it should be greater than or equal to the
# transaction size (the maximum number of unacknowledged messages)
spring.rabbitmq.listener.simple.prefetch=1

代码

消费者

/**
 * Consumer for the direct-exchange demo.
 *
 * <p>Declares two queues, "Direct-A" and "Direct-B", and binds each of them to the direct
 * exchange "DirectExchange" using a routing key equal to the queue name. Every listener
 * prints the received message together with some of its properties and then acknowledges
 * the delivery explicitly on the channel.
 *
 * <p>The listeners are registered with method-level {@code @RabbitListener} only;
 * {@code @RabbitHandler} is meant for methods of a class that carries a class-level
 * {@code @RabbitListener}, so it is not used here.
 */
@Component
public class DirectReceiver {

    /**
     * Handles messages routed with key "Direct-A".
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used for the acknowledgement
     * @throws Exception if acknowledging the delivery fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Direct-A"), // declare the queue
            exchange = @Exchange(value = "DirectExchange", type = "direct"), // declare the exchange
            key = "Direct-A" // routing rule: messages with routing key "Direct-A" reach this listener
    ))
    public void directReceiverA(Message message, Channel channel) throws Exception {
        printAndAck("directReceiverA", message, channel);
    }

    /**
     * Handles messages routed with key "Direct-B".
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used for the acknowledgement
     * @throws Exception if acknowledging the delivery fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Direct-B"), // declare the queue
            exchange = @Exchange(value = "DirectExchange", type = "direct"), // declare the exchange
            key = "Direct-B" // routing rule: messages with routing key "Direct-B" reach this listener
    ))
    public void directReceiverB(Message message, Channel channel) throws Exception {
        printAndAck("directReceiverB", message, channel);
    }

    /**
     * Prints the message and selected properties, each line prefixed with
     * {@code receiverName + ":"}, then acknowledges this single delivery.
     *
     * @param receiverName label placed in front of every printed line
     * @param message the delivered message
     * @param channel the channel on which the delivery is acknowledged
     * @throws Exception if {@code basicAck} fails
     */
    private static void printAndAck(String receiverName, Message message, Channel channel)
            throws Exception {
        String prefix = receiverName + ":";
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println(prefix + message.toString());
        System.out.println(prefix + message.getMessageProperties().getAppId());
        System.out.println(prefix + message.getMessageProperties().getMessageId());
        System.out.println(prefix + message.getMessageProperties().getReceivedExchange());
        System.out.println(prefix + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println(prefix + deliveryTag);
        System.out.println(prefix + message.getMessageProperties().getHeaders());

        // Acknowledge to the broker; multiple=false confirms only this delivery tag.
        channel.basicAck(deliveryTag, false);
    }
}

生产者

/**
 * Producer for the direct-exchange demo: publishes one text message per routing key
 * to the "DirectExchange" exchange.
 */
@Component
public class DirectSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a message with routing key "Direct-B" first, then one with routing key
     * "Direct-A", both through the "DirectExchange" exchange.
     *
     * @throws Exception any failure raised by {@code convertAndSend} propagates to the caller
     */
    public void send() throws Exception {
        final String exchange = "DirectExchange";
        final String messageForB = "hi, i am DirectSender B message ";
        final String messageForA = "hi, i am DirectSender A message ";

        rabbitTemplate.convertAndSend(exchange, "Direct-B", messageForB);
        rabbitTemplate.convertAndSend(exchange, "Direct-A", messageForA);
    }
}

查看交换机

看到我们声明的交换机已经生成,并且已经绑定好队列和路由key

 

查看队列

2个队列也已经生成,并且绑定到相应的交换机

调用生产者代码

可以看到,结果不像 Fanout 订阅模式那样两个队列都会消费同一条消息,而是只有绑定了对应 routing key 的队列才会消费。

以上是 消息队列03SpringBoot搭建Rabbitmq与Direct直接投放模式 的全部内容, 来源链接: utcz.com/z/516550.html

回到顶部