RabbitMQ04消费消息

编程

1、引入依赖

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<optional>true</optional>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>com.alibaba</groupId>

<artifactId>fastjson</artifactId>

<version>1.2.62</version>

</dependency>

2、配置参数

spring.rabbitmq.addresses=10.10.60.65

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

# 消费手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、配置消费队列

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class MQConfig {

public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange";

public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue";

@Bean

public FanoutExchange moonFanoutExchange() {

return new FanoutExchange(MOON_FANOUT_EXCHANGE);

}

@Bean

public Queue moonFanoutQueue() {

return new Queue(MOON_FANOUT_QUEUE);

}

@Bean

public Binding bindExchangeAndQueue() {

return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange());

}

}

4、具体消费代码

模拟消费消息处理失败,如果第一次失败放回队列尾部,再次处理失败,则放入死信队列(如果配置,下个小结添加死信队列配置)

import com.alibaba.fastjson.JSON;

import com.moon.democonsumer.config.MQConfig;

import com.moon.democonsumer.entity.User;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Slf4j

@Component

public class MQConsumer {

/**

* 抛出异常的数据,会放到消息的队尾

* @param message

* @param channel

* @throws Exception

*/

@RabbitListener(queues = MQConfig.MOON_FANOUT_QUEUE)

//public void onMessage(User user, Message message, Channel channel) throws Exception {

public void onMessage(Message message, Channel channel) throws Exception {

log.info("***** MQConsumer onMessage 开始 *****");

User user = null;

try {

String body = new String(message.getBody(), "utf-8");

log.info("***** MQConsumer onMessage 接收到消息, body={}", body);

user = JSON.parseObject(body, User.class);

log.info("***** MQConsumer onMessage 接收到消息 name = {}", user.getName());

// 模拟执行任务

Thread.sleep(1000);

if (user.getId() == 1) {

throw new RuntimeException("id=1抛出异常");

}

// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

//e.printStackTrace();

if (message.getMessageProperties().getRedelivered()) {

log.info("***** MQConsumer onMessage 消息已重复处理失败,拒绝再次接收 name = {}", user.getName());

// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

} else {

log.info("***** MQConsumer onMessage 消息即将再次返回队列处理 name = {}", user.getName());

// requeue为是否重新回到队列,true重新入队

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

}

}

log.info("***** MQConsumer onMessage 结束 *****");

}

}

 

以上是 RabbitMQ04消费消息 的全部内容, 来源链接: utcz.com/z/512049.html

回到顶部