redis断连重启后,stream流监听会失效?
公司项目使用了StreamListener进行监听redis stream流消息数据;但每隔十几二十天的就会失效监听不到数据;
初步判断:应该是网络或者连接数等问题导致程序与redis服务断开连接,但问题还是无法定位。
以下是代码,有大佬知道或者遇到过这类问题嘛,还请指教如何解决!
@Bean public List<Subscription> subscription(RedisConnectionFactory factory){
List<Subscription> resultList = new ArrayList<>();
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
for (String redisStreamName : redisStreamNames) {
initStream(redisStreamName,groups[0]);
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()),
StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener);
resultList.add(subscription);
listenerContainer.start();
}
return resultList;
}
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> { RedisCache redisCache;
public ListenerMessage(RedisCache redisCache){
this.redisCache = redisCache;
}
@Override
public void onMessage(MapRecord<String, String, String> entries) {
try{
Map<String, String> map = entries.getValue();
String private_chat = map.get("private_chat");
MessageSave messageSave = JSON.toJavaObject(JSON.parseObject(private_chat),MessageSave.class);
log.info("当前正在处理:{}",messageSave.getMsgtime());
QyTagService qyTagService = SpringUtils.getBean(QyTagService.class);
qyTagService.auditPrivateMessage(messageSave);
//check用于验证key和对应消息是否一直
log.info("stream name :{}, body:{}, check:{}",entries.getStream(), map,(entries.getStream().equals(map.get("name"))));
redisCache.ack(entries.getStream(),"group2",entries.getId().getValue());
redisCache.delField(entries.getStream(),entries.getId().getValue());
}catch (Exception e){
log.error("error message:{}",e.getMessage());
}
}
}
redis配置:
redis: expire: 60000 # 过期时间
database: 0 # Redis使用的库
host:
port: 6379 #端口号
timeout: 100000 # 连接超时时间(毫秒)
cache:
type: redis #使用redis做缓存
回答:
使用cancelOnError方法,返回false
//注册 var options = StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()))
.cancelOnError(throwable -> {
System.out.println("这是一个错误"+throwable);
// 不能取消
return !(throwable instanceof RuntimeException);
})
.consumer(Consumer.from(groups[0], this.getClass().getName()))
.autoAck(true)
.build();
以上是 redis断连重启后,stream流监听会失效? 的全部内容, 来源链接: utcz.com/p/945424.html