【Servlet】AsyncContext实现长轮询场景配置动态更新
场景:实现类似Nacos配置实时更新
1. 需求
- 配置中心数据未变更: 正常心跳检测
- 配置中心数据变更: 实时同步本地
2. 实现
2.1 基础类
ConfigData: 配置抽象
/**
 * A single configuration entry held by the simulated configuration center.
 *
 * <p>Accessors other than {@link #getSignature()} are generated by Lombok's {@code @Data}.
 *
 * author: zhiwei_yang
 * time: 2020/7/18-18:02
 */
@Data
public class ConfigData {

    /** Configuration name. */
    private String name;

    /** Configuration payload. */
    private String data;

    /** Configuration signature, intended to identify the content uniquely. */
    private String signature;

    /** IPs of the servers listening to this configuration. */
    private transient Set<String> listenHosts = new CopyOnWriteArraySet<>();

    /**
     * Returns the signature of this configuration.
     *
     * @return the explicitly assigned signature when one is set; otherwise the MD5 hex digest
     *         of {@code name + "#" + data}
     */
    public String getSignature() {
        if (signature != null) {
            return signature;
        }
        return DigestUtils.md5Hex(name + "#" + data);
    }
}
ConfigStore: 模拟配置存储
/**
 * function: 配置数据存储,模拟配置中心
* author: zhiwei_yang
* time: 2020/7/18-18:00
*/
public class ConfigStore {
/**
* Simulated configuration storage. key: application name
*/
private final static Map<String, ConfigData> configStoreMap = new ConcurrentHashMap<>();
/** Blocking queues: hold the updated configuration per changed host; used here to simulate
* real-time listening for configuration changes.
* In a production distributed system a shared storage system would be used, and clients would
* listen for changes in order to answer a blocked polling request early.
* key: host name
* value: queue of updated configurations the host is listening for
**/
public final static Map<String, BlockingQueue<ConfigData>> blockingQueueMap = new ConcurrentHashMap<>();
// Seed the store with one entry: application "default" and the hosts it is deployed on.
static {
    ConfigData defaults = new ConfigData();
    defaults.setName("default");
    defaults.setData("localhost:8080,localhost:8081");
    configStoreMap.put(defaults.getName(), defaults);
}
/**
 * Stores a new configuration and notifies every host that was listening to the previous one.
 *
 * <p>The listener set of the replaced entry is carried over to {@code configData}, and the new
 * configuration is published to the blocking queue of each listening host.
 *
 * @param configData the new configuration; its name is used as the storage key
 * @return the configuration that was previously stored under the same name, or {@code null}
 *         when there was none (in which case nobody is notified)
 */
public static ConfigData updateConfig(ConfigData configData) {
    ConfigData oldConfigData = configStoreMap.put(configData.getName(), configData);
    if (oldConfigData != null) {
        configData.setListenHosts(oldConfigData.getListenHosts());
        for (String host : oldConfigData.getListenHosts()) {
            // computeIfAbsent creates the queue atomically; a separate get/put pair could let two
            // concurrent updates install different queues for the same host.
            BlockingQueue<ConfigData> queue =
                    blockingQueueMap.computeIfAbsent(host, key -> new LinkedBlockingQueue<>(1));
            // The queue holds a single element. If an earlier update has not been consumed yet,
            // replace it so the listener receives the newest configuration instead of a stale one.
            if (!queue.offer(configData)) {
                queue.clear();
                queue.offer(configData);
            }
        }
    }
    return oldConfigData;
}
ResponseUtil: http响应工具类
** * function: 请求响应工具类
* author: zhiwei_yang
* time: 2020/7/18-18:22
*/
public class ResponseUtil {
public final static String SUCCESS_CODE = "00000";
public final static String SUCCESS_MSG = "操作成功";
public final static String FAILURE_CODE = "10000";
public final static String FAILURE_MSG = "操作失败";
public final static String DATA = "data";
/**
* 请求响应
* @param servletResponse
*/
public static void sendResponse(ServletResponse servletResponse, JSONObject data){
try {
servletResponse.setCharacterEncoding(Charset.defaultCharset().name());
servletResponse.setContentType("application/json;charset=utf-8");
servletResponse.getWriter().println(data.toJSONString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.2 配置监听
功能:维持心跳、配置变更实时同步
ConfigListenServlet: 配置监听器
/**
 * Long-polling endpoint through which clients fetch a configuration and wait for changes.
 *
 * <ul>
 *   <li>No application name: answered immediately with a failure code.</li>
 *   <li>No client signature (first fetch): answered immediately with the current configuration.</li>
 *   <li>Otherwise the request is switched to asynchronous mode and handed to a
 *       {@link LongPollingRunnable}.</li>
 * </ul>
 */
@WebServlet(urlPatterns = "/configListen", loadOnStartup = 0, asyncSupported = true)
@Slf4j
public class ConfigListenServlet extends HttpServlet {
    /** Default long-polling duration in milliseconds. */
    private final static int DEFAULT_LONG_POLLING_INTERNAL_MS = 10000;
    /** Allowance for network jitter: respond 500 ms early. */
    private final static int NETWORK_TIME_DEVIATE_MS = 500;
    /** Request parameter carrying the requested long-polling duration. */
    private final static String LONG_POLLING_REQUEST = "long_polling_time";
    /** Request parameter carrying the application name. */
    private final static String APPLICATION_REQUEST = "application";
    /** Request parameter carrying the signature of the client's current configuration. */
    private final static String APPLICATION_CONFIG_SIGNATURE = "signature";
    /**
     * Number of worker threads shared by all long-polling requests. Each pending poll parks one
     * worker while it waits on its host queue, so this also bounds how many polls are served
     * concurrently without queueing.
     */
    private final static int LONG_POLLING_POOL_SIZE = 16;

    /**
     * One pool for the whole servlet. Creating a pool per request leaks its threads, because a
     * scheduled pool keeps its core threads alive until it is shut down.
     */
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(LONG_POLLING_POOL_SIZE);

    /** Stops the shared worker pool when the container takes the servlet out of service. */
    @Override
    public void destroy() {
        scheduledExecutorService.shutdownNow();
        super.destroy();
    }

    @Override
    protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        this.doPost(httpServletRequest, httpServletResponse);
    }

    @Override
    protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        int realLongPollingTime = resolveLongPollingTime(httpServletRequest.getParameter(LONG_POLLING_REQUEST));
        String requestApplicationName = httpServletRequest.getParameter(APPLICATION_REQUEST);
        JSONObject jsonObject = new JSONObject();
        if (null == requestApplicationName || requestApplicationName.length() == 0) {
            // Nothing can be looked up without an application name: report a failure.
            jsonObject.put(ResponseUtil.FAILURE_CODE, ResponseUtil.FAILURE_MSG);
            jsonObject.put(ResponseUtil.DATA, null);
            ResponseUtil.sendResponse(httpServletResponse, jsonObject);
            return;
        }
        String oldSignature = httpServletRequest.getParameter(APPLICATION_CONFIG_SIGNATURE);
        // First fetch: the client has no configuration yet, so answer right away.
        if (oldSignature == null || oldSignature.isEmpty()) {
            jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);
            jsonObject.put(ResponseUtil.DATA, ConfigStore.getConfig(httpServletRequest.getRemoteHost(), requestApplicationName));
            ResponseUtil.sendResponse(httpServletResponse, jsonObject);
            return;
        }
        // Long poll:
        // 1. configuration unchanged -> answer when the polling time elapses
        // 2. configuration changed meanwhile -> answer early
        AsyncContext asyncContext = httpServletRequest.startAsync();
        // Container-side timeout; widened to long so a large requested duration cannot overflow.
        asyncContext.setTimeout((long) realLongPollingTime + NETWORK_TIME_DEVIATE_MS);
        // The actual request handling happens on a worker thread.
        scheduledExecutorService.execute(new LongPollingRunnable(scheduledExecutorService, asyncContext, requestApplicationName,
                oldSignature, Math.max(0, realLongPollingTime - NETWORK_TIME_DEVIATE_MS)));
    }

    /**
     * Parses the requested long-polling duration.
     *
     * @param raw raw parameter value, may be {@code null} or empty
     * @return the parsed value in milliseconds, or {@link #DEFAULT_LONG_POLLING_INTERNAL_MS} when
     *         the parameter is absent, not a number, or not positive
     */
    private static int resolveLongPollingTime(String raw) {
        if (raw == null || raw.length() == 0) {
            return DEFAULT_LONG_POLLING_INTERNAL_MS;
        }
        try {
            int parsed = Integer.parseInt(raw.trim());
            return parsed > 0 ? parsed : DEFAULT_LONG_POLLING_INTERNAL_MS;
        } catch (NumberFormatException e) {
            log.warn("invalid {} value '{}', using default {} ms", LONG_POLLING_REQUEST, raw,
                    DEFAULT_LONG_POLLING_INTERNAL_MS);
            return DEFAULT_LONG_POLLING_INTERNAL_MS;
        }
    }
}
LongPollingRunnable: 长轮询线程
@Slf4jpublic class LongPollingRunnable implements Runnable {
/** 调度线程池 **/
private ScheduledExecutorService scheduledExecutorService;
/** servlet 异步上下文 **/
private AsyncContext asyncContext = null;
/** 应用名 **/
private String applicationName;
/** 请求应用签名 **/
private final String signature;
private final Integer longPollingIntervalTIme;
public LongPollingRunnable(ScheduledExecutorService scheduledExecutorService, AsyncContext asyncContext,
String applicationName, String signature, Integer longPollingIntervalTIme){
this.scheduledExecutorService = scheduledExecutorService;
this.longPollingIntervalTIme = longPollingIntervalTIme;
this.asyncContext = asyncContext;
this.applicationName = applicationName;
this.signature = signature;
}
/**
* 1. 初次访问配置一样,则进入轮询状态,中间若配置变更则提前返回
* 2. 初次访问配置不一致,则直接返回
*/
@Override
public void run(){
try {
ConfigData realConfigData = ConfigStore.getConfig(asyncContext.getRequest().getRemoteHost(), applicationName);
//1. 应用本地与服务端配置数据一致
assert realConfigData != null;
if(signature.equals(realConfigData.getSignature())){
// 监听配置变化,若一致则轮询时间到返回,否则提前返回
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
log.info("定时任务轮询正常返回配置数据 configData ==> {}", JSON.toJSONString(realConfigData));
this.sendResponse(realConfigData);
}, longPollingIntervalTIme, TimeUnit.MILLISECONDS);
//监听实时配置:受影响的IP:CopyOnWriteArrayList
scheduledExecutorService.execute(() ->{
BlockingQueue<ConfigData> blockingQueue = ConfigStore.blockingQueueMap.get(asyncContext.getRequest().getRemoteHost());
try {
ConfigData updatedConfigData = blockingQueue.poll(longPollingIntervalTIme, TimeUnit.MILLISECONDS);
// 配置变更
if(null != updatedConfigData){
log.info("配置更新,提前返回配置数据 updatedConfigData ==> {}", JSON.toJSONString(updatedConfigData));
this.sendResponse(updatedConfigData);
// 取消正常轮询定时任务
scheduledFuture.cancel(true);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 配置不一致,实时响应
}else{
sendResponse(realConfigData);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 响应客户端请求
* @param configData
*/
private void sendResponse(ConfigData configData){
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);
jsonObject.put(ResponseUtil.DATA, configData);
ResponseUtil.sendResponse(asyncContext.getResponse(), jsonObject);
}finally {
// 响应客户端
asyncContext.complete();
}
}
}
2.3 配置更新
ConfigUpdateServlet
/**
 * Endpoint that replaces the configuration of an application.
 *
 * <p>Expects the parameters {@code application} and {@code config}. The new value is stored via
 * {@link ConfigStore#updateConfig(ConfigData)}.
 */
@WebServlet(urlPatterns = "/configUpdate", loadOnStartup = 0, asyncSupported = true)
@Slf4j
public class ConfigUpdateServlet extends HttpServlet {
    /** Request parameter carrying the new configuration value. */
    private final static String APPLICATION_CONFIG_DATA = "config";
    /** Request parameter carrying the application name. */
    private final static String APPLICATION_REQUEST = "application";

    @Override
    protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        this.doPost(httpServletRequest, httpServletResponse);
    }

    @Override
    protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        String requestApplicationName = httpServletRequest.getParameter(APPLICATION_REQUEST);
        String configData = httpServletRequest.getParameter(APPLICATION_CONFIG_DATA);
        JSONObject jsonObject = new JSONObject();
        if (null == requestApplicationName || requestApplicationName.isEmpty() ||
                null == configData || configData.isEmpty()) {
            // Nothing was updated, so the caller must not be told the operation succeeded.
            jsonObject.put(ResponseUtil.FAILURE_CODE, ResponseUtil.FAILURE_MSG);
            jsonObject.put(ResponseUtil.DATA, null);
            ResponseUtil.sendResponse(httpServletResponse, jsonObject);
            return;
        }
        // Apply the configuration change
        ConfigData realConfigData = new ConfigData();
        realConfigData.setName(requestApplicationName);
        realConfigData.setData(configData);
        ConfigStore.updateConfig(realConfigData);
        jsonObject.put(ResponseUtil.SUCCESS_CODE, ResponseUtil.SUCCESS_MSG);
        jsonObject.put(ResponseUtil.DATA, null);
        ResponseUtil.sendResponse(httpServletResponse, jsonObject);
    }
}
2.4 JSP测试页面
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
    <title>长轮询</title>
</head>
<body>
配置值[<a href="###" onclick="refresh()">刷新</a>]:<span id="configText"></span> <br/>
<hr/>
<form>
    配置值[<a href="###" onclick="config()">设置</a>]:<input id="configValue" type="text" placeholder="请设置新的配置值">
</form>
<script type="text/javascript" src="js/ajax/jquery-3.5.1.min.js"></script>
<script>
    // Signature of the configuration currently shown; empty until the first fetch.
    let signature = "";
    // How long the server is asked to hold a poll, in milliseconds.
    let longPollingTime = 10000;
    // Pause before polling again after a failed or empty response, in milliseconds.
    let retryDelay = 3000;

    // Fetch the configuration once and update the page. Returns the jqXHR so callers can chain.
    function refresh() {
        return $.get("/configListen",
            {"application": "default", "long_polling_time": longPollingTime, "signature": signature})
            .done(function (result) {
                console.log("=====> %s", JSON.stringify(result));
                // A response without data carries nothing to display.
                if (result && result.data) {
                    signature = result.data.signature;
                    $("#configText").text(result.data.data);
                }
            });
    }

    // Set the configuration
    function config() {
        let data = $("#configValue").val();
        console.log(data);
        $.get("/configUpdate",
            {"application": "default", "config": data},
            function (result) {
                console.log(result);
            });
    }

    // Long-polling loop: start the next poll as soon as the previous one has been answered, so
    // there is always a pending request the server can answer when the configuration changes.
    // A fixed timer would leave a gap after an early answer and could overlap slow requests.
    function poll() {
        refresh()
            .done(function (result) {
                // Back off when no data came back; otherwise an immediately answered request
                // would turn this loop into a busy loop.
                setTimeout(poll, result && result.data ? 0 : retryDelay);
            })
            .fail(function () {
                setTimeout(poll, retryDelay);
            });
    }
    poll();
</script>
</body>
</html>
3. 效果
后台日志:
[2020-07-19 00:19:56 : pool-12-thread-2] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8081","name":"default","signature":"c339787b2d2ba8f2e5a0a282ea565c8d"}
[2020-07-19 00:20:06 : pool-13-thread-2] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8081","name":"default","signature":"c339787b2d2ba8f2e5a0a282ea565c8d"}
[2020-07-19 00:20:13 : pool-14-thread-1] - 配置更新,提前返回配置数据 updatedConfigData ==> {"data":"localhost:8080,localhost:8083","name":"default","signature":"2cc8db8cd8e464969f71ae419e1ca9df"}
[2020-07-19 00:20:26 : pool-15-thread-1] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8083","name":"default","signature":"2cc8db8cd8e464969f71ae419e1ca9df"}
[2020-07-19 00:20:34 : pool-16-thread-1] - 配置更新,提前返回配置数据 updatedConfigData ==> {"data":"localhost:8080,localhost:8084","name":"default","signature":"8807331bd603991fb44511161b963eca"}
[2020-07-19 00:20:46 : pool-17-thread-1] - 定时任务轮询正常返回配置数据 configData ==> {"data":"localhost:8080,localhost:8084","name":"default","signature":"8807331bd603991fb44511161b963eca"}
浏览器视图:
以上是 【Servlet】AsyncContext实现长轮询场景配置动态更新 的全部内容, 来源链接: utcz.com/z/518451.html