RabbitMQ配置回调
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.12.4</version>
</dependency>
2、配置参数
spring.rabbitmq.addresses=10.10.60.65
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置exchange是否收到消息回调
spring.rabbitmq.publisher-confirms=true
#配置是否送到queue的消息回调
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
3、配置config,因为用的标准的配置参数,也可以不配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the RabbitMQ {@link ConnectionFactory} and the
 * {@link RabbitTemplate} from the {@code spring.rabbitmq.*} properties.
 */
@Configuration
public class RabbitMQConfig {

    /** Broker address(es), read from {@code spring.rabbitmq.addresses}. */
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    /** Login user, read from {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /** Login password, read from {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    private String password;

    /** Virtual host, read from {@code spring.rabbitmq.virtual-host}. */
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /** Broker port, read from {@code spring.rabbitmq.port}; falls back to 5672 when unset. */
    @Value("${spring.rabbitmq.port:5672}")
    private Integer port;

    /**
     * {@code spring.rabbitmq.template.mandatory}: whether a message that could not be
     * delivered is handed back to the sender.
     *
     * <p>When {@code true}, if the exchange cannot match any queue using its type and the
     * routing key, RabbitMQ returns the message to the producer through the callback.
     * When {@code false}, the message is lost in that situation. Defaults to {@code false}.
     */
    @Value("${spring.rabbitmq.template.mandatory:false}")
    private Boolean mandatory;

    /** Whether publisher confirms are enabled; defaults to {@code false}. */
    @Value("${spring.rabbitmq.publisher-confirms:false}")
    private Boolean publisherConfirms;

    /**
     * Whether publisher returns are enabled; defaults to {@code false}.
     * Set this to {@code true} as well when {@code mandatory} is {@code true}.
     */
    @Value("${spring.rabbitmq.publisher-returns:false}")
    private Boolean publisherReturns;

    /**
     * Creates the connection factory and applies the connection and callback settings
     * held by this configuration class.
     *
     * @return the configured caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();

        // Where to connect.
        factory.setAddresses(addresses);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);

        // Credentials.
        factory.setUsername(username);
        factory.setPassword(password);

        // Both flags must be true for the corresponding callbacks to be delivered.
        factory.setPublisherReturns(publisherReturns);
        factory.setPublisherConfirms(publisherConfirms);

        return factory;
    }

    /**
     * Exposes the {@link RabbitTemplate} used to send messages, backed by
     * {@link #connectionFactory()} and converting payloads with Jackson JSON.
     *
     * @return the configured template, registered under the bean name {@code rabbitTemplate}
     */
    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory());
        jsonTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        jsonTemplate.setMandatory(mandatory);
        return jsonTemplate;
    }
}
4、回调代码
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Publisher-side acknowledgement handling: registers itself on the {@link RabbitTemplate}
 * as both the confirm callback and the return callback, and logs what it receives.
 */
@Slf4j
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Registers this component as the template's confirm and return callback after injection. */
    @PostConstruct
    public void init() {
        // Register the ConfirmCallback.
        rabbitTemplate.setConfirmCallback(this);
        // Register the ReturnCallback.
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirm callback; it is also invoked when the target exchange does not exist.
     *
     * <p>A ConfirmCallback only reports whether the message reached the exchange:
     * <ol>
     *   <li>message did not reach the exchange: callback with {@code ack=false};</li>
     *   <li>message reached the exchange: callback with {@code ack=true}.</li>
     * </ol>
     *
     * <p>Required setting: {@code spring.rabbitmq.publisher-confirms=true}.
     *
     * @param correlationData correlation object supplied by the sender; {@code null} if the
     *                        sender did not pass one
     * @param ack             {@code true} if the message reached the exchange, {@code false} otherwise
     * @param cause           failure reason passed to the callback; logged only when {@code ack} is {@code false}
     * @see RabbitTemplate.ConfirmCallback#confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("------------- 收到消息是否送到 exchange 回调,开始处理 --------------");
        try {
            String correlationId = correlationData != null ? correlationData.getId() : null;
            log.info("消息是否到达exchange.ack(true到达,false没有到达)={},correlationData.id={}",
                    ack, correlationId);
            if (!ack) {
                // Surface the reason for the nack; without it the failure cannot be diagnosed.
                log.warn("消息没有到达 exchange,correlationData.id={},cause={}", correlationId, cause);
            }
        } catch (Exception e) {
            log.error("收到消息是否送到 exchange 回调,后处理出现异常", e);
        }
        log.info("------------- 收到消息是否送到 exchange 回调,处理完成 --------------");
    }

    /**
     * Return callback; fires only when a message was not routed to a queue:
     * <ol>
     *   <li>exchange to queue succeeded: no return callback;</li>
     *   <li>exchange to queue failed: return callback is invoked.</li>
     * </ol>
     *
     * <p>Required setting: {@code spring.rabbitmq.template.mandatory=true}. According to the
     * original author's note, the callback fires whether {@code spring.rabbitmq.publisher-returns}
     * is true or false, as long as {@code mandatory} is true.
     *
     * @param message    the returned message
     * @param replyCode  reply code
     * @param replyText  reply description
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     * @see RabbitTemplate.ReturnCallback#returnedMessage
     */
    @Override
    public void returnedMessage(Message message, int replyCode,
                                String replyText, String exchange,
                                String routingKey) {
        log.info("------------- 消息没有送到 queue 回调,开始处理 --------------");
        try {
            // Decode explicitly as UTF-8 so the logged text does not depend on the platform
            // default charset. Fully-qualified to avoid touching the file's import list.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            log.info("消息主体: {}", body);
            log.info("应答码: {}", replyCode);
            log.info("描述:{}", replyText);
            log.info("消息使用的交换器 exchange : {}", exchange);
            log.info("消息使用的路由键 routing : {}", routingKey);
        } catch (Exception e) {
            log.error("消息没有送到 queue 回调,后处理出现异常", e);
        }
        log.info("------------- 消息没有送到 queue 回调,处理完成 --------------");
    }
}
5、发送消息测试代码
import com.example.demo.entity.User;import com.rabbitmq.tools.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@Slf4j
@RestController
public class MQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public User getUser() {
User user = new User();
user.setId(1L);
try {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("moon.fanout.exchange", "", "{"id":1}", correlationData);
} catch (Exception e) {
log.error("error", e);
}
return user;
}
}
以上是 RabbitMQ配置回调 的全部内容, 来源链接: utcz.com/z/512001.html