消息队列05SpringBoot搭建Rabbitmq与死信队列
gradle
compile "org.springframework.cloud:spring-cloud-starter-bus-amqp"
application.properties
# RabbitMQ broker connection settings.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
# NOTE(review): not one of the standard spring.rabbitmq.* keys; presumably read by
# custom configuration for a second (prod) broker port - confirm.
spring.rabbitmq.prod.port=5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# Enable publisher confirms.
spring.rabbitmq.publisher-confirms=true
# Enable publisher returns (send callback).
spring.rabbitmq.publisher-returns=true
# Consumers acknowledge messages manually.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
# Minimum number of consumers.
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of consumers.
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages handled per request; it should be greater than or equal to the
# transaction size (the maximum number of unacked messages).
spring.rabbitmq.listener.simple.prefetch=1
代码
死信队列的消费者
/**
 * Consumer for the dead-letter queue.
 *
 * <p>Declares the queue {@code DeadQueue}, the fanout exchange {@code DeadExchange} and the
 * binding between them. Every received message is printed together with its delivery
 * metadata and is then acknowledged manually.
 */
@Component
public class DeadReceiver {

    /**
     * Prints the received message and its properties, then acknowledges it.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used to send the acknowledgement
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "DeadQueue"), // declare the queue
            exchange = @Exchange(value = "DeadExchange", type = "fanout")
    ))
    public void DeadQueue(Message message, Channel channel) throws Exception {
        // Read the delivery tag once; it is printed below and needed again for the ack.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println("DeadQueue:" + message.toString());
        System.out.println("DeadQueue:" + message.getMessageProperties().getAppId());
        System.out.println("DeadQueue:" + message.getMessageProperties().getMessageId());
        System.out.println("DeadQueue:" + message.getMessageProperties().getReceivedExchange());
        System.out.println("DeadQueue:" + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println("DeadQueue:" + deliveryTag);
        System.out.println("DeadQueue:" + message.getMessageProperties().getHeaders());

        // Acknowledge this single delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
    }
}
绑定死信队列的消费者
注:x-message-ttl 要声明 type 为 long,不然会报错。原因是 @Argument 的 type 默认为 java.lang.String,而 RabbitMQ 要求 x-message-ttl 必须是数值类型。
x-message-ttl 设置这个队列消息的超时时间为3秒
x-dead-letter-exchange 超时消息转发到 DeadExchange 这个交换机上
x-dead-letter-routing-key 超时的消息转发到 DeadExchange 交换机时使用的路由键,这里是 DeadQueue(DeadExchange 是 fanout 类型,实际会忽略路由键,消息会投递到所有绑定的队列)。
/**
 * Consumers bound to the topic exchange {@code TopicExchange}.
 *
 * <p>Two queues are declared:
 * <ul>
 *   <li>{@code Topic-A}, bound with {@code Topic.*}. Its messages expire after 3000 ms
 *       ({@code x-message-ttl}) and are then dead-lettered to {@code DeadExchange} with the
 *       routing key {@code DeadQueue}.</li>
 *   <li>{@code Topic-B}, bound with {@code Topic.#} and declared without extra arguments.</li>
 * </ul>
 * Both listeners print the received message and acknowledge it manually.
 */
@Component
public class TopicReceiver {

    /**
     * Handles messages from {@code Topic-A}.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used to send the acknowledgement
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-A",
                    arguments = { // attach the dead-letter settings to the queue
                            @Argument(name = "x-dead-letter-exchange", value = "DeadExchange"),
                            @Argument(name = "x-dead-letter-routing-key", value = "DeadQueue"),
                            @Argument(name = "x-message-ttl", value = "3000", type = "long")
                    }), // declare the queue
            exchange = @Exchange(value = "TopicExchange", type = "topic"), // declare the exchange
            key = "Topic.*" // "*" matches exactly one word, e.g. Topic.a
    ))
    public void TopicReceiverA(Message message, Channel channel) throws Exception {
        printAndAck("TopicReceiverA:", message, channel);
    }

    /**
     * Handles messages from {@code Topic-B}.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used to send the acknowledgement
     * @throws Exception if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding( // bind the queue to the exchange
            value = @Queue(value = "Topic-B"), // declare the queue
            exchange = @Exchange(value = "TopicExchange", type = "topic"), // declare the exchange
            key = "Topic.#" // "#" matches zero or more words, e.g. Topic.a or Topic.a.b
    ))
    public void TopicReceiverB(Message message, Channel channel) throws Exception {
        printAndAck("TopicReceiverB:", message, channel);
    }

    /**
     * Prints the message and its properties, each line starting with {@code prefix},
     * then acknowledges the delivery.
     */
    private static void printAndAck(String prefix, Message message, Channel channel)
            throws Exception {
        // Read the delivery tag once; it is printed below and needed again for the ack.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println(prefix + message.toString());
        System.out.println(prefix + message.getMessageProperties().getAppId());
        System.out.println(prefix + message.getMessageProperties().getMessageId());
        System.out.println(prefix + message.getMessageProperties().getReceivedExchange());
        System.out.println(prefix + message.getMessageProperties().getReceivedRoutingKey());
        System.out.println(prefix + deliveryTag);
        System.out.println(prefix + message.getMessageProperties().getHeaders());

        // Acknowledge this single delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
    }
}
生产者
/**
 * Producer that publishes two demo messages to the topic exchange {@code TopicExchange}.
 */
@Component
public class TopicSender {

    /** Exchange both demo messages are published to. */
    private static final String EXCHANGE = "TopicExchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends one message with the routing key {@code Topic.B} and one with {@code Topic.B.A}.
     *
     * <p>{@code Topic.B} matches both the {@code Topic.*} and the {@code Topic.#} binding
     * patterns, while {@code Topic.B.A} matches only {@code Topic.#}.
     *
     * @throws Exception declared by the original signature and kept for compatibility
     */
    public void send() throws Exception {
        this.rabbitTemplate.convertAndSend(
                EXCHANGE, "Topic.B", "hi, i am TopicSender Topic.B message ");
        this.rabbitTemplate.convertAndSend(
                EXCHANGE, "Topic.B.A", "hi, i am TopicSender Topic.B.A message ");
    }
}
这里 Topic.B 的路由会转发到 Topic.* 的队列中去,这时我们注释掉监听 Topic.* 的消费方法,避免消息被消费
3秒后 消息过期,可以看到消息转发到 我们指定的死信队列中去
控制台死信队列 消费了 Topic.*的队列转发过来的消息
以上是 消息队列05SpringBoot搭建Rabbitmq与死信队列 的全部内容, 来源链接: utcz.com/z/516569.html