【Servlet】AsyncContext实现长轮询场景配置动态更新

编程

场景:实现类似Nacos配置实时更新

1. 需求

  1. 配置中心数据未变更: 正常心跳检测
  2. 配置中心数据变更: 实时同步本地

2. 实现

2.1 基础类

ConfigData: 配置抽象

/**
 * function: configuration data (one named configuration entry)
 * author: zhiwei_yang
 * time: 2020/7/18-18:02
 */
@Data
public class ConfigData {

    /** Configuration name. */
    private String name;

    /** Configuration payload. */
    private String data;

    /** Configuration signature, intended to be unique per content. */
    private String signature;

    /** IPs of the servers listening to this configuration. */
    private transient Set<String> listenHosts = new CopyOnWriteArraySet<>();

    /**
     * Returns the configuration signature.
     *
     * @return the explicitly assigned signature if there is one; otherwise the MD5 hex digest
     *         of {@code name + "#" + data}
     */
    public String getSignature() {
        if (signature != null) {
            return signature;
        }
        return DigestUtils.md5Hex(name + "#" + data);
    }
}

ConfigStore: 模拟配置存储

/**

* function: 配置数据存储,模拟配置中心

* author: zhiwei_yang

* time: 2020/7/18-18:00

*/

public class ConfigStore {

/**
 * Simulated configuration storage. key: application name
 */

private final static Map<String, ConfigData> configStoreMap = new ConcurrentHashMap<>();

/**
 * Blocking queues that carry updated configurations to the hosts affected by a change;
 * used here to simulate real-time listening for configuration changes.
 * In a production distributed system a shared storage system would be used instead, with
 * clients listening for changes so that a blocked polling request can be answered early.
 * key: host name
 * value: queue of updated configurations for that host
 */

public final static Map<String, BlockingQueue<ConfigData>> blockingQueueMap = new ConcurrentHashMap<>();

static {
    // Seed the store with a single entry: the application name is the key and the
    // payload is the list of hosts the application is deployed on.
    ConfigData defaultConfig = new ConfigData();
    defaultConfig.setName("default");
    defaultConfig.setData("localhost:8080,localhost:8081");
    configStoreMap.put(defaultConfig.getName(), defaultConfig);
}

/**
 * Replaces the stored configuration that has the same name and notifies every host that
 * was listening to the previous version by queueing the new configuration for it.
 *
 * @param configData the new configuration; its name is used as the store key
 * @return the configuration previously stored under that name, or {@code null} if there
 *         was none (in which case nobody is notified)
 */
public static ConfigData updateConfig(ConfigData configData) {
    ConfigData oldConfigData = configStoreMap.put(configData.getName(), configData);
    if (oldConfigData == null) {
        return null;
    }

    // The new version inherits the listeners of the version it replaces.
    configData.setListenHosts(oldConfigData.getListenHosts());

    for (String host : oldConfigData.getListenHosts()) {
        // computeIfAbsent creates the per-host queue atomically; a separate get/put could
        // let two concurrent updates each install their own queue and lose one notification.
        BlockingQueue<ConfigData> queue =
                blockingQueueMap.computeIfAbsent(host, key -> new LinkedBlockingQueue<>(1));

        // The queue holds a single element. If an older, undelivered update is still
        // queued, evict it so the listener always receives the newest configuration
        // instead of silently losing it to a failed offer().
        while (!queue.offer(configData)) {
            queue.poll();
        }
    }
    return oldConfigData;
}

ResponseUtil: http响应工具类

**

* function: 请求响应工具类

* author: zhiwei_yang

* time: 2020/7/18-18:22

*/

public class ResponseUtil {

public final static String SUCCESS_CODE = "00000";

public final static String SUCCESS_MSG = "操作成功";

public final static String FAILURE_CODE = "10000";

public final static String FAILURE_MSG = "操作失败";

public final static String DATA = "data";

/**

* 请求响应

* @param servletResponse

*/

public static void sendResponse(ServletResponse servletResponse, JSONObject data){

try {

servletResponse.setCharacterEncoding(Charset.defaultCharset().name());

servletResponse.setContentType("application/json;charset=utf-8");

servletResponse.getWriter().println(data.toJSONString());

} catch (IOException e) {

e.printStackTrace();

}

}

}

2.2 配置监听

功能:维持心跳、配置变更实时同步

ConfigListenServlet: 配置监听器

@WebServlet(urlPatterns = "/configListen", loadOnStartup = 0, asyncSupported = true)

@Slf4j

public class ConfigListenServlet extends HttpServlet {

/** 默认长轮询时长 **/

private final static Integer DEFAULT_LONG_POLLING_INTERNAL_MS = 10000;

/** 考虑网络抖动时延:提前500ms响应 **/

private final static Integer NETWORK_TIME_DEVIATE_MS = 500;

/** 默认请求头 **/

private final static String LONG_POLLING_REQUEST = "long_polling_time";

/** 应用名 **/

private final static String APPLICATION_REQUEST = "application";

/** 客户端配置签名 **/

private final static String APPLICATION_CONFIG_SIGNATURE = "signature";

@Override

protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {

this.doPost(httpServletRequest, httpServletResponse);

}

@Override

protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {

String longPollingTimeStr = httpServletRequest.getParameter(LONG_POLLING_REQUEST);

Integer realLongPollingTime = null == longPollingTimeStr || longPollingTimeStr.length() == 0 ?

DEFAULT_LONG_POLLING_INTERNAL_MS :

Integer.parseInt(longPollingTimeStr);

String requestApplicationName = httpServletRequest.getParameter(APPLICATION_REQUEST);

JSONObject jsonObject = new JSONObject();

if(null == requestApplicationName || requestApplicationName.length() == 0){

jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);

jsonObject.put(ResponseUtil.DATA, null);

ResponseUtil.sendResponse(httpServletResponse, jsonObject);

return;

}

String oldSignature = httpServletRequest.getParameter(APPLICATION_CONFIG_SIGNATURE);

// 第一次获取配置,直接返回

if(oldSignature == null || oldSignature.isEmpty()){

jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);

jsonObject.put(ResponseUtil.DATA, ConfigStore.getConfig(httpServletRequest.getRemoteHost(), requestApplicationName));

ResponseUtil.sendResponse(httpServletResponse, jsonObject);

return;

}

// 轮询配置:

// 1. 配置未变更则轮询时间到返回

// 2. 配置中途变更,则提前返回

AsyncContext asyncContext = httpServletRequest.startAsync();

//设置请求超时时间

asyncContext.setTimeout(realLongPollingTime + NETWORK_TIME_DEVIATE_MS);

// 子线程完成具体的请求业务处理

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

scheduledExecutorService.execute(new LongPollingRunnable(scheduledExecutorService, asyncContext, requestApplicationName,

oldSignature, realLongPollingTime - NETWORK_TIME_DEVIATE_MS));

}

}

LongPollingRunnable: 长轮询线程

@Slf4j

public class LongPollingRunnable implements Runnable {

/** 调度线程池 **/

private ScheduledExecutorService scheduledExecutorService;

/** servlet 异步上下文 **/

private AsyncContext asyncContext = null;

/** 应用名 **/

private String applicationName;

/** 请求应用签名 **/

private final String signature;

private final Integer longPollingIntervalTIme;

public LongPollingRunnable(ScheduledExecutorService scheduledExecutorService, AsyncContext asyncContext,

String applicationName, String signature, Integer longPollingIntervalTIme){

this.scheduledExecutorService = scheduledExecutorService;

this.longPollingIntervalTIme = longPollingIntervalTIme;

this.asyncContext = asyncContext;

this.applicationName = applicationName;

this.signature = signature;

}

/**

* 1. 初次访问配置一样,则进入轮询状态,中间若配置变更则提前返回

* 2. 初次访问配置不一致,则直接返回

*/

@Override

public void run(){

try {

ConfigData realConfigData = ConfigStore.getConfig(asyncContext.getRequest().getRemoteHost(), applicationName);

//1. 应用本地与服务端配置数据一致

assert realConfigData != null;

if(signature.equals(realConfigData.getSignature())){

// 监听配置变化,若一致则轮询时间到返回,否则提前返回

ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {

log.info("定时任务轮询正常返回配置数据 configData ==> {}", JSON.toJSONString(realConfigData));

this.sendResponse(realConfigData);

}, longPollingIntervalTIme, TimeUnit.MILLISECONDS);

//监听实时配置:受影响的IP:CopyOnWriteArrayList

scheduledExecutorService.execute(() ->{

BlockingQueue<ConfigData> blockingQueue = ConfigStore.blockingQueueMap.get(asyncContext.getRequest().getRemoteHost());

try {

ConfigData updatedConfigData = blockingQueue.poll(longPollingIntervalTIme, TimeUnit.MILLISECONDS);

// 配置变更

if(null != updatedConfigData){

log.info("配置更新,提前返回配置数据 updatedConfigData ==> {}", JSON.toJSONString(updatedConfigData));

this.sendResponse(updatedConfigData);

// 取消正常轮询定时任务

scheduledFuture.cancel(true);

}

} catch (InterruptedException e) {

e.printStackTrace();

}

});

// 配置不一致,实时响应

}else{

sendResponse(realConfigData);

}

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 响应客户端请求

* @param configData

*/

private void sendResponse(ConfigData configData){

try {

JSONObject jsonObject = new JSONObject();

jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);

jsonObject.put(ResponseUtil.DATA, configData);

ResponseUtil.sendResponse(asyncContext.getResponse(), jsonObject);

}finally {

// 响应客户端

asyncContext.complete();

}

}

}

2.3 配置更新

ConfigUpdateServlet

/**
 * Configuration update endpoint: stores a new configuration value for an application,
 * which in turn notifies the hosts listening to it (see {@code ConfigStore.updateConfig}).
 *
 * <p>Request parameters: {@code application} and {@code config}, both required.
 */
@WebServlet(urlPatterns = "/configUpdate", loadOnStartup = 0, asyncSupported = true)
@Slf4j
public class ConfigUpdateServlet extends HttpServlet {

    /** Request parameter carrying the new configuration value. */
    private final static String APPLICATION_CONFIG_DATA = "config";

    /** Request parameter carrying the application name. */
    private final static String APPLICATION_REQUEST = "application";

    @Override
    protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        this.doPost(httpServletRequest, httpServletResponse);
    }

    /**
     * Validates the parameters, updates the configuration and reports the outcome.
     * A request missing either parameter changes nothing and is reported as a failure.
     */
    @Override
    protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        String requestApplicationName = httpServletRequest.getParameter(APPLICATION_REQUEST);
        String configData = httpServletRequest.getParameter(APPLICATION_CONFIG_DATA);

        if (isBlankParameter(requestApplicationName) || isBlankParameter(configData)) {
            reply(httpServletResponse, ResponseUtil.FAILURE_CODE, ResponseUtil.FAILURE_MSG);
            return;
        }

        // Apply the configuration change.
        ConfigData realConfigData = new ConfigData();
        realConfigData.setName(requestApplicationName);
        realConfigData.setData(configData);
        ConfigStore.updateConfig(realConfigData);

        reply(httpServletResponse, ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);
    }

    /** Returns whether a request parameter is absent or empty. */
    private static boolean isBlankParameter(String value) {
        return null == value || value.isEmpty();
    }

    /** Sends a JSON response consisting of the given code/message pair and a null data field. */
    private static void reply(HttpServletResponse httpServletResponse, String code, String message) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(code, message);
        jsonObject.put(ResponseUtil.DATA, null);
        ResponseUtil.sendResponse(httpServletResponse, jsonObject);
    }
}

2.3 JSP测试页面

<%@ page contentType="text/html;charset=UTF-8" language="java" %>

<html>

<head>

<title>长轮询</title>

</head>

<body>

配置值[<a href="###" onclick="refresh()">刷新</a>]:<span id="configText"></span> <br/>

<hr/>

<form>

配置值[<a href="###" onclick="config()">设置</a>]:<input id="configValue" type="text" placeholder="请设置新的配置值">

</form>

<script type="text/javascript" src="js/ajax/jquery-3.5.1.min.js"></script>

<script>

let signature = "";

let longPollingTime = 10000;

// Refresh the configuration: long-poll the server with the signature currently held
// and show the returned configuration value.
function refresh() {
    $.get("/configListen",
        {"application": "default", "long_polling_time": longPollingTime, "signature": signature},
        function (result) {
            console.log("=====> %s", JSON.stringify(result))
            // The server can answer with data == null; keep the current signature and
            // text in that case instead of failing on a null dereference.
            if (!result || !result.data) {
                return;
            }
            signature = result.data.signature;
            $("#configText").text(result.data.data);
        });
}

refresh();

// Submit the value typed into the input box as the new configuration.
function config() {
    const newValue = $("#configValue").val();
    console.log(newValue)

    const params = {"application": "default", "config": newValue};
    $.get("/configUpdate", params, function (response) {
        console.log(response)
    });
}

// Heartbeat: re-issue the configuration poll once per polling period.
setInterval(() => {
    refresh();
}, longPollingTime);

</script>

</body>

</html>

2.4 效果

后台日志:

[2020-07-19 00:19:56 : pool-12-thread-2] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8081","name":"default","signature":"c339787b2d2ba8f2e5a0a282ea565c8d"}

[2020-07-19 00:20:06 : pool-13-thread-2] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8081","name":"default","signature":"c339787b2d2ba8f2e5a0a282ea565c8d"}

[2020-07-19 00:20:13 : pool-14-thread-1] - 配置更新,提前返回配置数据 updatedConfigData ==> {"data":"localhost:8080,localhost:8083","name":"default","signature":"2cc8db8cd8e464969f71ae419e1ca9df"}

[2020-07-19 00:20:26 : pool-15-thread-1] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8083","name":"default","signature":"2cc8db8cd8e464969f71ae419e1ca9df"}

[2020-07-19 00:20:34 : pool-16-thread-1] - 配置更新,提前返回配置数据 updatedConfigData ==> {"data":"localhost:8080,localhost:8084","name":"default","signature":"8807331bd603991fb44511161b963eca"}

[2020-07-19 00:20:46 : pool-17-thread-1] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8084","name":"default","signature":"8807331bd603991fb44511161b963eca"}

浏览器视图:

以上是 【Servlet】AsyncContext实现长轮询场景配置动态更新 的全部内容, 来源链接: utcz.com/z/518451.html

回到顶部