使用PAHO订阅和阅读MQTT消息

我正在使用Paho发送和接收mqtt消息。到目前为止,发送消息一直没有问题,我正在使用mosquitto接收消息。

现在,我想使用Java客户端读取消息,并且注意到关于接收消息的文档越来越少。

我实现了MqttCallback接口,但仍然无法弄清楚如何阅读已订阅的主题的消息。

到目前为止,这是我的源代码,我可以使用mosquitto_sub读取消息。

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PahoDemo implements MqttCallback {

MqttClient client;

MqttClient subClient;

public PahoDemo() {

}

public static void main(String[] args) {

new PahoDemo().doDemo();

}

public void doDemo() {

try {

client = new MqttClient("tcp://192.168.118.11:1883", "Sending");

subClient = new MqttClient("tcp://192.168.118.11:1883",

"Subscribing");

client.connect();

subClient.connect();

subClient.subscribe("foo");

MqttMessage message = new MqttMessage();

message.setPayload("A single message from my computer fff"

.getBytes());

client.publish("foo", message);

client.disconnect();

client.close();

subClient.disconnect();

subClient.close();

} catch (MqttException e) {

e.printStackTrace();

}

}

@Override

public void connectionLost(Throwable cause) {

// TODO Auto-generated method stub

}

@Override

public void messageArrived(String topic, MqttMessage message)

throws Exception {

System.out.println(message);

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

// TODO Auto-generated method stub

}

}

回答:

您将在代理有时间将消息发送回之前关闭客户端。

另外,您不需要2个客户端实例,只需一个实例就可以发送和接收。

我已经稍微修改了您的代码,现在它将继续运行并接收消息,直到您将其杀死。

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PahoDemo implements MqttCallback {

MqttClient client;

public PahoDemo() {

}

public static void main(String[] args) {

new PahoDemo().doDemo();

}

public void doDemo() {

try {

client = new MqttClient("tcp://192.168.118.11:1883", "Sending");

client.connect();

client.setCallback(this);

client.subscribe("foo");

MqttMessage message = new MqttMessage();

message.setPayload("A single message from my computer fff"

.getBytes());

client.publish("foo", message);

} catch (MqttException e) {

e.printStackTrace();

}

}

@Override

public void connectionLost(Throwable cause) {

// TODO Auto-generated method stub

}

@Override

public void messageArrived(String topic, MqttMessage message)

throws Exception {

System.out.println(message);

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

// TODO Auto-generated method stub

}

}

编辑:添加了丢失的内容 client.setCallback(this)

以上是 使用PAHO订阅和阅读MQTT消息 的全部内容, 来源链接: utcz.com/qa/418755.html

回到顶部