Message Queues 05: Setting Up RabbitMQ and a Dead-Letter Queue with Spring Boot


Gradle

compile "org.springframework.cloud:spring-cloud-starter-bus-amqp"

application.properties


spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.prod.port=5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
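# Publisher confirms (on Spring Boot 2.2+ this property is replaced by spring.rabbitmq.publisher-confirm-type)
spring.rabbitmq.publisher-confirms=true
# Publisher returns (callback for messages that could not be routed)
spring.rabbitmq.publisher-returns=true
# Consumers acknowledge messages manually
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# Minimum number of consumers
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages fetched per request (the maximum number of unacked messages);
# it should be greater than or equal to the transaction size
spring.rabbitmq.listener.simple.prefetch=1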

Code

Dead-letter queue consumer

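import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadReceiver {

    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "DeadQueue"), // declare the queue
            exchange = @Exchange(value = "DeadExchange", type = "fanout")
    ))
    @RabbitHandler
    public void DeadQueue(Message message, Channel channel) throws Exception {
        System.out.println("DeadQueue:" + message.toString());
        System.out.println("DeadQueue:" + message.getMessageProperties().getAppId());
        System.out.println("DeadQueue:" + message.getMessageProperties().getMessageId());
        System.out.println("DeadQueue:" + message.getMessageProperties().getReceivedExchange());
        System.out.println("DeadQueue:" + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println("DeadQueue:" + message.getMessageProperties().getDeliveryTag());
        System.out.println("DeadQueue:" + message.getMessageProperties().getHeaders());
        // Acknowledge this single message to the broker (multiple = false)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}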

Consumer whose queue is bound to the dead-letter queue

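Note: x-message-ttl must be declared with type long, otherwise an error is raised. An @Argument value defaults to a string, and the broker expects a numeric TTL. This requirement is a little puzzling at first.

x-message-ttl sets the time-to-live for messages in this queue to 3 seconds (3000 ms).

x-dead-letter-exchange forwards expired messages to the DeadExchange exchange.

x-dead-letter-routing-key forwards expired messages to DeadExchange with the routing key DeadQueue. Because DeadExchange is declared as a fanout exchange here, the routing key does not affect which queue receives the message.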
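The three arguments above end up as entries in the queue's optional-arguments map. As a minimal sketch of why the type matters, the plain-Java helper below builds such a map with the TTL stored as a Long rather than the string "3000". The class name DeadLetterArgs and its build method are hypothetical and not part of the original post; a map like this could, for instance, be passed to Spring AMQP's Queue constructor that accepts an arguments map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the original post.
final class DeadLetterArgs {

    /** Builds the optional arguments for a queue that dead-letters expired messages. */
    static Map<String, Object> build(String deadExchange, String deadRoutingKey, long ttlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadExchange);
        args.put("x-dead-letter-routing-key", deadRoutingKey);
        // Autoboxed to a Long, not the string "3000": the TTL has to be numeric.
        args.put("x-message-ttl", ttlMillis);
        return args;
    }

    public static void main(String[] unused) {
        Map<String, Object> args = build("DeadExchange", "DeadQueue", 3000L);
        // Prints the runtime type of the TTL entry.
        System.out.println(args.get("x-message-ttl").getClass().getSimpleName());
    }
}
```

Keeping the TTL as a long in the method signature makes it hard to pass a string by accident, which is the mistake the note above warns about.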

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicReceiver {

    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-A",
                    arguments = { // attach the dead-letter settings to the queue
                            @Argument(name = "x-dead-letter-exchange", value = "DeadExchange"),
                            @Argument(name = "x-dead-letter-routing-key", value = "DeadQueue"),
                            @Argument(name = "x-message-ttl", value = "3000", type = "long")
                    }), // declare the queue
            exchange = @Exchange(value = "TopicExchange", type = "topic"), // declare the exchange
            key = "Topic.*" // * matches exactly one word, e.g. Topic.a
    ))
    @RabbitHandler
    public void TopicReceiverA(Message message, Channel channel) throws Exception {
        System.out.println("TopicReceiverA:" + message.toString());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getAppId());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getMessageId());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getReceivedExchange());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getDeliveryTag());
        System.out.println("TopicReceiverA:" + message.getMessageProperties().getHeaders());
        // Acknowledge this single message to the broker (multiple = false)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-B"), // declare the queue
            exchange = @Exchange(value = "TopicExchange", type = "topic"), // declare the exchange
            key = "Topic.#" // # matches zero or more words, e.g. Topic.a or Topic.a.b
    ))
    @RabbitHandler
    public void TopicReceiverB(Message message, Channel channel) throws Exception {
        System.out.println("TopicReceiverB:" + message.toString());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getAppId());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getMessageId());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getReceivedExchange());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getDeliveryTag());
        System.out.println("TopicReceiverB:" + message.getMessageProperties().getHeaders());
        // Acknowledge this single message to the broker (multiple = false)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
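The two binding keys above differ only in their wildcard: * stands for exactly one dot-separated word, while # stands for zero or more words. The following plain-Java sketch imitates that matching rule so the difference can be checked without a broker. TopicMatcher is a hypothetical class written for illustration; it is not how RabbitMQ implements routing internally.

```java
// Hypothetical illustration of topic-exchange wildcard matching; not broker code.
final class TopicMatcher {

    /** Returns true if the routing key matches the binding key under topic rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern is used up: it matches only if the routing key is used up too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still needs a word, but none is left
        }
        // '*' accepts any single word; otherwise the words must be identical.
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("Topic.*", "Topic.B"));   // true
        System.out.println(matches("Topic.*", "Topic.B.A")); // false
        System.out.println(matches("Topic.#", "Topic.B"));   // true
        System.out.println(matches("Topic.#", "Topic.B.A")); // true
    }
}
```

Under these rules a message with routing key Topic.B reaches both Topic-A and Topic-B, while Topic.B.A reaches only Topic-B.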

Producer

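import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() throws Exception {
        // Routing key Topic.B matches both Topic.* and Topic.#
        String context = "hi, i am TopicSender Topic.B message ";
        this.rabbitTemplate.convertAndSend("TopicExchange", "Topic.B", context);
        // Routing key Topic.B.A matches only Topic.#
        context = "hi, i am TopicSender Topic.B.A message ";
        this.rabbitTemplate.convertAndSend("TopicExchange", "Topic.B.A", context);
    }
}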

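Here the message with routing key Topic.B is routed to the queue bound with Topic.* (Topic-A). To keep it from being consumed, we comment out the listener method for that queue.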

After 3 seconds the message expires, and we can see it being forwarded to the dead-letter queue we specified.

The console shows the dead-letter queue consumer consuming the message forwarded from the Topic.* queue.
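The expiry flow described above can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: TtlQueueModel is a hypothetical in-memory class, time is passed in explicitly instead of being read from a clock, and a message is treated as expired once its age reaches the TTL. It does not connect to RabbitMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a TTL queue with dead-lettering; no broker involved.
final class TtlQueueModel {

    private static final class Entry {
        final String body;
        final long enqueuedAt;

        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> ready = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    TtlQueueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Enqueues a message and records when it arrived. */
    void publish(String body, long nowMillis) {
        ready.addLast(new Entry(body, nowMillis));
    }

    /** Moves messages from the head of the queue to the dead-letter list once their age reaches the TTL. */
    void expire(long nowMillis) {
        while (!ready.isEmpty() && nowMillis - ready.peekFirst().enqueuedAt >= ttlMillis) {
            deadLetters.add(ready.pollFirst().body);
        }
    }

    int readyCount() {
        return ready.size();
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        TtlQueueModel topicA = new TtlQueueModel(3000); // same 3000 ms TTL as Topic-A
        topicA.publish("hi, i am TopicSender Topic.B message ", 0);
        topicA.expire(1000);
        System.out.println(topicA.deadLetters().size()); // 0: not yet expired, no consumer took it
        topicA.expire(3000);
        System.out.println(topicA.deadLetters().size()); // 1: expired and dead-lettered
    }
}
```

With no consumer on the queue, the message simply waits until the TTL elapses and then moves to the dead-letter side, which mirrors what the console output shows.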

