MQTT消费者接收的数据会出现丢失吗?
MQTT这边有个消费端,订阅了对应主题,messageArrived接收订阅的消息。handle方法就是不走数据库直接插入到redis,后期再消费。现在五千个设备,如果每隔1小时同时向MQTT这边上报数据,消费端这边设置了10个线程,这种写法会丢失数据吗?
ExecutorService executorService = Executors.newFixedThreadPool(10);public synchronized void messageArrived (final String topic, MqttMessage message ) throws Exception
{
final String msg = new String(message.getPayload());
// System.err.println("【MQTT-消费端】接收消息主题 : " + topic);
// System.err.println("【MQTT-消费端】接收消息内容 : " + msg);
executorService.execute(new Runnable() {
public void run() {
handle(topic,msg);
}
});
}
回答:
结果:本地模拟测试了,不会丢失
生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题
for (int x=0;x<5000;x++){ int finalX = x;
new Thread(() -> mqttPushClient.publish("ruby", String.valueOf(finalX))).start();
}
消费者:加了Thread.sleep(1000);模拟插入到redis
ExecutorService executorService = Executors.newFixedThreadPool(10); private static AtomicInteger num = new AtomicInteger(0);
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("接收消息主题 : " + topic);
log.info("接收消息Qos : " + message.getQos());
String s = new String(message.getPayload());
log.info("接收消息内容 : " + s);
executorService.execute(() -> {
// 处理接收到的消息
handler(s);
});
}
private void handler(String s) {
try {
Thread.sleep(1000);
num.incrementAndGet();
System.err.println(num.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢
回答:
如果是服务端作为一个mqtt客户端去消费mqtt消息,如果消息收到了,再保存,正常情况下都是不会丢的。
丢消息的情况可能是出在,程序没有订阅mqtt topic的时候,有客户端给这个topic发送了消息,这部份消息,如果程序在这段时间后面再进行订阅,是拿不到的(丢了)。
还有一个,可能要看你订阅topic是,使用的是哪种Qos,使用不好的话,可能也会出现重复或者丢失消息的情况。
以上是 MQTT消费者接收的数据会出现丢失吗? 的全部内容, 来源链接: utcz.com/p/945499.html