rabbitmq生产者重发机制
欢迎访问我的个人博客http://home.znfang.ml
说明
重发机制是在mq中很重要的一部分,消费者可以通过ack或者nack就可以很轻松实现消息重新入队列,然后进行重发,但是生产者也有可能在网络动荡的情况下,投递不成功,这个时候就需要消息重发。虽然rabbitmq提供了事务功能,但是如果开启事务,就太影响性能了。本文参考littersmall的博文,利用spring-boot的定时功能,在本地缓存,从而实现对失败的消息进行重发。完整的项目代码请参考本人spring-boot练习代码
实现过程
pom依赖
在pom文件中引入rabbitmq依赖
<dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml文件
在yaml文件中添加以下配置,一定要开启手动确认和手动返回
spring: rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
msgWithTime
为了进行消息重发,需要重新定义一个结构,来保存带时间和特殊标记的消息,同时需要记录下来之前是用什么类进行分发的,也就是要记录下来之前分发到哪个队列。具体结构如下
@Getter@Setter
@NoArgsConstructor
@Component
public class MsgWithTime {
private long id;
private long time;
private Object msg;
private MsgSender sender;
public MsgWithTime(long id, long time, Object msg) {
this.id = id;
this.time = time;
this.msg = msg;
}
}
RetryStruct
重试结构体。用map缓存了每次消息发送记录,在后续的confirm阶段如果成功了就删除,如果失败了就进行定时任务重发
@Getter@Setter
@Component
public class RetryStruct {
private AtomicLong id = new AtomicLong();
private Map<Long, MsgWithTime> map = new ConcurrentHashMap<>();
public long generateId() {
return id.incrementAndGet();
}
public void add(MsgWithTime msg) {
map.putIfAbsent(msg.getId(), msg);
}
public void del(long id) {
map.remove(id);
}
}
RetrySchedule
定时任务重发。默认认为在固定的时间段内,如果还存在在map中的消息,没有发送成功,需要进行重发,并且尝试重发几次后,如果仍然失败,则不会进行直接删除,并记录日志
@Slf4j@Component
public class RetrySchedule {
@Autowired
private RetryStruct retryStruct;
@Scheduled(fixedDelay = MqConstant.RETRY_INTERVAL)
public void retry() {
long now = System.currentTimeMillis();
for (Map.Entry<Long,MsgWithTime> entry : retryStruct.getMap().entrySet()) {
MsgWithTime msg = entry.getValue();
if (null != msg) {
if (msg.getTime() + MqConstant.RETRY_TIME * MqConstant.VALID_TIME < now) {
log.info("send message {} failed after 3 min ", msg);
retryStruct.del(entry.getKey());
} else if (msg.getTime() + MqConstant.VALID_TIME < now) {
boolean res = msg.getSender().send(msg);
if (!res) {
log.info("retry send message failed {}", msg);
}
}
}
}
}
}
Sender
消息发送类,主要是为了将消息发送到某个队列中,如果会存在多个队列,那么需要定义一个接口,然后全都实现这个接口就行了。这里就直接列出实现类。其中需要利用CorrelationData,发送之前要将信息加入到map中。
/** * 消息发送接口实现
*/
@Slf4j
@Service
public class MsgSenderImpl implements MsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RetryStruct retryStruct;
@Override
public boolean send(Object msg) {
long id = retryStruct.generateId();
long time = System.currentTimeMillis();
return send(new MsgWithTime(id, time, msg));
}
@Override
public boolean send(MsgWithTime msg) {
msg.setSender(this);
retryStruct.add(msg);
CorrelationData data = new CorrelationData(String.valueOf(msg.getId()));
try {
rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUTE_KEY, msg.getMsg().toString(), data);
}catch (Exception e) {
log.error("send failed: " + e.getMessage());
return false;
}
return true;
}
}
rabbitTemplate
这里需要交代一下return和confirm的返回时间。confirm是消息到达exchange的时候返回。return是从exchange提交到queue的时候返回。因此,如果有return的消息,则直接进行重发就可以了,而confirm失败才会定时重发,两个是不一样的
添加rabbitTemplate的配置代码。需要实现returncallback和confirmcallback接口,需要开启手动确认和手动返回,如果确认结果成功,则删除map内容,如果失败,则记录日志,后续定时任务会自动重发的。,具体代码如下
/** * rabbitmq template配置
*/
@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RetryStruct retryStruct;
@PostConstruct
public void init() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
String id = correlationData.getId();
log.info("send success " + id);
if (id != null){
retryStruct.del(Long.valueOf(id));
}
} else {
log.error("send failed: " + correlationData.getId());
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
try {
Thread.sleep(1000);
}catch (InterruptedException e){
log.error("sleep error " + e.getMessage());
}
log.error("send failed: " + i + " " + s + " " + s1 + " " + s2);
rabbitTemplate.send(message);
}
}
结束语
spring-boot和rabbitmq都是很好用的,后续会写一些关于spring-boot的例子
以上是 rabbitmq生产者重发机制 的全部内容, 来源链接: utcz.com/z/517595.html