RabbitMQ05消费消息有死信队列

编程

1、引入依赖

参考跟前一篇代码。

2、配置文件添加配置项

参考跟前一篇代码。

3、配置死信队列

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class MQDeadConfig {

public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange";

public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue";

@Bean

public FanoutExchange moonFanoutExchange() {

return new FanoutExchange(MOON_FANOUT_EXCHANGE);

}

/**

* 死信队列设计思路

* 生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

*

* 进入死信队列:

* 1. 消息被拒绝,并且requeue = false

* 2. 消息ttl过期

* 3. 队列达到最大的长度

*

* @return

*/

@Bean

public Queue moonFanoutQueue() {

Map<String, Object> args = new HashMap<>(3);

// 声明死信交换器

args.put("x-dead-letter-exchange", MOON_DEAD_FANOUT_EXCHANGE);

// 声明死信路由键

args.put("x-dead-letter-routing-key", "DelayKey");

// 声明队列消息过期时间,如果超过这个时间还没有消费则发送到死信交换机

//args.put("x-message-ttl", 10000);

return new Queue(MOON_FANOUT_QUEUE, true, false, false, args);

}

@Bean

public Binding bindExchangeAndQueue() {

return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange());

}

/************ 以下是死信交换机和队列配置信息 ************/

// 死信交换机

public static final String MOON_DEAD_FANOUT_EXCHANGE = "moon.dead.fanout.exchange";

// 死信交换机绑定的queue

public static final String MOON_DEAD_FANOUT_QUEUE = "moon.dead.fanout.queue";

@Bean

public FanoutExchange moonDeadFanoutExchange() {

return new FanoutExchange(MOON_DEAD_FANOUT_EXCHANGE);

}

@Bean

public Queue moonDeadFanoutQueue() {

return new Queue(MOON_DEAD_FANOUT_QUEUE);

}

@Bean

public Binding bindDeadExchangeAndQueue() {

return BindingBuilder.bind(moonDeadFanoutQueue()).to(moonDeadFanoutExchange());

}

4、正常消费代码

参考跟前一篇代码。

5、死信队列消费代码

import com.moon.democonsumer.config.MQDeadConfig;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* 消费绑定了死信exchange的queue里的数据

*/

@Slf4j

@Component

public class MQDeadConsumer {

@RabbitListener(queues = MQDeadConfig.MOON_DEAD_FANOUT_QUEUE)

public void onMessage(Message message, Channel channel) {

log.info("***** 死信队列收到信息 处理开始 *****");

try {

String body = new String(message.getBody(), "utf-8");

log.info("***** 死信队列收到信息, body={}", body);

// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

log.error("死信队列收到信息处理异常", e);

}

log.info("***** 死信队列收到信息 处理结束 *****");

}

}

 

以上是 RabbitMQ05消费消息有死信队列 的全部内容, 来源链接: utcz.com/z/512048.html

回到顶部