【Java】【Soul网关探秘】http数据同步-变更通知机制

【Soul网关探秘】http数据同步-变更通知机制

腾业发布于 23 分钟前

引言

上一篇,梳理除了 soul-admin 在发出数据变更通知前的处理脉络,本篇开始探究 http 同步策略的变更通知机制,

不同数据变更的通知机制应当是一致的,故本篇以 selector 配置变更通知为切入点进行深入。

配置操作入口

找到 ConfigController,这是配置操作的入口

【Java】【Soul网关探秘】http数据同步-变更通知机制

其持有一个 HttpLongPollingDataChangedListener 引用,通过 HttpLongPollingDataChangedListener 实现配置变更通知订阅和配置获取。

通知订阅:

@PostMapping(value = "/listener")

public void listener(final HttpServletRequest request, final HttpServletResponse response) {

longPollingListener.doLongPolling(request, response);

}

配置获取:

@GetMapping("/fetch")

public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {

Map<String, ConfigData<?>> result = Maps.newHashMap();

for (String groupKey : groupKeys) {

ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));

result.put(groupKey, data);

}

return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);

}

通知订阅实现

使用 HttpLongPollingDataChangedListener#doLongPolling 实现通知订阅

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

// 比较配置组md5

List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);

String clientIp = getRemoteIp(request);

// 发现配置组变化则立即响应

if (CollectionUtils.isNotEmpty(changedGroup)) {

this.generateResponse(response, changedGroup);

log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);

return;

}

// 监听配置变化

final AsyncContext asyncContext = request.startAsync();

asyncContext.setTimeout(0L);

// 阻塞客户端线程

scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));

}

通过比较 MD5 检查配置组是否发生变更,若配置组发生变更则立即响应,否则阻塞客户端线程。

此处 compareChangedGroup 实现不做深究,继续看LongPollingClient 具体处理:

@Override

public void run() {

this.asyncTimeoutFuture = scheduler.schedule(() -> {

clients.remove(LongPollingClient.this);

List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());

sendResponse(changedGroups);

}, timeoutTime, TimeUnit.MILLISECONDS);

clients.add(this);

}

这里将 client 加入 clients 的同时,开启了一个定时任务,负责超时移除 client 并返回发生变化的配置组信息。

超时时间为构造时传入的 HttpConstants.SERVER_MAX_HOLD_TIMEOUT = 60s

配置获取实现

使用 AbstractDataChangedListener#fetchConfig 实现配置获取

public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {

ConfigDataCache config = CACHE.get(groupKey.name());

switch (groupKey) {

case APP_AUTH:

...

case PLUGIN:

...

case RULE:

...

case SELECTOR:

List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {

}.getType());

return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);

case META_DATA:

...

default:

throw new IllegalStateException("Unexpected groupKey: " + groupKey);

}

}

这里从 CACHE 缓存获取对应配置组信息,包装成 ConfigData 并返回。

建立订阅关系

soul-web 端通过 HttpSyncDataConfiguration 初始化 HttpSyncDataService 并注入 spring容器。

HttpSyncDataService#start 方法在初始化时完成配置获取和订阅:

private void start() {

// It could be initialized multiple times, so you need to control that.

if (RUNNING.compareAndSet(false, true)) {

// fetch all group configs.

this.fetchGroupConfig(ConfigGroupEnum.values());

int threadSize = serverList.size();

this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,

new LinkedBlockingQueue<>(),

SoulThreadFactory.create("http-long-polling", true));

// start long polling, each server creates a thread to listen for changes.

this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));

} else {

log.info("soul http long polling was started, executor=[{}]", executor);

}

}

1)配置获取

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {

for (int index = 0; index < this.serverList.size(); index++) {

String server = serverList.get(index);

try {

this.doFetchGroupConfig(server, groups);

break;

} catch (SoulException e) {

// no available server, throw exception.

if (index >= serverList.size() - 1) {

throw e;

}

log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));

}

}

}

doFetchGroupConfig 内部发起配置获取请求并更新本地缓存

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {

...

String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");

...

try {

json = this.httpClient.getForObject(url, String.class);

} catch (RestClientException e) {

...

}

// update local cache

boolean updated = this.updateCacheWithJson(json);

...

}

2)配置订阅

借助 HttpLongPollingTask 完成

@Override

public void run() {

while (RUNNING.get()) {

for (int time = 1; time <= retryTimes; time++) {

try {

doLongPolling(server);

} catch (Exception e) {

...

}

}

}

log.warn("Stop http long polling.");

}

HttpLongPollingTask 不断循环 doLongPolling,此处有 retry 操作

private void doLongPolling(final String server) {

...

String listenerUrl = server + "/configs/listener";

...

try {

String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();

log.debug("listener result: [{}]", json);

groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");

} catch (RestClientException e) {

...

}

if (groupJson != null) {

// fetch group configuration async.

ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);

if (ArrayUtils.isNotEmpty(changedGroups)) {

log.info("Group config changed: {}", Arrays.toString(changedGroups));

this.doFetchGroupConfig(server, changedGroups);

}

}

}

doLongPolling 内部发起 post 请求订阅配置变更,若发生变更则重新获取配置。

至此,通知订阅处理脉络已清晰:

配置变更

上回我们说到AbstractDataChangedListener 的 onSelectorChanged 实现:

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {

if (CollectionUtils.isEmpty(changed)) {

return;

}

// 更新 selector 缓存

this.updateSelectorCache();

// selector 变更后处理,实现具体的变更通知

this.afterSelectorChanged(changed, eventType);

}

这里 selector 变更处理是先更缓存后发通知,继续看 afterSelectorChanged 实现。

HttpLongPollingDataChangedListener 真正实现了 AbstractDataChangedListener 的 afterSelectorChanged:

@Override

protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {

scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));

}

由定时任务重复执行 DataChangeTask,DataChangeTask 具体处理如下:

@Override

public void run() {

for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {

LongPollingClient client = iter.next();

iter.remove();

client.sendResponse(Collections.singletonList(groupKey));

log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);

}

}

DataChangeTask 负责从 clients 依次移除 LongPollingClient 并将 groupKey 作为响应返回,sendResponse 内部处理如下:

void sendResponse(final List<ConfigGroupEnum> changedGroups) {

// cancel scheduler

if (null != asyncTimeoutFuture) {

asyncTimeoutFuture.cancel(false);

}

generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);

asyncContext.complete();

}

负责生成响应报文并异步响应客户端,注意有个 asyncTimeoutFuture.cancel 操作,取消之前的 60s 超时响应。

总结

本篇梳理和分析了 soul-web 端到 soul-admin 端的配置变更通知订阅关系建立过程,配合上配置获取接口,完成了整个 http 数据同步策略的变更通知机制。

下篇,将探究 http 同步策略的web端处理变更通知,期待惊喜。

个人知识库

高性能微服务API网关-Soul

javaSoul

阅读 14发布于 23 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

腾业

1 声望

0 粉丝

0 条评论

得票时间

avatar

腾业

1 声望

0 粉丝

宣传栏

引言

上一篇,梳理除了 soul-admin 在发出数据变更通知前的处理脉络,本篇开始探究 http 同步策略的变更通知机制,

不同数据变更的通知机制应当是一致的,故本篇以 selector 配置变更通知为切入点进行深入。

配置操作入口

找到 ConfigController,这是配置操作的入口

【Java】【Soul网关探秘】http数据同步-变更通知机制

其持有一个 HttpLongPollingDataChangedListener 引用,通过 HttpLongPollingDataChangedListener 实现配置变更通知订阅和配置获取。

通知订阅:

@PostMapping(value = "/listener")

public void listener(final HttpServletRequest request, final HttpServletResponse response) {

longPollingListener.doLongPolling(request, response);

}

配置获取:

@GetMapping("/fetch")

public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {

Map<String, ConfigData<?>> result = Maps.newHashMap();

for (String groupKey : groupKeys) {

ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));

result.put(groupKey, data);

}

return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);

}

通知订阅实现

使用 HttpLongPollingDataChangedListener#doLongPolling 实现通知订阅

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

// 比较配置组md5

List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);

String clientIp = getRemoteIp(request);

// 发现配置组变化则立即响应

if (CollectionUtils.isNotEmpty(changedGroup)) {

this.generateResponse(response, changedGroup);

log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);

return;

}

// 监听配置变化

final AsyncContext asyncContext = request.startAsync();

asyncContext.setTimeout(0L);

// 阻塞客户端线程

scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));

}

通过比较 MD5 检查配置组是否发生变更,若配置组发生变更则立即响应,否则阻塞客户端线程。

此处 compareChangedGroup 实现不做深究,继续看LongPollingClient 具体处理:

@Override

public void run() {

this.asyncTimeoutFuture = scheduler.schedule(() -> {

clients.remove(LongPollingClient.this);

List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());

sendResponse(changedGroups);

}, timeoutTime, TimeUnit.MILLISECONDS);

clients.add(this);

}

这里将 client 加入 clients 的同时,开启了一个定时任务,负责超时移除 client 并返回发生变化的配置组信息。

超时时间为构造时传入的 HttpConstants.SERVER_MAX_HOLD_TIMEOUT = 60s

配置获取实现

使用 AbstractDataChangedListener#fetchConfig 实现配置获取

public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {

ConfigDataCache config = CACHE.get(groupKey.name());

switch (groupKey) {

case APP_AUTH:

...

case PLUGIN:

...

case RULE:

...

case SELECTOR:

List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {

}.getType());

return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);

case META_DATA:

...

default:

throw new IllegalStateException("Unexpected groupKey: " + groupKey);

}

}

这里从 CACHE 缓存获取对应配置组信息,包装成 ConfigData 并返回。

建立订阅关系

soul-web 端通过 HttpSyncDataConfiguration 初始化 HttpSyncDataService 并注入 spring容器。

HttpSyncDataService#start 方法在初始化时完成配置获取和订阅:

private void start() {

// It could be initialized multiple times, so you need to control that.

if (RUNNING.compareAndSet(false, true)) {

// fetch all group configs.

this.fetchGroupConfig(ConfigGroupEnum.values());

int threadSize = serverList.size();

this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,

new LinkedBlockingQueue<>(),

SoulThreadFactory.create("http-long-polling", true));

// start long polling, each server creates a thread to listen for changes.

this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));

} else {

log.info("soul http long polling was started, executor=[{}]", executor);

}

}

1)配置获取

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {

for (int index = 0; index < this.serverList.size(); index++) {

String server = serverList.get(index);

try {

this.doFetchGroupConfig(server, groups);

break;

} catch (SoulException e) {

// no available server, throw exception.

if (index >= serverList.size() - 1) {

throw e;

}

log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));

}

}

}

doFetchGroupConfig 内部发起配置获取请求并更新本地缓存

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {

...

String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");

...

try {

json = this.httpClient.getForObject(url, String.class);

} catch (RestClientException e) {

...

}

// update local cache

boolean updated = this.updateCacheWithJson(json);

...

}

2)配置订阅

借助 HttpLongPollingTask 完成

@Override

public void run() {

while (RUNNING.get()) {

for (int time = 1; time <= retryTimes; time++) {

try {

doLongPolling(server);

} catch (Exception e) {

...

}

}

}

log.warn("Stop http long polling.");

}

HttpLongPollingTask 不断循环 doLongPolling,此处有 retry 操作

private void doLongPolling(final String server) {

...

String listenerUrl = server + "/configs/listener";

...

try {

String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();

log.debug("listener result: [{}]", json);

groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");

} catch (RestClientException e) {

...

}

if (groupJson != null) {

// fetch group configuration async.

ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);

if (ArrayUtils.isNotEmpty(changedGroups)) {

log.info("Group config changed: {}", Arrays.toString(changedGroups));

this.doFetchGroupConfig(server, changedGroups);

}

}

}

doLongPolling 内部发起 post 请求订阅配置变更,若发生变更则重新获取配置。

至此,通知订阅处理脉络已清晰:

配置变更

上回我们说到AbstractDataChangedListener 的 onSelectorChanged 实现:

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {

if (CollectionUtils.isEmpty(changed)) {

return;

}

// 更新 selector 缓存

this.updateSelectorCache();

// selector 变更后处理,实现具体的变更通知

this.afterSelectorChanged(changed, eventType);

}

这里 selector 变更处理是先更缓存后发通知,继续看 afterSelectorChanged 实现。

HttpLongPollingDataChangedListener 真正实现了 AbstractDataChangedListener 的 afterSelectorChanged:

@Override

protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {

scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));

}

由定时任务重复执行 DataChangeTask,DataChangeTask 具体处理如下:

@Override

public void run() {

for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {

LongPollingClient client = iter.next();

iter.remove();

client.sendResponse(Collections.singletonList(groupKey));

log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);

}

}

DataChangeTask 负责从 clients 依次移除 LongPollingClient 并将 groupKey 作为响应返回,sendResponse 内部处理如下:

void sendResponse(final List<ConfigGroupEnum> changedGroups) {

// cancel scheduler

if (null != asyncTimeoutFuture) {

asyncTimeoutFuture.cancel(false);

}

generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);

asyncContext.complete();

}

负责生成响应报文并异步响应客户端,注意有个 asyncTimeoutFuture.cancel 操作,取消之前的 60s 超时响应。

总结

本篇梳理和分析了 soul-web 端到 soul-admin 端的配置变更通知订阅关系建立过程,配合上配置获取接口,完成了整个 http 数据同步策略的变更通知机制。

下篇,将探究 http 同步策略的web端处理变更通知,期待惊喜。

个人知识库

高性能微服务API网关-Soul

以上是 【Java】【Soul网关探秘】http数据同步-变更通知机制 的全部内容, 来源链接: utcz.com/a/110427.html

回到顶部