RabbitMQ配置回调

编程

1、引入依赖

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<version>1.12.4</version>

</dependency>

2、配置参数

spring.rabbitmq.addresses=10.10.60.65

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

#配置exchange是否收到消息回调

spring.rabbitmq.publisher-confirms=true

#配置是否送到queue的消息回调

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

3、配置config,因为用的标准的配置参数,也可以不配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* 配置 RabbitTemplate

*/

@Configuration

public class RabbitMQConfig {

@Value("${spring.rabbitmq.addresses}")

private String addresses;

@Value("${spring.rabbitmq.username}")

private String username;

@Value("${spring.rabbitmq.password}")

private String password;

@Value("${spring.rabbitmq.virtual-host}")

private String virtualHost;

@Value("${spring.rabbitmq.port:5672}")

private Integer port;

/**

* spring.rabbitmq.template.mandatory

* 消息发送失败,是否回调给发送者

*

* 当其值为true时,交换器无法根据自身的类型和路由键匹配到符合条件的队列,这时rabbitMQ就会通过回调函数将消息返回给生产者。

* 当其值为false时,如果出现上述情形,则消息会丢失

*/

@Value("${spring.rabbitmq.template.mandatory:false}")

private Boolean mandatory;

/**

* 是否确认

*/

@Value("${spring.rabbitmq.publisher-confirms:false}")

private Boolean publisherConfirms;

/**

* 如果mandatorys设置成true,该值也设置 成true

*/

@Value("${spring.rabbitmq.publisher-returns:false}")

private Boolean publisherReturns;

/**

* RabbitMQConfig中定义connectionFactory中设置属性

* @return

*/

@Bean

public ConnectionFactory connectionFactory() {

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

cachingConnectionFactory.setAddresses(this.addresses);

cachingConnectionFactory.setUsername(this.username);

cachingConnectionFactory.setPassword(this.password);

cachingConnectionFactory.setVirtualHost(this.virtualHost);

cachingConnectionFactory.setPort(this.port);

// 如果消息要设置成回调,则以下的配置必须要设置成true

cachingConnectionFactory.setPublisherConfirms(this.publisherConfirms);

cachingConnectionFactory.setPublisherReturns(this.publisherReturns);

return cachingConnectionFactory;

}

/**

* 同时为了调用SpringBoot集成rabbitMQ提供的发送的方法,我们需要注入rabbitTemplate

*/

@Bean(name = "rabbitTemplate")

// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(this.connectionFactory());

template.setMandatory(mandatory);

template.setMessageConverter(new Jackson2JsonMessageConverter());

return template;

}

}

4、回调代码

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

* 消息发送确认

*/

@Slf4j

@Component

public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init() {

// 指定 ConfirmCallback

rabbitTemplate.setConfirmCallback(this);

//指定 ReturnCallback

rabbitTemplate.setReturnCallback(this);

}

/**

* 如果没有exchange,也会收到回调

*

* ConfirmCallback 只确认消息是否正确到达 Exchange 中

* 1. 如果消息没有到exchange,则confirm回调,ack=false

* 2. 如果消息到达exchange,则confirm回调,ack=true

*

* 配置参数

* spring.rabbitmq.publisher-confirms=true

*

* @param correlationData 如果发送方没有传这个对象,则为null

* @param ack

* @param cause

* @see RabbitTemplate.ConfirmCallback#confirm

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

log.info("------------- 收到消息是否送到 exchange 回调,开始处理 --------------");

try {

log.info("消息是否到达exchange.ack(true到达,false没有到达)={},correlationData.id={}",

ack, correlationData != null ? correlationData.getId() : null);

} catch (Exception e) {

log.error("收到消息是否送到 exchange 回调,后处理出现异常", e);

}

log.info("------------- 收到消息是否送到 exchange 回调,处理完成 --------------");

}

/**

* ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行

* 1. exchange到queue成功,则不回调return

* 2. exchange到queue失败,则回调return

*

* 配置参数

* spring.rabbitmq.template.mandatory=true

*

* 下面这个true或false都会回调,只要spring.rabbitmq.template.mandatory=true

* spring.rabbitmq.publisher-returns=true

*

* @param message

* @param replyCode

* @param replyText

* @param exchange

* @param routingKey

* @see RabbitTemplate.ReturnCallback#returnedMessage

*/

@Override

public void returnedMessage(Message message, int replyCode,

String replyText, String exchange,

String routingKey) {

log.info("------------- 消息没有送到 queue 回调,开始处理 --------------");

try {

log.info("消息主体: {}", new String(message.getBody()), "utf-8");

log.info("应答码: {}", replyCode);

log.info("描述:{}", replyText);

log.info("消息使用的交换器 exchange : {}", exchange);

log.info("消息使用的路由键 routing : {}", routingKey);

} catch (Exception e) {

log.error("消息没有送到 queue 回调,后处理出现异常", e);

}

log.info("------------- 消息没有送到 queue 回调,处理完成 --------------");

}

}

5、发送消息测试代码

import com.example.demo.entity.User;

import com.rabbitmq.tools.json.JSONUtil;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@Slf4j

@RestController

public class MQController {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostMapping("/send")

public User getUser() {

User user = new User();

user.setId(1L);

try {

CorrelationData correlationData = new CorrelationData();

correlationData.setId(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend("moon.fanout.exchange", "", "{"id":1}", correlationData);

} catch (Exception e) {

log.error("error", e);

}

return user;

}

}

 

以上是 RabbitMQ配置回调 的全部内容, 来源链接: utcz.com/z/512001.html

回到顶部