Java使用easymqtt4j快速开发工业级mqtt企业级应用
easymqtt4j: netty + mqtt + subscriber + publisher + broker + cluster server for java
easymqtt4j特点:
1、spring integration 集成模式,自由灵活。
2、完全支持mqtt 3.1、3.1.1国际标准协议,可配置。
3、发布、订阅接口简单&统一Gateway。
4、完全支持event事件EventGateway,灵活自由控制。
5、支持handleEvent、connectionLost、 messageArrived、deliveryComplete。
6、支持preSend、postSend、afterSendCompletion。
7、支持preReceive、postReceive、afterReceiveCompletion。
使用方法&步骤:
1、引用jar
2、实现 MqttSubscriberGateway 消息队列订阅接口
3、实现 MqttEventGateway 事件接口
4、使用 MqttPublisherGateway 消息发送接口(用法请参考 MqttScheduleTask 消息定时发送)
项目开源地址1: https://github.com/zengfr/easymqtt4j
项目开源地址2:https://gitee.com/zengfr/easymqtt4j
测试代码 : https://github.com/zengfr/easymqtt4j/tree/master/easymqtt4j-test/src/main/java/com/zengfr/easymqtt4j/test
<dependency>
    <artifactId>easymqtt4j-client</artifactId>
    <groupId>com.zengfr.easymqtt4j</groupId>
    <version>${project.version}</version>
</dependency>
#
spring.mqtt.host.username=admin
spring.mqtt.host.password=password
#spring.mqtt.host.uris=ws://api.easylink.io:1983
spring.mqtt.host.uris=tcp://147.14.141.51:1883
#
spring.mqtt.subscriber.id=subscriberId123
spring.mqtt.subscriber.topics=topic/#,testtopic/#
spring.mqtt.subscriber.completionTimeout=3000
#
spring.mqtt.publisher.id=publisherId456
spring.mqtt.publisher.defaulttopic=topic0
spring.mqtt.publisher.completionTimeout=3000
import com.zengfr.easymqtt4j.client.geteway.MqttSubscriberGateway;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;/**
* Created by zengfr on 2020/5/12.
*/
// Sample MqttSubscriberGateway implementation: it only logs each message it is handed.
@Component
public class MqttSubscriberGatewayImpl implements MqttSubscriberGateway {
    private static final Logger logger = LoggerFactory.getLogger(MqttSubscriberGatewayImpl.class);

    /**
     * Logs the received message at INFO level.
     *
     * @param msg       the received message; this is the only argument that is logged
     * @param topic     not used by this implementation
     * @param qos       not used by this implementation
     * @param id        not used by this implementation
     * @param timestamp not used by this implementation
     * @return always {@code false}. NOTE(review): the meaning of the return value is
     *         defined by MqttSubscriberGateway, which is not visible here - confirm
     *         that {@code false} is the intended result for a handled message.
     */
    @Override
    public boolean handlerMqttMessage(Message<?> msg, String topic, String qos, String id, String timestamp) {
        // Parameterized logging: the message text is only built when INFO is enabled.
        logger.info("收到:{}", msg);
        return false;
    }
}
import com.zengfr.easymqtt4j.client.geteway.MqttPublisherGateway;import com.zengfr.easymqtt4j.client.util.MqttUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;/**
* Created by zengfr on 2020/5/12.
*/
// Scheduled demo publisher: periodically pushes test payloads through MqttPublisherGateway.
@Configuration
@EnableScheduling
@EnableAsync
public class MqttScheduleTask {
    private static Logger log = LoggerFactory.getLogger(MqttScheduleTask.class);

    // Scheduling parameters in milliseconds.
    private static final long START_DELAY_MS = 5000L;
    private static final long SINGLE_SEND_RATE_MS = 1000L;
    private static final long BATCH_SEND_RATE_MS = 3000L;

    // Number of topic suffixes (0..10) covered by one batch run.
    private static final int BATCH_SIZE = 11;

    @Autowired
    private MqttPublisherGateway publisherGateway;

    /**
     * Publishes one timestamped message to "testtopic/0" with QoS 0.
     * Runs every second after an initial five-second delay.
     */
    @Scheduled(fixedRate = SINGLE_SEND_RATE_MS, initialDelay = START_DELAY_MS)
    public void sendMqtt1() {
        log.info("发送开始");
        final String now = MqttUtil.getNowString();
        final String payload = "hello 10 " + now;
        publisherGateway.publish("testtopic/0", 0, payload);
        log.info("发送结束");
    }

    /**
     * For each index 0..10, publishes one message to "topic/&lt;index&gt;" and one to
     * "testtopic/&lt;index&gt;", using {@code index % 3} as the QoS argument.
     * Runs every three seconds after an initial five-second delay.
     *
     * @throws InterruptedException declared by the signature; nothing in the visible body throws it
     */
    @Scheduled(fixedRate = BATCH_SEND_RATE_MS, initialDelay = START_DELAY_MS)
    public void sendMqtt2() throws InterruptedException {
        int index = 0;
        while (index < BATCH_SIZE) {
            log.info("发送开始");
            final String now = MqttUtil.getNowString();
            final int qos = index % 3;
            publisherGateway.publish("topic/" + index, qos, "hello 00 " + now);
            publisherGateway.publish("testtopic/" + index, qos, "hello 10 " + now);
            log.info("发送结束");
            index++;
        }
    }
}
import com.zengfr.easymqtt4j.client.geteway.MqttEventGateway;import com.zengfr.easymqtt4j.client.util.MqttMsgUtil;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.ApplicationEvent;import org.springframework.integration.mqtt.event.MqttIntegrationEvent;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.stereotype.Component;/**
* Created by zengfr on 2020/5/12.
*/
// Sample MqttEventGateway implementation: logs the event callbacks and leaves the
// send/receive hooks as no-ops.
@Component
public class MqttEventGatewayImpl implements MqttEventGateway {
    private static final Logger logger = LoggerFactory.getLogger(MqttEventGatewayImpl.class);

    /** Logs the MQTT integration event at INFO level. */
    @Override
    public void handleEvent(MqttIntegrationEvent event) {
        logger.info("event: {}", event);
    }

    /** Logs the application event at INFO level. */
    @Override
    public void handleEvent(ApplicationEvent event) {
        logger.info("event: {}", event);
    }

    /**
     * Logs a lost connection together with its cause.
     *
     * @param clientId id of the client whose connection was lost
     * @param cause    the reason reported for the loss
     */
    @Override
    public void connectionLost(String clientId, Throwable cause) {
        // The cause is passed as the trailing argument WITHOUT its own placeholder so
        // SLF4J treats it as the exception of the log event (stack trace included)
        // instead of leaving a stray "{}" or printing only its toString().
        logger.info("connectionLost: {}", clientId, cause);
    }

    /** Logs the client id, topic and message of an arrived MQTT message. */
    @Override
    public void messageArrived(String clientId, String topic, MqttMessage message) {
        logger.info("messageArrived: {} {} {}", clientId, topic, message);
    }

    /**
     * Logs the completed delivery. A failure while rendering the token is logged
     * at ERROR level and not propagated.
     */
    @Override
    public void deliveryComplete(String clientId, IMqttDeliveryToken token) {
        try {
            logger.info("deliveryComplete: {} {}", clientId, MqttMsgUtil.tokenToString(token));
        } catch (MqttException e) {
            logger.error("error", e);
        }
    }

    /** Intentionally a no-op in this sample. */
    @Override
    public void preSend(String clientId, Message<?> message, MessageChannel channel) {
    }

    /** Intentionally a no-op in this sample. */
    @Override
    public void postSend(String clientId, Message<?> message, MessageChannel channel, boolean sent) {
    }

    /** Intentionally a no-op in this sample. */
    @Override
    public void afterSendCompletion(String clientId, Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
    }

    /**
     * @return always {@code false}. NOTE(review): if MqttEventGateway follows the
     *         contract of Spring's ChannelInterceptor#preReceive, {@code false}
     *         would veto the receive - confirm against the library.
     */
    @Override
    public boolean preReceive(String clientId, MessageChannel channel) {
        return false;
    }

    /**
     * @return always {@code null}; the incoming message is not passed through.
     *         NOTE(review): confirm that discarding the message here is intended.
     */
    @Override
    public Message<?> postReceive(String clientId, Message<?> message, MessageChannel channel) {
        return null;
    }

    /** Intentionally a no-op in this sample. */
    @Override
    public void afterReceiveCompletion(String clientId, Message<?> message, MessageChannel channel, Exception ex) {
    }
}
以上是 Java使用easymqtt4j快速开发工业级mqtt企业级应用 的全部内容, 来源链接: utcz.com/z/516491.html