SpringBoot下WebSocket+React例子
1、Java端
1.1、引入SpringBoot的WebSocket包,Maven配置:
<dependency><groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
1.2、增加WebSocket配置类
package com.tfe.sell.common.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 开启WebSocket支持
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
1.3、添加WebSocket的服务类
package com.tfe.sell.common.config;import com.alibaba.fastjson.JSON;
import com.tfe.sell.common.entity.JsonResult;
import com.tfe.sell.common.entity.WebSocketJsonResult;
import com.tfe.sell.common.enumeration.WebSocketMessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint("/admin/websocket/{userid}")
@Component
public class WebSocketServer {
private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String userid = "";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userid") String userid) {
if (!WebSocketServer.contains(userid)){
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
logger.info("有新窗口开始监听:" + userid + ",当前在线人数为" + getOnlineCount());
this.userid = userid;
}
}
public static Boolean contains(String userid){
for (WebSocketServer item : webSocketSet) {
if (item.userid.equals(userid)) {
return true;
}
}
return false;
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
logger.info("有一连接关闭["+ this.userid +"],当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("收到来自窗口" + userid + "的信息:" + message);
WebSocketJsonResult jsonResult = JSON.parseObject(message,WebSocketJsonResult.class);
handleMessage(jsonResult);
}
public void handleMessage(WebSocketJsonResult jsonResult){
WebSocketMessageType messageType = WebSocketMessageType.getByCode(jsonResult.getCode());
if (messageType == null){
logger.error("传入的类型不正确",jsonResult.getCode());
return;
}
switch (messageType){
case clientHeartbeat:
WebSocketJsonResult newMessage = new WebSocketJsonResult(null,
WebSocketMessageType.clientHeartbeatReply.getCode(),
WebSocketMessageType.clientHeartbeatReply.getName());
String jsonString = JSON.toJSONString(newMessage);
sendInfo(jsonString,userid);
break;
default:
//不做任何事情
logger.error("传入的类型没有对应的处理");
break;
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
logger.error("发生错误" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message,String userid) {
logger.info("推送消息到窗口" + userid + ",推送内容:" + message);
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if(userid == null) {
item.sendMessage(message);
}else if(item.userid.equals(userid)){
item.sendMessage(message);
}
} catch (IOException e) {
logger.error("error:" + e.getMessage());
continue;
}
}
}
public static Integer getSize(){
return webSocketSet.size();
}
public static String getUserIdList(){
StringBuilder sb = new StringBuilder();
for (WebSocketServer item : webSocketSet) {
sb.append(item.userid);
sb.append(",");
}
return sb.toString();
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
这个服务类用到一些实体和枚举:
WebSocketJsonResult:
package com.tfe.sell.common.entity;/**
*
* JSON模型
*
* 用户后台向前台返回的JSON对象
*
*
*/
public class WebSocketJsonResult implements java.io.Serializable {
private static final long serialVersionUID = -1118025395225258944L;
private String message = "";
private int code = 1;
private Object result = null;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getResult() {
return result;
}
public void setResult(Object obj) {
this.result = obj;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public WebSocketJsonResult(){}
public WebSocketJsonResult(Object result, int code, String message){
this.result = result;
this.code = code;
this.message = message;
}
public static WebSocketJsonResult successInstance(){
return new WebSocketJsonResult(null,1,"成功");
}
public static WebSocketJsonResult failInstance(String message){
return new WebSocketJsonResult(null,-1,message);
}
}
2.React端
2.1、添加WebSocket组件
新建WebSocket目录,添加Index.js,代码如下:
/*** 参数:[socketOpen|socketClose|socketMessage|socketError] = func,[socket连接成功时触发|连接关闭|发送消息|连接错误]
* timeout:连接超时时间
* @type {module.webSocket}
*/
module.exports = class webSocket {
constructor(param = {}) {
this.param = param;
this.reconnectCount = 0;
this.socket = null;
this.taskRemindInterval = null;
this.isSucces=true;
}
connection = () => {
let {socketUrl, timeout = 0} = this.param;
// 检测当前浏览器是什么浏览器来决定用什么socket
if ('WebSocket' in window) {
console.log('WebSocket');
this.socket = new WebSocket(socketUrl);
}
else if ('MozWebSocket' in window) {
console.log('MozWebSocket');
this.socket = new MozWebSocket(socketUrl);
}
else {
console.log('SockJS');
this.socket = new SockJS(socketUrl);
}
this.socket.onopen = this.onopen;
this.socket.onmessage = this.onmessage;
this.socket.onclose = this.onclose;
this.socket.onerror = this.onerror;
this.socket.sendMessage = this.sendMessage;
this.socket.closeSocket = this.closeSocket;
};
// 连接成功触发
onopen = () => {
let {socketOpen} = this.param;
this.isSucces=false; //连接成功将标识符改为false
socketOpen && socketOpen();
};
// 后端向前端推得数据
onmessage = (msg) => {
let {socketMessage} = this.param;
socketMessage && socketMessage(msg);
};
// 关闭连接触发
onclose = (e) => {
this.isSucces = true; //关闭将标识符改为true
this.socket.close();
let {socketClose} = this.param;
socketClose && socketClose(e);
};
onerror = (e) => {
// socket连接报错触发
let {socketError} = this.param;
this.socket = null;
socketError && socketError(e);
};
sendMessage = (value) => {
// 向后端发送数据
if(this.socket) {
this.socket.send(JSON.stringify(value));
}
};
closeSocket = () => {
this.socket.close();
};
//获得状态
readyState = () => {
return this.socket.readyState;
}
};
2.2 在需要触发服务的页面,例如订单管理页面增加WebSocket引用代码
import WebSocket from "components/WebSocket";
socket = null;
@observable
webSockethadOpen = false;
//重试次数
websocketConnectedCount = 0;
openWebSocket = () => {let head = constantStore.ROOT_API_URL.replace("https","wss").replace("http","ws");
let url = head + "/admin/websocket/" + userStore.id;
this.socket = new WebSocket({
socketUrl: url,
socketMessage: (msg) => {
var myMessage = msg;
var data = myMessage.data;
if (data != undefined){
let jsonResult = JSON.parse(data);
if (jsonResult.code == 101) {
console.info("收到新订单消息",jsonResult);
let orderGroupId = jsonResult.result;
this.soundNotice();
Modal.confirm({
title: "新订单",
content: "有新订单,是否显示?",
okText: "确认",
cancelText: "取消",
onOk: () => {
if(this != undefined){
this.refs["DetailModal"].showModal(orderGroupId);
}
else{
message.error("请先打开【订单管理】页面");
}
}
});
}
else if (jsonResult.code == 401){
//401是后端收到心跳包,并返回的消息类型
console.info("收到后台回复心跳包",jsonResult);
this.heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
}
}
},
socketClose: (msg) => {
this.webSockethadOpen = false;
message.success("订单监控已关闭");
console.info("订单监控已关闭");
},
socketError: () => {
console.info("连接建立失败");
this.webSockethadOpen = false;
this.websocketConnectedCount++;
if(this.websocketConnectedCount <= 5){
message.error('连接建立失败,尝试重连');
this.openWebSocket();
}
else{
message.error('连接建立失败,请联系管理员');
}
},
socketOpen: () => {
message.success("开始订单监控");
console.info("开始订单监控");
this.heartCheck.reset().start(); // 成功建立连接后,重置心跳检测
}
});
//重试创建socket连接
try {
this.socket.connection();
} catch (e) {
// 捕获异常,防止js error
}
this.webSockethadOpen = true;
};
closeWebSocket = () => {
if (this.webSockethadOpen == true){
this.socket.closeSocket();
}
};
soundNotice = () => {
var url = "/new_order.mp3";
var audio = new Audio(url);
audio.src = url;
audio.play();
};
componentDidMount() {
//初始化心跳包对象
this.initHeartCheck();
//进入自动开启监控
this.openWebSocket();
}
//初始化心跳包对象
initHeartCheck = () => {
let _this = this;
// 心跳检测, 每隔一段时间检测连接状态,如果处于连接中,就向server端主动发送消息,来重置server端与客户端的最大连接时间,
// 如果已经断开了,发起重连。
this.heartCheck = {
timeout: 10 * 60 * 1000, //10分钟发送一次心跳包,输入毫秒数
serverTimeoutObj: null,
reset: function () {
if (this.serverTimeoutObj != undefined){
clearTimeout(this.serverTimeoutObj);
}
return this;
},
start: function () {
this.serverTimeoutObj = setInterval(function () {
if (_this.webSockethadOpen) {
if (_this.socket.readyState() == 1) {
var message = {
"code":301,
"message":"发送心跳包"
};
console.log("发送心跳包消息",message);
_this.socket.sendMessage(message);
_this.heartCheck.reset().start(); // 如果获取到消息,说明连接是正常的,重置心跳检测
} else {
console.log("断开状态,尝试重连");
_this.openWebSocket();
}
}
}, this.timeout)
}
};
};
componentWillUnmount(){
console.info("componentWillUnmount");
this.closeWebSocket();
}
<Button type="primary" disabled={this.webSockethadOpen} onClick={() => this.openWebSocket()} >
监听订单
</Button>
<Button type="primary" disabled={!this.webSockethadOpen} onClick={() => this.closeWebSocket()}>
关闭监听
</Button>
以上是 SpringBoot下WebSocket+React例子 的全部内容, 来源链接: utcz.com/z/383153.html