redis断连重启后,stream流监听会失效?

公司项目使用了StreamListener进行监听redis stream流消息数据;但每隔十几二十天的就会失效监听不到数据;
初步判断:应该是网络或者连接数等问题导致程序与redis服务断开连接,但问题还是无法定位。
以下是代码,有大佬知道或者遇到过这类问题嘛,还请指教如何解决!

@Bean

public List<Subscription> subscription(RedisConnectionFactory factory){

List<Subscription> resultList = new ArrayList<>();

var options = StreamMessageListenerContainer

.StreamMessageListenerContainerOptions

.builder()

.pollTimeout(Duration.ofSeconds(1))

.build();

for (String redisStreamName : redisStreamNames) {

initStream(redisStreamName,groups[0]);

var listenerContainer = StreamMessageListenerContainer.create(factory,options);

Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()),

StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener);

resultList.add(subscription);

listenerContainer.start();

}

return resultList;

}

public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

RedisCache redisCache;

public ListenerMessage(RedisCache redisCache){

this.redisCache = redisCache;

}

@Override

public void onMessage(MapRecord<String, String, String> entries) {

try{

Map<String, String> map = entries.getValue();

String private_chat = map.get("private_chat");

MessageSave messageSave = JSON.toJavaObject(JSON.parseObject(private_chat),MessageSave.class);

log.info("当前正在处理:{}",messageSave.getMsgtime());

QyTagService qyTagService = SpringUtils.getBean(QyTagService.class);

qyTagService.auditPrivateMessage(messageSave);

//check用于验证key和对应消息是否一直

log.info("stream name :{}, body:{}, check:{}",entries.getStream(), map,(entries.getStream().equals(map.get("name"))));

redisCache.ack(entries.getStream(),"group2",entries.getId().getValue());

redisCache.delField(entries.getStream(),entries.getId().getValue());

}catch (Exception e){

log.error("error message:{}",e.getMessage());

}

}

}

redis配置:

  redis:

expire: 60000 # 过期时间

database: 0 # Redis使用的库

host:

port: 6379 #端口号

timeout: 100000 # 连接超时时间(毫秒)

cache:

type: redis #使用redis做缓存


回答:

使用cancelOnError方法,返回false

//注册

var options = StreamMessageListenerContainer.StreamReadRequest

.builder(StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()))

.cancelOnError(throwable -> {

System.out.println("这是一个错误"+throwable);

// 不能取消

return !(throwable instanceof RuntimeException);

})

.consumer(Consumer.from(groups[0], this.getClass().getName()))

.autoAck(true)

.build();

以上是 redis断连重启后,stream流监听会失效? 的全部内容, 来源链接: utcz.com/p/945424.html

回到顶部