消息队列07SpringBoot搭建Rabbitmq消息回退
gralde
compile "org.springframework.cloud:spring-cloud-starter-bus-amqp"
application.properties
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.prod.port=5674spring.rabbitmq.username=adminspring.rabbitmq.password=adminspring.rabbitmq.virtual-host=/spring.rabbitmq.publisher-confirms=true# 发送回调
spring.rabbitmq.publisher-returns=true
# 消费手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#消费之最大数量
spring.rabbitmq.listener.simple.max-concurrency=10
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=1
代码
生产者
@Componentpublic class DirectSender implements RabbitTemplate.ReturnCallback{@Autowired
private RabbitTemplate rabbitTemplate;
public void send() throws Exception {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
String context = "hi, i am DirectSender A message ";
this.rabbitTemplate.convertAndSend("DirectExchange", "Direct-C", context,correlationData);
//true 找不到投递的队列退回,false直接丢弃
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setReturnCallback(this);
//设置超时时间
boolean isAck = correlationData.getFuture().get(1, TimeUnit.MINUTES).isAck();
if(isAck){
System.out.println("消息已经投递:"+correlationData.getId());
}else{
System.out.println("消息没有被投递:"+correlationData.getId());
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("回退业务处理逻辑");
System.out.println("returnedMessage:"+message.toString() );
System.out.println("returnedMessage:"+message.getMessageProperties().getAppId() );
System.out.println("returnedMessage:"+message.getMessageProperties().getMessageId() );
System.out.println("returnedMessage:"+message.getMessageProperties().getReceivedExchange() );
System.out.println("returnedMessage:"+message.getMessageProperties().getReceivedRoutingKey() );
System.out.println("returnedMessage:"+message.getMessageProperties().getDeliveryTag() );
System.out.println("returnedMessage:"+message.getMessageProperties().getHeaders() );
System.out.println("returnedMessage:"+replyText );
System.out.println("returnedMessage:"+exchange );
System.out.println("returnedMessage:"+routingKey );
}
}
交换机Exchange
交换机 DirectExchange 没有绑定一个 Direct-C 的队列,所有我们的生产者是无法投递到对应的queue的,当无法给消息找到对应的队列就会执行我们的回退代码。
结果发现,replyText 显示NO_ROUTE,并且执行了我们的回退代码。 这里显示消息成功投递是因为他找到了对应的交换机投递成功过去了。
以上是 消息队列07SpringBoot搭建Rabbitmq消息回退 的全部内容, 来源链接: utcz.com/z/516576.html