Java使用easymqtt4j快速开发工业级mqtt企业级应用

编程

easymqtt4j:netty + mqtt + subscriber + publisher + broker + cluster server for Java

easymqtt4j特点:

1、基于 Spring Integration 的集成模式,自由灵活。

2、完全支持mqtt 3.1、3.1.1国际标准协议,可配置。

3、发布、订阅接口简单,统一通过 Gateway 调用。

4、完全支持event事件EventGateway,灵活自由控制。

5、支持handleEvent、connectionLost、 messageArrived、deliveryComplete。

6、支持preSend、postSend、afterSendCompletion。

7、支持preReceive、postReceive、afterReceiveCompletion。

使用方法&步骤:

1、引用jar 

2、实现 MqttSubscriberGateway(消息订阅)接口

3、实现 MqttEventGateway(事件)接口

4、使用 MqttPublisherGateway(消息发送)接口发送消息(用法请参考 MqttScheduleTask 定时发送示例)

项目开源地址1:https://github.com/zengfr/easymqtt4j

项目开源地址2:https://gitee.com/zengfr/easymqtt4j

测试代码 :   https://github.com/zengfr/easymqtt4j/tree/master/easymqtt4j-test/src/main/java/com/zengfr/easymqtt4j/test

<dependency>

<!-- NOTE(review): ${project.version} resolves to the version of the POM that declares this dependency; outside the easymqtt4j build it presumably needs to be replaced with a published easymqtt4j-client version - confirm. -->

<artifactId>easymqtt4j-client</artifactId>

<groupId>com.zengfr.easymqtt4j</groupId>

<version>${project.version}</version>

</dependency>

 

# Broker connection: credentials and server URI (presumably sample values - replace with your own)

spring.mqtt.host.username=admin

spring.mqtt.host.password=password

#spring.mqtt.host.uris=ws://api.easylink.io:1983

spring.mqtt.host.uris=tcp://147.14.141.51:1883

# Subscriber: id, comma-separated topic filters, completion timeout (presumably milliseconds - confirm)

spring.mqtt.subscriber.id=subscriberId123

spring.mqtt.subscriber.topics=topic/#,testtopic/#

spring.mqtt.subscriber.completionTimeout=3000

# Publisher: id, default topic, completion timeout (presumably milliseconds - confirm)

spring.mqtt.publisher.id=publisherId456

spring.mqtt.publisher.defaulttopic=topic0

spring.mqtt.publisher.completionTimeout=3000

 

import com.zengfr.easymqtt4j.client.geteway.MqttSubscriberGateway;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.messaging.Message;

import org.springframework.stereotype.Component;

/**
 * Created by zengfr on 2020/5/12.
 *
 * <p>Sample {@link MqttSubscriberGateway} implementation that logs every message handed to it.
 */
@Component
public class MqttSubscriberGatewayImpl implements MqttSubscriberGateway {

    private static final Logger logger = LoggerFactory.getLogger(MqttSubscriberGatewayImpl.class);

    /**
     * Logs the received message at INFO level.
     *
     * @param msg       the received message; only its string form is logged
     * @param topic     not used by this implementation
     * @param qos       not used by this implementation
     * @param id        not used by this implementation
     * @param timestamp not used by this implementation
     * @return always {@code false}. NOTE(review): what the caller does with this value is defined
     *         by {@link MqttSubscriberGateway} and is not visible here - confirm.
     */
    @Override
    public boolean handlerMqttMessage(Message<?> msg, String topic, String qos, String id, String timestamp) {
        // Parameterized logging renders the same text as the former String.format call,
        // but only builds the string when INFO is actually enabled.
        logger.info("收到:{}", msg);
        return false;
    }
}

 

import com.zengfr.easymqtt4j.client.geteway.MqttPublisherGateway;

import com.zengfr.easymqtt4j.client.util.MqttUtil;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.scheduling.annotation.Scheduled;

/**
 * Created by zengfr on 2020/5/12.
 *
 * <p>Demo task that publishes sample messages on a fixed schedule through the injected
 * {@link MqttPublisherGateway}.
 */
@Configuration
@EnableScheduling
@EnableAsync
public class MqttScheduleTask {

    private static final Logger logger = LoggerFactory.getLogger(MqttScheduleTask.class);

    @Autowired
    private MqttPublisherGateway publisherGateway;

    /**
     * Publishes one message to {@code testtopic/0} with QoS 0.
     * Scheduled at a fixed rate of 1000 ms after an initial delay of 5000 ms.
     */
    @Scheduled(initialDelay = 5_000, fixedRate = 1_000)
    public void sendMqtt1() {
        logger.info("发送开始");
        final String now = MqttUtil.getNowString();
        final String payload = "hello 10 " + now;
        publisherGateway.publish("testtopic/0", 0, payload);
        logger.info("发送结束");
    }

    /**
     * Publishes eleven rounds of messages; round {@code n} (0..10) sends one message to
     * {@code topic/n} and one to {@code testtopic/n}, both with QoS {@code n % 3}.
     * Scheduled at a fixed rate of 3000 ms after an initial delay of 5000 ms.
     *
     * @throws InterruptedException declared by the signature; nothing in this body throws it directly
     */
    @Scheduled(initialDelay = 5_000, fixedRate = 3_000)
    public void sendMqtt2() throws InterruptedException {
        final int rounds = 11;
        int round = 0;
        while (round < rounds) {
            logger.info("发送开始");
            final String now = MqttUtil.getNowString();
            final int qos = round % 3;
            publisherGateway.publish("topic/" + round, qos, "hello 00 " + now);
            publisherGateway.publish("testtopic/" + round, qos, "hello 10 " + now);
            logger.info("发送结束");
            round++;
        }
    }
}

 

import com.zengfr.easymqtt4j.client.geteway.MqttEventGateway;

import com.zengfr.easymqtt4j.client.util.MqttMsgUtil;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.context.ApplicationEvent;

import org.springframework.integration.mqtt.event.MqttIntegrationEvent;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.stereotype.Component;

/**
 * Created by zengfr on 2020/5/12.
 *
 * <p>Sample {@link MqttEventGateway} implementation: event, connection and delivery callbacks are
 * logged at INFO level, while the send/receive hooks are left as no-ops.
 */
@Component
public class MqttEventGatewayImpl implements MqttEventGateway {

    private static final Logger logger = LoggerFactory.getLogger(MqttEventGatewayImpl.class);

    /**
     * Logs the MQTT integration event.
     *
     * @param event the event to log
     */
    @Override
    public void handleEvent(MqttIntegrationEvent event) {
        logger.info("event: {}", event);
    }

    /**
     * Logs the application event.
     *
     * @param event the event to log
     */
    @Override
    public void handleEvent(ApplicationEvent event) {
        logger.info("event: {}", event);
    }

    /**
     * Logs a lost connection together with its cause.
     *
     * @param clientId id of the client whose connection was lost
     * @param cause    the reason for the loss; logged as the exception of the log event
     */
    @Override
    public void connectionLost(String clientId, Throwable cause) {
        // A trailing Throwable is taken by SLF4J as the exception to log, not as a
        // placeholder value, so only clientId gets a "{}" here.
        logger.info("connectionLost: {}", clientId, cause);
    }

    /**
     * Logs an arrived message.
     *
     * @param clientId id of the receiving client
     * @param topic    topic the message arrived on
     * @param message  the arrived message
     */
    @Override
    public void messageArrived(String clientId, String topic, MqttMessage message) {
        logger.info("messageArrived: {} {} {}", clientId, topic, message);
    }

    /**
     * Logs a completed delivery; a failure to render the token is logged as an error
     * instead of being propagated.
     *
     * @param clientId id of the sending client
     * @param token    delivery token, rendered through {@code MqttMsgUtil.tokenToString}
     */
    @Override
    public void deliveryComplete(String clientId, IMqttDeliveryToken token) {
        try {
            logger.info("deliveryComplete: {} {}", clientId, MqttMsgUtil.tokenToString(token));
        } catch (MqttException e) {
            logger.error("error", e);
        }
    }

    /** No-op hook. */
    @Override
    public void preSend(String clientId, Message<?> message, MessageChannel channel) {
        // intentionally empty
    }

    /** No-op hook. */
    @Override
    public void postSend(String clientId, Message<?> message, MessageChannel channel, boolean sent) {
        // intentionally empty
    }

    /** No-op hook. */
    @Override
    public void afterSendCompletion(String clientId, Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // intentionally empty
    }

    /**
     * @return always {@code false}. NOTE(review): if the gateway forwards this value to a
     *         Spring channel interceptor, {@code false} would veto the receive - confirm
     *         against {@link MqttEventGateway}.
     */
    @Override
    public boolean preReceive(String clientId, MessageChannel channel) {
        return false;
    }

    /**
     * @return always {@code null}. NOTE(review): if the gateway forwards this value to a
     *         Spring channel interceptor, {@code null} would drop the message - confirm
     *         against {@link MqttEventGateway}.
     */
    @Override
    public Message<?> postReceive(String clientId, Message<?> message, MessageChannel channel) {
        return null;
    }

    /** No-op hook. */
    @Override
    public void afterReceiveCompletion(String clientId, Message<?> message, MessageChannel channel, Exception ex) {
        // intentionally empty
    }
}

 

 

 

 

 

 

 

 

 

 

 

以上是 Java使用easymqtt4j快速开发工业级mqtt企业级应用 的全部内容, 来源链接: utcz.com/z/516491.html

回到顶部