RabbitMQ配置回调

1、引入依赖
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.12.4</version>
</dependency>
2、配置参数
spring.rabbitmq.addresses=10.10.60.65
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置exchange是否收到消息回调
spring.rabbitmq.publisher-confirms=true
#配置是否送到queue的消息回调
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
3、配置config,因为用的标准的配置参数,也可以不配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the RabbitMQ {@link ConnectionFactory} and
 * {@link RabbitTemplate} beans from the {@code spring.rabbitmq.*} properties.
 */
@Configuration
public class RabbitMQConfig {

    /** Broker address(es), read from {@code spring.rabbitmq.addresses}. */
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    /** Login user, read from {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /** Login password, read from {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    private String password;

    /** Virtual host, read from {@code spring.rabbitmq.virtual-host}. */
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /** Broker port, read from {@code spring.rabbitmq.port}; falls back to 5672 when unset. */
    @Value("${spring.rabbitmq.port:5672}")
    private Integer port;

    /**
     * Value of {@code spring.rabbitmq.template.mandatory} (default {@code false}):
     * whether a message that could not be delivered is handed back to the sender.
     *
     * <p>When {@code true}, a message for which the exchange cannot find a matching
     * queue (by exchange type and routing key) is returned to the producer through
     * the return callback. When {@code false}, such a message is lost.
     */
    @Value("${spring.rabbitmq.template.mandatory:false}")
    private Boolean mandatory;

    /** Whether publisher confirms are enabled ({@code spring.rabbitmq.publisher-confirms}, default {@code false}). */
    @Value("${spring.rabbitmq.publisher-confirms:false}")
    private Boolean publisherConfirms;

    /**
     * Whether publisher returns are enabled ({@code spring.rabbitmq.publisher-returns},
     * default {@code false}). Set this to {@code true} whenever {@code mandatory} is {@code true}.
     */
    @Value("${spring.rabbitmq.publisher-returns:false}")
    private Boolean publisherReturns;

    /**
     * Creates the connection factory and applies the injected connection and
     * callback-related settings to it.
     *
     * @return a {@link CachingConnectionFactory} configured from the properties above
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();

        // Where to connect.
        // NOTE(review): both an address list and a separate port are supplied;
        // confirm which of the two the client actually honours.
        factory.setAddresses(addresses);
        factory.setPort(port);
        factory.setVirtualHost(virtualHost);

        // Who connects.
        factory.setUsername(username);
        factory.setPassword(password);

        // These two must be true for the confirm / return callbacks to be delivered.
        factory.setPublisherConfirms(publisherConfirms);
        factory.setPublisherReturns(publisherReturns);

        return factory;
    }

    /**
     * Exposes the {@link RabbitTemplate} used to send messages. It is built on
     * {@link #connectionFactory()}, converts payloads with
     * {@link Jackson2JsonMessageConverter}, and carries the configured
     * {@code mandatory} flag.
     *
     * @return the template registered under the bean name {@code rabbitTemplate}
     */
    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory());
        amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        amqpTemplate.setMandatory(mandatory);
        return amqpTemplate;
    }
}
4、回调代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/**
 * Publisher-side delivery callbacks: registers itself on the {@link RabbitTemplate}
 * as both the confirm callback (did the message reach the exchange?) and the
 * return callback (the message could not be routed to a queue), and logs the outcome.
 */
@Slf4j
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Hooks this component into the injected template once dependency injection
     * has finished.
     */
    @PostConstruct
    public void init() {
        // Register as ConfirmCallback.
        rabbitTemplate.setConfirmCallback(this);
        // Register as ReturnCallback.
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirm callback: reports only whether the message reached the exchange.
     * It is also invoked when the target exchange does not exist.
     * <ol>
     *   <li>Message did not reach the exchange: called with {@code ack=false}.</li>
     *   <li>Message reached the exchange: called with {@code ack=true}.</li>
     * </ol>
     *
     * <p>Requires {@code spring.rabbitmq.publisher-confirms=true}.
     *
     * @param correlationData correlation object supplied by the sender; {@code null}
     *                        if the sender did not pass one
     * @param ack             {@code true} if the message reached the exchange
     * @param cause           failure description passed in by the template; logged
     *                        only when {@code ack} is {@code false}
     * @see RabbitTemplate.ConfirmCallback#confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("------------- 收到消息是否送到 exchange 回调,开始处理 --------------");
        try {
            log.info("消息是否到达exchange.ack(true到达,false没有到达)={},correlationData.id={}",
                    ack, correlationData != null ? correlationData.getId() : null);
            if (!ack) {
                // Surface the reason for the negative acknowledgement.
                log.warn("消息没有到达exchange,cause={}", cause);
            }
        } catch (Exception e) {
            log.error("收到消息是否送到 exchange 回调,后处理出现异常", e);
        }
        log.info("------------- 收到消息是否送到 exchange 回调,处理完成 --------------");
    }

    /**
     * Return callback: invoked only when a message did not reach a queue; it is not
     * invoked on successful routing.
     * <ol>
     *   <li>Exchange-to-queue routing succeeded: no return callback.</li>
     *   <li>Exchange-to-queue routing failed: this method is called.</li>
     * </ol>
     *
     * <p>Requires {@code spring.rabbitmq.template.mandatory=true}. As long as that
     * is set, the callback fires whether {@code spring.rabbitmq.publisher-returns}
     * is {@code true} or {@code false}.
     *
     * @param message    the returned message
     * @param replyCode  reply code of the return
     * @param replyText  textual description of the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     * @see RabbitTemplate.ReturnCallback#returnedMessage
     */
    @Override
    public void returnedMessage(Message message, int replyCode,
                                String replyText, String exchange,
                                String routingKey) {
        log.info("------------- 消息没有送到 queue 回调,开始处理 --------------");
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            log.info("消息主体: {}", new String(message.getBody(), StandardCharsets.UTF_8));
            log.info("应答码: {}", replyCode);
            log.info("描述:{}", replyText);
            log.info("消息使用的交换器 exchange : {}", exchange);
            log.info("消息使用的路由键 routing : {}", routingKey);
        } catch (Exception e) {
            // Never let a logging/decoding problem escape into the caller of this callback.
            log.error("消息没有送到 queue 回调,后处理出现异常", e);
        }
        log.info("------------- 消息没有送到 queue 回调,处理完成 --------------");
    }
}
5、发送消息测试代码
import com.example.demo.entity.User;import com.rabbitmq.tools.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@Slf4j
@RestController
public class MQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping("/send")
    public User getUser() {
        User user = new User();
        user.setId(1L);
        try {
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("moon.fanout.exchange", "", "{"id":1}", correlationData);
        } catch (Exception e) {
            log.error("error", e);
        }
        return user;
    }
}
以上是 RabbitMQ配置回调 的全部内容, 来源链接: utcz.com/z/512001.html








