【Java】Gateway绑定MQTT实现发布订阅

前言

实现MQTT协议的中间件有很多,本文使用的是企业级 EMQX EnterPrise,不了解的小伙伴可以翻阅之前的博客。这里,主要介绍SpringBoot2.0集成MQTT实现消息推送的功能。

创建项目

创建父工程

打开 idea 点击 File>New>Project 选择Spring Initializr >JDK版本>Next 并按下图创建项目
【Java】Gateway绑定MQTT实现发布订阅
点击 next ,开发者工具 Developer Tools我们勾选前两个,
Web 我们勾选第一个,安全框架和SQL这里暂时不需要勾选,Messaging中间件,我们同样勾选第一个就好,Cloud组件我们也不用勾选。
【Java】Gateway绑定MQTT实现发布订阅

依次点击 nextfinish创建好项目
【Java】Gateway绑定MQTT实现发布订阅

删除 src ,.gitignore,HELP.md,mvnwmvnw.cmd 目录,本文采用Gateway绑定的方式,需要引入以下依赖:

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-stream</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-mqtt</artifactId>

</dependency>

父工程pom文件:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<packaging>pom</packaging>

<modules>

<module>springboot_emqx_common</module>

<module>springboot_emqx_publish</module>

<module>springboot_emqx_subscribe</module>

</modules>

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.4.1</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<groupId>com.baba.wlb</groupId>

<artifactId>springboot_emqx</artifactId>

<version>1.0-SNAPSHOT</version>

<name>springboot_emqx</name>

<description>Demo project for Spring Boot</description>

<properties>

<java.version>1.8</java.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-integration</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-stream</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-mqtt</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-devtools</artifactId>

<scope>runtime</scope>

<optional>true</optional>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<optional>true</optional>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.springframework.integration</groupId>

<artifactId>spring-integration-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

<!-- <configuration>-->

<!-- <excludes>-->

<!-- <exclude>-->

<!-- <groupId>org.projectlombok</groupId>-->

<!-- <artifactId>lombok</artifactId>-->

<!-- </exclude>-->

<!-- </excludes>-->

<!-- </configuration>-->

<configuration>

<mainClass>com.baba.wlb.publish.PublishApplication</mainClass>

</configuration>

</plugin>

</plugins>

</build>

</project>

创建子工程

在父工程中点击New>>module>Next 分别创建三个子工程:
springboot_emqx_common
springboot_emqx_publish
springboot_emqx_subscribe
【Java】Gateway绑定MQTT实现发布订阅

springboot_emqx_common

在该模块下新建如下package包
注:(config包下暂时没放公共配置,因为我试过好久,发现丢进来的配置只有主类'mainClass'才能加载到,其他模块加载不到通用配置,不清楚是不是我漏了什么注解,望了解这部分的人多多指教!所以只好拆分配置到各个模块中了)
【Java】Gateway绑定MQTT实现发布订阅

系统常量:Constants.java

package com.baba.wlb.share.common;

/**

* @Author wulongbo

* @Date 2020/12/29 13:50

* @Version 1.0

*/

/**

* 系统常量

*/

public class Constants {

public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";

public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";

}

Emqx配置类:EmqxMqttProperties.java

package com.baba.wlb.share.properties;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.stereotype.Component;

/**

* @Author wulongbo

* @Date 2020/12/29 11:33

* @Version 1.0

*/

/**

* 配置文件

*/

@Data

@Component

@ConfigurationProperties("wulongbo.mqtt.emqx")

public class EmqxMqttProperties {

private String username;

private String password;

private String hostUrl;

private String clientId;

private String defaultTopic;

private Integer timeout;

private Integer keepAlive;

private Integer qos;

private Integer version;

}

resource资源目录下新建一个 application-common.yml的yml文件。
注:方法一:以application-*.yml的形式命名。 方法二:模块之间并不用写依赖配置,直接在common模块的resource目录,添加一个config文件夹,在里面创建application.yml文件即可
官网是这么介绍的
这里选择第一种方式。

yml配置文件: application-common.yml

wulongbo:

mqtt:

emqx:

username: admin

password: public

#tcp://ip:port

host-url: tcp://39.102.56.91:1883

client-id: wulongbo${random.value}

default-topic: wulongbo_topic

# default-topic: $SYS/brokers/+/clients/#

timeout: 60

keep-alive: 60

# qos:{0:至多一次的传输 /1:至少分发一次,可重复 /2:只分发一次,不可重复}

qos: 1

version: 4

注:我自身的EMQX 是启用了Mysql认证登录的,并且关闭了匿名登录的哈,所以需要正确的用户名和密码

springboot_emqx_publish

在该模块下新建如下package包
【Java】Gateway绑定MQTT实现发布订阅

config类: EmqxMqttConfig.java

package com.baba.wlb.publish.config;

/**

* @Author wulongbo

* @Date 2020/12/29 11:38

* @Version 1.0

*/

import com.baba.wlb.share.common.Constants;

import com.baba.wlb.share.properties.EmqxMqttProperties;

import lombok.extern.slf4j.Slf4j;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.IntegrationComponentScan;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**

* EMQX配置工具类

*/

@Configuration

@IntegrationComponentScan //消息扫描件

@Slf4j

public class EmqxMqttConfig {

@Resource

private EmqxMqttProperties emqxMqttProperties;

/**

* MQTT的连接

*/

@Bean

public MqttConnectOptions getMqttConnectOptions() {

// 设置相关的属性

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());

mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());

mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});

// 心跳

mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());

mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());

mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());

// 保留/清空曾经连接的客户端信息

mqttConnectOptions.setCleanSession(false);

// qos

String playload = "设备已断开连接";

// 遗嘱消息

mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);

return mqttConnectOptions;

}

/**

* paho factory,mqtt自定义的连接放入factory工厂中

*/

@Bean

public MqttPahoClientFactory getMqttPahoClientFactory() {

DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();

defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());

return defaultMqttPahoClientFactory;

}

/**

* 开启连接通道

*/

@Bean(name = Constants.MQTT_PUBLISH_CHANNEL)

public MessageChannel getMqttPublishMessageChannel() {

DirectChannel directChannel = new DirectChannel();

return directChannel;

}

// /**

// * 开启连接通道

// */

// @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)

// public MessageChannel getMqttSubscribeMessageChannel() {

// DirectChannel directChannel = new DirectChannel();

// return directChannel;

// }

//

//

//

// /**

// * 监听topic.订阅者,消费者

// */

// @Bean

// public MessageProducer inbound() {

// MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(

// emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")

// );

// mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());

// mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());

// mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());

// mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());

// return mqttPahoMessageDrivenChannelAdapter;

// }

/**

* 订阅者,消费者

*/

@Bean

@ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)

public MessageHandler getMessageHandler() {

MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());

mqttPahoMessageHandler.setAsync(true);

mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());

mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());

return mqttPahoMessageHandler;

}

}

controller类: PublishController.java

package com.baba.wlb.publish.controller;

import com.baba.wlb.publish.service.PublishService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

* @Author wulongbo

* @Date 2020/12/29 13:58

* @Version 1.0

*/

/**

* 发送消息的Controller

*/

@RestController

@RequestMapping("/publish")

public class PublishController {

/**

* 注入发布者的service服务

*/

@Autowired

private PublishService publishService;

/**

* 发送消息

*/

@RequestMapping("/emqxPublish")

public String emqxPublish(String data,String topic){

publishService.sendToMqtt(data,topic);

return "success";

}

}

service: PublishService.java

package com.baba.wlb.publish.service;

import com.baba.wlb.share.common.Constants;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.integration.mqtt.support.MqttHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

/**

* @Author wulongbo

* @Date 2020/12/29 14:00

* @Version 1.0

*/

@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)

@Component

public interface PublishService {

void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

void sendToMqtt(String data);

void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic, int qos, String data);

}

注:必须加@Header(MqttHeaders.TOPIC)注解哈

application启动类: PublishApplication.java

package com.baba.wlb.publish;

import com.baba.wlb.share.properties.EmqxMqttProperties;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**

* @Author wulongbo

* @Date 2020/12/29 14:04

* @Version 1.0

*/

/**

* emqx 发布者启动程序

*/

@SpringBootApplication

@EnableConfigurationProperties({EmqxMqttProperties.class})

public class PublishApplication {

public static void main(String[] args) {

SpringApplication.run(PublishApplication.class,args);

}

}

注:须加入@EnableConfigurationProperties,才能加载到配置文件

yml文件: application.yml

server:

port: 1001

#spring:

# profiles:

# active: common

注:这里我们因为把publish模块设置成为了主类,所以可引入common yml,也可以不引入

springboot_emqx_subscribe

在该模块下新建如下package包
【Java】Gateway绑定MQTT实现发布订阅

config类: EmqxMqttConfig.java

package com.baba.wlb.subscribe.config;

/**

* @Author wulongbo

* @Date 2020/12/29 11:38

* @Version 1.0

*/

import com.baba.wlb.share.common.Constants;

import com.baba.wlb.share.properties.EmqxMqttProperties;

import lombok.extern.slf4j.Slf4j;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.IntegrationComponentScan;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.core.MessageProducer;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**

* EMQX配置工具类

*/

@Configuration

@IntegrationComponentScan //消息扫描件

@Slf4j

public class EmqxMqttConfig {

@Resource

private EmqxMqttProperties emqxMqttProperties;

/**

* MQTT的连接

*/

@Bean

public MqttConnectOptions getMqttConnectOptions() {

// 设置相关的属性

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());

mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());

mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});

// 心跳

mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());

mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());

mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());

// 保留/清空曾经连接的客户端信息

mqttConnectOptions.setCleanSession(false);

// qos

String playload = "设备已断开连接";

// 遗嘱消息

mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);

return mqttConnectOptions;

}

/**

* paho factory,mqtt自定义的连接放入factory工厂中

*/

@Bean

public MqttPahoClientFactory getMqttPahoClientFactory() {

DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();

defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());

return defaultMqttPahoClientFactory;

}

// /**

// * 开启连接通道

// */

// @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)

// public MessageChannel getMqttPublishMessageChannel() {

// DirectChannel directChannel = new DirectChannel();

// return directChannel;

// }

/**

* 开启连接通道

*/

@Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)

public MessageChannel getMqttSubscribeMessageChannel() {

DirectChannel directChannel = new DirectChannel();

return directChannel;

}

/**

* 监听topic.订阅者,消费者

*/

@Bean

public MessageProducer inbound() {

MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(

emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")

);

mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());

mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());

mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());

mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());

return mqttPahoMessageDrivenChannelAdapter;

}

/**

* 发布者,生产者

*/

@Bean

@ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)

public MessageHandler getMessageHandler() {

MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());

mqttPahoMessageHandler.setAsync(true);

mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());

mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());

return mqttPahoMessageHandler;

}

}

service业务类: SubscribeService.java

package com.baba.wlb.subscribe.service;

import com.baba.wlb.share.common.Constants;

import org.springframework.context.annotation.Bean;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHandler;

import org.springframework.messaging.MessagingException;

import org.springframework.stereotype.Service;

/**

* @Author wulongbo

* @Date 2020/12/29 14:11

* @Version 1.0

*/

/**

* 订阅者

*/

@Service

public class SubscribeService {

@Bean

@ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)

public MessageHandler messageHandler() {

MessageHandler messageHandler = new MessageHandler() {

@Override

public void handleMessage(Message<?> message) throws MessagingException {

System.out.println("订阅者订阅消息头是:" + message.getHeaders());

System.out.println("订阅者订阅消息主体是:" + message.getPayload());

}

};

return messageHandler;

}

}

注:我们把MessageHandler放入了专门的server做业务处理,其实放config类也是OK的

application启动类: SubscribeApplication.java

package com.baba.wlb.subscribe;

/**

* @Author wulongbo

* @Date 2020/12/29 14:16

* @Version 1.0

*/

import com.baba.wlb.share.properties.EmqxMqttProperties;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**

* 订阅者启动类

*/

@SpringBootApplication

@EnableConfigurationProperties({EmqxMqttProperties.class})

public class SubscribeApplication {

public static void main(String[] args) {

SpringApplication.run(SubscribeApplication.class,args);

}

}

yml配置文件: application.yml

server:

port: 1002

spring:

profiles:

include: common

注:当然我们上面的publish和subscribe模块都是依赖于common模块的我们需要在各个模块上右击--Open Model Settings
【Java】Gateway绑定MQTT实现发布订阅
并按下图依次来添加模块之间的依赖关系
【Java】Gateway绑定MQTT实现发布订阅
最后,我们在分别在 publish和subscribe模块的pom文件中引入common依赖就Ok了

    <dependencies>

<dependency>

<groupId>com.baba.wlb</groupId>

<artifactId>springboot_emqx_common</artifactId>

<version>1.0-SNAPSHOT</version>

</dependency>

</dependencies>

【Java】Gateway绑定MQTT实现发布订阅
至此,我们多模块用Gateway绑定的方式就集成好了MQTT消息推送和消息订阅功能。

启动项目

分别启动PublishApplicationSubscribeApplication
端口分别为:1001,1002
【Java】Gateway绑定MQTT实现发布订阅

PostMan测试

打开postman:发起Get请求
localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=我是一条消息
可以看到我们订阅者订阅到了这条消息:
【Java】Gateway绑定MQTT实现发布订阅

至于service业务模块对消息的处理:具体是根据主题来筛选,还是根据playload来区分,看具体的业务场景和设计需要。当然EMQX 有更解耦的方式就是规则引擎来对各个事件响应动作,也有HTTP API供我们调用,读者灵活运用即可。

以上是 【Java】Gateway绑定MQTT实现发布订阅 的全部内容, 来源链接: utcz.com/a/91477.html

回到顶部