MQTT消费者接收的数据会出现丢失吗?

MQTT这边有个消费端,订阅了对应主题,messageArrived接收订阅的消息。handle方法就是不走数据库直接插入到redis,后期再消费。现在五千个设备,如果每隔1小时同时向MQTT这边上报数据,消费端这边设置了10个线程,这种写法会丢失数据吗?

ExecutorService executorService = Executors.newFixedThreadPool(10);

public synchronized void messageArrived (final String topic, MqttMessage message ) throws Exception

{

final String msg = new String(message.getPayload());

// System.err.println("【MQTT-消费端】接收消息主题 : " + topic);

// System.err.println("【MQTT-消费端】接收消息内容 : " + msg);

executorService.execute(new Runnable() {

public void run() {

handle(topic,msg);

}

});

}


回答:

结果:本地模拟测试了,不会丢失

生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题

for (int x=0;x<5000;x++){

int finalX = x;

new Thread(() -> mqttPushClient.publish("ruby", String.valueOf(finalX))).start();

}

消费者:加了Thread.sleep(1000);模拟插入到redis

ExecutorService executorService = Executors.newFixedThreadPool(10);

private static AtomicInteger num = new AtomicInteger(0);

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

log.info("接收消息主题 : " + topic);

log.info("接收消息Qos : " + message.getQos());

String s = new String(message.getPayload());

log.info("接收消息内容 : " + s);

executorService.execute(() -> {

// 处理接收到的消息

handler(s);

});

}

private void handler(String s) {

try {

Thread.sleep(1000);

num.incrementAndGet();

System.err.println(num.get());

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢


回答:

如果是服务端作为一个mqtt客户端去消费mqtt消息,如果消息收到了,再保存,正常情况下都是不会丢的。

丢消息的情况可能是出在,程序没有订阅mqtt topic的时候,有客户端给这个topic发送了消息,这部份消息,如果程序在这段时间后面再进行订阅,是拿不到的(丢了)。

还有一个,可能要看你订阅topic是,使用的是哪种Qos,使用不好的话,可能也会出现重复或者丢失消息的情况。

以上是 MQTT消费者接收的数据会出现丢失吗? 的全部内容, 来源链接: utcz.com/p/945499.html

回到顶部