RabbitMQ04消费消息
1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
2、配置参数
spring.rabbitmq.addresses=10.10.60.65spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 消费手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、配置消费队列
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange";
public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue";
@Bean
public FanoutExchange moonFanoutExchange() {
return new FanoutExchange(MOON_FANOUT_EXCHANGE);
}
@Bean
public Queue moonFanoutQueue() {
return new Queue(MOON_FANOUT_QUEUE);
}
@Bean
public Binding bindExchangeAndQueue() {
return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange());
}
}
4、具体消费代码
模拟消费消息处理失败,如果第一次失败放回队列尾部,再次处理失败,则放入死信队列(如果配置,下个小结添加死信队列配置)
import com.alibaba.fastjson.JSON;import com.moon.democonsumer.config.MQConfig;
import com.moon.democonsumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MQConsumer {
/**
* 抛出异常的数据,会放到消息的队尾
* @param message
* @param channel
* @throws Exception
*/
@RabbitListener(queues = MQConfig.MOON_FANOUT_QUEUE)
//public void onMessage(User user, Message message, Channel channel) throws Exception {
public void onMessage(Message message, Channel channel) throws Exception {
log.info("***** MQConsumer onMessage 开始 *****");
User user = null;
try {
String body = new String(message.getBody(), "utf-8");
log.info("***** MQConsumer onMessage 接收到消息, body={}", body);
user = JSON.parseObject(body, User.class);
log.info("***** MQConsumer onMessage 接收到消息 name = {}", user.getName());
// 模拟执行任务
Thread.sleep(1000);
if (user.getId() == 1) {
throw new RuntimeException("id=1抛出异常");
}
// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//e.printStackTrace();
if (message.getMessageProperties().getRedelivered()) {
log.info("***** MQConsumer onMessage 消息已重复处理失败,拒绝再次接收 name = {}", user.getName());
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.info("***** MQConsumer onMessage 消息即将再次返回队列处理 name = {}", user.getName());
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
log.info("***** MQConsumer onMessage 结束 *****");
}
}
以上是 RabbitMQ04消费消息 的全部内容, 来源链接: utcz.com/z/512049.html