nacos分布式配置中心工作原理源码分析

编程

nacos 分布式配置中心

个人阿里巴巴nacos改造项目:alibaba_nacos

1. 服务启动配置加载

源码:NacosPropertySourceLocator

工作流程:Nacos自定义PropertySourceLocator,用于服务启动时从配置中心获取数据,然后添加再本地运行环境,从而实现配置的加载

1.0 配置加载逻辑

方法:com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.locate

@Override

public PropertySource<?> locate(Environment env) {

// 借助于NACOS属性配置创建ConfigService,用于配置中心交互API

ConfigService configService = nacosConfigProperties.configServiceInstance();

if (null == configService) {

log.warn("no instance of config service found, can"t load config from nacos");

return null;

}

// 创建NACOS属性源建造器

nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, nacosConfigProperties.getTimeout());

// 定义DataID前缀,优先级大于NACOS配置名称

String name = nacosConfigProperties.getName();

String dataIdPrefix = nacosConfigProperties.getPrefix();

if (StringUtils.isEmpty(dataIdPrefix)) {

dataIdPrefix = name;

}

// 配置名称与dataId前缀未明确指定默认采用spring.application.name

if (StringUtils.isEmpty(dataIdPrefix)) {

dataIdPrefix = env.getProperty("spring.application.name");

}

// 构造属性源集合类,用于存放不同的配置

CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);

//NAOCOS配置加载顺序:共享配置 --> 扩展配置 --> 自身配置(后面优先级高)

loadSharedConfiguration(composite);

loadExtConfiguration(composite);

loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);

return composite;

}

nacos配置客户端参考配置:

server:

port: 8081

spring:

profiles:

active: dev

application:

name: nacos_config_client

cloud:

nacos:

config:

server-addr: localhost:8848

namespace: ${spring.profiles.active} # 命名空间,默认public,注意必须是命名空间ID

group: ${spring.application.name}

prefix: ${spring.application.name} # 文件前缀

file-extension: properties # 文件后缀

username: iot-app

password: iot-app

## 注意: 共享dataID 组名必须为 DEFAULT_GROUP

sharedDataIds: shared.properties

refreshableDataIds: shared.properties

extConfig:

- { dataId: common.properties, group: common, refresh: true }

1.1 加载共享配置

/**

* 加载共享配置

* @param compositePropertySource

*/

private void loadSharedConfiguration(CompositePropertySource compositePropertySource) {

String sharedDataIds = nacosConfigProperties.getSharedDataIds();

String refreshDataIds = nacosConfigProperties.getRefreshableDataIds();

if (sharedDataIds == null || sharedDataIds.trim().length() == 0) {

return;

}

String[] sharedDataIdArray = sharedDataIds.split(SHARED_CONFIG_SEPARATOR_CHAR);

checkDataIdFileExtension(sharedDataIdArray);

for (String dataId : sharedDataIdArray) {

String fileExtension = dataId.substring(dataId.lastIndexOf(".") + 1);

boolean isRefreshable = checkDataIdIsRefreshable(refreshDataIds, dataId);

// 表明共享配置GroupID默认DEFAULT_GROUP

loadNacosDataIfPresent(compositePropertySource, dataId, "DEFAULT_GROUP", fileExtension, isRefreshable);

}

}

1.2 加载扩展配置

/**

* 加载NACOS扩展配置

* @param compositePropertySource

*/

private void loadExtConfiguration(CompositePropertySource compositePropertySource) {

List<NacosConfigProperties.Config> extConfigs = nacosConfigProperties.getExtConfig();

if (CollectionUtils.isEmpty(extConfigs)) {

return;

}

checkExtConfiguration(extConfigs);

for (NacosConfigProperties.Config config : extConfigs) {

String dataId = config.getDataId();

String fileExtension = dataId.substring(dataId.lastIndexOf(DOT) + 1);

loadNacosDataIfPresent(compositePropertySource, dataId, config.getGroup(),fileExtension, config.isRefresh());

}

}

1.3 加载项目私有配置

private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix,

NacosConfigProperties properties, Environment environment) {

String fileExtension = properties.getFileExtension();

String nacosGroup = properties.getGroup();

// load directly once by default

loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);

// load with suffix, which have a higher priority than the default

loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);

// Loaded with profile, which have a higher priority than the suffix

for (String profile : environment.getActiveProfiles()) {

String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;

loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);

}

}

1.4 底层加载配置逻辑

private void loadNacosDataIfPresent(final CompositePropertySource composite,

final String dataId, final String group, String fileExtension,

boolean isRefreshable) {

if (null == dataId || dataId.trim().length() < 1) {

return;

}

if (null == group || group.trim().length() < 1) {

return;

}

// 创建Nacos配置源

NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);

// 按照加载的前后顺序,后面加载调用addFirstPropertySource方法,表明优先级私有配置 > 扩展配置 > 共享配置

this.addFirstPropertySource(composite, propertySource, false);

}

加载Nacos配置数据

private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) {

if (NacosContextRefresher.getRefreshCount() != 0) {

// 如果不支持自动刷新配置则自动从缓存获取返回

if (!isRefreshable) {

return NacosPropertySourceRepository.getNacosPropertySource(dataId);

}

}

//构造器从配置中心获取数据

return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);

}

//底层接口:com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData,配置数据会缓存NacosPropertySourceRepository

private Properties loadNacosData(String dataId, String group, String fileExtension) {

String data = null;

try {

// http远程访问配置中心,获取配置数据

data = configService.getConfig(dataId, group, timeout);

if (StringUtils.isEmpty(data)) {

log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]",dataId, group);

return EMPTY_PROPERTIES;

}

log.info(String.format("Loading nacos data, dataId: "%s", group: "%s", data: %s", dataId, group, data));

Properties properties = NacosDataParserHandler.getInstance().parseNacosData(data, fileExtension);

return properties == null ? EMPTY_PROPERTIES : properties;

}

catch (NacosException e) {

log.error("get data from Nacos error,dataId:{}, ", dataId, e);

}

catch (Exception e) {

log.error("parse data from Nacos error,dataId:{},data:{},", dataId, data, e);

}

return EMPTY_PROPERTIES;

}

2. 服务启动后配置热加载

热加载配置方式:@RefreshScope(spring远程支持)、@NacosValue(nacos支持)

区别:

  • @RefreshScope: Bean先被清除,再次被访问后创建
  • @NacosValue: 直接通过反射机制修改NacosValue修饰的Bean属性

2.1 Http长轮询实现配置实时同步

接口:LongPollingRunnable

private class LongPollingRunnable implements Runnable {

private final int taskId;

public LongPollingRunnable(int taskId) {

this.taskId = taskId;

}

@Override

public void run() {

List<CacheData> localCacheDataList = new ArrayList<>();

List<String> inInitializingCacheList = new ArrayList<>();

try {

// check failover config: 检查JVM和本地配置文件缓存

for (CacheData localCacheData : cacheMap.get().values()) {

if (localCacheData.getTaskId() == taskId) {

localCacheDataList.add(localCacheData);

try {

syncDiskCacheToJvmCacheConfig(localCacheData);

//使用本地缓存配置文件数据检查MD5,同步缓存MD5到监听器

if (localCacheData.isUseLocalConfigInfo()) {

localCacheData.checkListenerMd5();

}

} catch (Exception e) {

log.error("get local config info error", e);

}

}

}

// check server config: 比较本地缓存MD5与服务器配置数据,再次确认已经变更的缓存坐标: shared.properties+DEFAULT_GROUP+dev(长轮询检查)

// 长轮询:nacos客户端正常每隔30s发送检查配置请求,若配置中心发现客户端与服务端配置不一致(配置更新),则收到请求立刻返回,若一致则正常30s返回正常响应

// 注意:考虑网络因素,客户端Http超时因比30略长,服务端响应应比30s略短,否则因网络原因,服务端正常30s响应,网络延时导致客户端获取不到响应超时报错

// 代码:long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); 超时时间 = 正常轮询时间 * 1.5

List<String> changedGroupKeys = checkUpdateDataIds(localCacheDataList, inInitializingCacheList);

log.info("get changedGroupKeys ===> {}", changedGroupKeys);

//从nacos服务器获取变更配置信息

for (String groupKey : changedGroupKeys) {

String[] key = GroupKey.parseKey(groupKey);

String dataId = key[0];

String group = key[1];

String tenant = null;

if (key.length == 3) {

tenant = key[2];

}

try {

String[] ct = getServerConfig(dataId, group, tenant, 6000L);

//更新本地缓存

CacheData localCacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));

localCacheData.setContent(ct[0]);

if (null != ct[1]) {

localCacheData.setType(ct[1]);

}

log.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",

httpAgent.getName(), dataId, group, tenant, localCacheData.getMd5(),

ContentUtils.truncateContent(ct[0]), ct[1]);

} catch (NacosException ioe) {

String message = String.format(

"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",

httpAgent.getName(), dataId, group, tenant);

log.error(message, ioe);

}

}

for (CacheData cacheData : localCacheDataList) {

//未初始化,或已初始化但未使用本地文件缓存配置数据

if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {

// 从配置中心更新数据后,检查本地监听器缓存,主要用于同步更新Spring属性信息

cacheData.checkListenerMd5();

cacheData.setInitializing(false);

}

}

inInitializingCacheList.clear();

executorService.execute(this);

} catch (Throwable e) {

// If the rotation training task is abnormal, the next execution time of the task will be punished

log.error("longPolling error : ", e);

executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);

}

}

}

2.2 缓存监听器:cacheData.checkListenerMd5();

注意:一个缓存对应1个监听器,相当于1个配置文件就有1个监听器监听处理

缓存配置监听器注册接口:com.alibaba.cloud.nacos.refresh.NacosContextRefresher.registerNacosListener

底层代码:

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,

final String md5, final ManagerListenerWrap listenerWrap) {

final ConfigListener configListener = listenerWrap.configListener;

Runnable job = () -> {

ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();

ClassLoader appClassLoader = configListener.getClass().getClassLoader();

try {

//共享配置监听器

if (configListener instanceof AbstractSharedConfigListener) {

AbstractSharedConfigListener adapter = (AbstractSharedConfigListener) configListener;

adapter.fillContext(dataId, group);

log.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);

}

// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。

Thread.currentThread().setContextClassLoader(appClassLoader);

ConfigResponse configResponse = new ConfigResponse();

configResponse.setDataId(dataId);

configResponse.setGroup(group);

configResponse.setContent(content);

configFilterChainManager.doFilter(null, configResponse);

String tempContent = configResponse.getContent();

//回调监听器获取配置信息,发送RefreshEvent 通知Spring刷新配置,相当于重新加载配置

configListener.receiveConfigInfo(tempContent);

// compare lastContent and content: 自定义NacosValue支持监听器

if (configListener instanceof AbstractConfigChangeConfigListener) {

Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);

ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(data);

((AbstractConfigChangeConfigListener) configListener).receiveConfigChange(configChangeEvent);

listenerWrap.lastContent = content;

}

// 更新配置监听器MD5值

listenerWrap.lastCallMd5 = md5;

log.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, configListener);

} catch (NacosException nacosException) {

log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,

dataId, group, md5, configListener, nacosException.getErrCode(), nacosException.getErrMsg());

} catch (Throwable t) {

log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, configListener, t.getCause());

} finally {

Thread.currentThread().setContextClassLoader(myClassLoader);

}

};

final long startNotify = System.currentTimeMillis();

try {

//若监听器维护线程池则交给线程池运行,否则同步运行

if (null != configListener.getExecutor()) {

configListener.getExecutor().execute(job);

} else {

job.run();

}

} catch (Throwable t) {

log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,

md5, configListener, t.getCause());

}

final long finishNotify = System.currentTimeMillis();

log.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",

name, (finishNotify - startNotify), dataId, group, md5, configListener);

}

2.3 监听器实现配置热更新:@RefreshScope

RefreshScope:自定义Bean的工作区域,spring默认有单例、原型,还支持扩展Bean Scope, RefreshScope其中一种

RefreshScope作用:可在Bean运行期间动态刷新

// 默认匿名监听器

ConfigListener configListener = listenerMap.computeIfAbsent(dataId, i -> new ConfigListener() {

@Override

public void receiveConfigInfo(String configInfo) {

refreshCountIncrement();

String md5 = "";

if (!StringUtils.isEmpty(configInfo)) {

try {

MessageDigest md = MessageDigest.getInstance("MD5");

md5 = new BigInteger(1, md.digest(configInfo.getBytes(StandardCharsets.UTF_8))).toString(16);

}

catch (NoSuchAlgorithmException e) {

log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);

}

}

refreshHistory.add(dataId, md5);

//发布刷新事件

applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));

if (log.isDebugEnabled()) {

log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);

}

}

@Override

public Executor getExecutor() {

return null;

}

});

RefreshEvent事件监听处理:org.springframework.cloud.endpoint.event.RefreshEventListener.onApplicationEvent

// 刷新事件处理

public void handle(RefreshEvent event) {

if (this.ready.get()) { // don"t handle events before app is ready

log.debug("Event received " + event.getEventDesc());

// 刷新具体逻辑

Set<String> keys = this.refresh.refresh();

log.info("Refresh keys changed: " + keys);

}

}

//org.springframework.cloud.context.refresh.ContextRefresher.refresh

public synchronized Set<String> refresh() {

// 刷新配置,相当于NacosPropertySourceLocator需重新从Nacos配置中心拉取数据

Set<String> keys = refreshEnvironment();

//refreshScope 重新刷新

this.scope.refreshAll();

return keys;

}

//this.scope.refreshAll();

//org.springframework.cloud.context.scope.refresh.RefreshScope.refreshAll

public void refreshAll() {

super.destroy();

// 发布RefreshScopeRefreshedEvent事件,spring默认无监听器,若有特殊业务处理可监听,例如Eureka

this.context.publishEvent(new RefreshScopeRefreshedEvent());

}

//父类destroy: org.springframework.cloud.context.scope.GenericScope.destroy()

@Override

public void destroy() {

List<Throwable> errors = new ArrayList<Throwable>();

// 清楚Bean缓存,相当于清除RefreshScope管理的所有Bean

Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();

for (BeanLifecycleWrapper wrapper : wrappers) {

try {

Lock lock = this.locks.get(wrapper.getName()).writeLock();

lock.lock();

try {

wrapper.destroy();

}

finally {

lock.unlock();

}

}

catch (RuntimeException e) {

errors.add(e);

}

}

if (!errors.isEmpty()) {

throw wrapIfNecessary(errors.get(0));

}

this.errors.clear();

}

RefreshScope Bean获取逻辑

@Override

public Object get(String name, ObjectFactory<?> objectFactory) {

// 缓存存放RefreshScope和objectFactory(包含Bean创建方法和Bean实例缓存)

BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));

this.locks.putIfAbsent(name, new ReentrantReadWriteLock());

try {

return value.getBean();

}

catch (RuntimeException e) {

this.errors.put(name, e);

throw e;

}

}

// objectFactory获取RefreshScope Bean,如果Bean缓存存在则直接返回,不存在在按照Spring Bean创建流程重新创建

public Object getBean() {

if (this.bean == null) {

synchronized (this.name) {

if (this.bean == null) {

this.bean = this.objectFactory.getObject();

}

}

}

return this.bean;

}

总结:@RefreshScope 配置热加载逻辑: 发布FreshEvent事件,刷新nacos配置,清空RefreshScope范围Bean, 再次访问Bean时创建(先删除后创建)

2.3 监听器实现配置热更新:@NacosValue

工作原理:直接修改守影响的Bean的成员变量的值(反射机制),效率比@RefreshScope高

监听器:com.alibaba.cloud.nacos.refresh.NacosContextRefresher.registerNacosListener

工作流程:配置变更时发送ConfigChangeEvent事件

AbstractConfigChangeConfigListener abstractConfigChangeConfigListener = new AbstractConfigChangeConfigListener() {

@Override

public void receiveConfigChange(ConfigChangeEvent event) {

applicationContext.publishEvent(event);

}

};

ConfigChangeEvent事件监听器:NacosValueAnnotationBeanPostProcessor

@Override

public void onApplicationEvent(ConfigChangeEvent event) {

// In to this event receiver, the environment has been updated the

// latest configuration information, pull directly from the environment

// fix issue #142

// placeholderNacosValueTargetMap: NacosValueAnnotationBeanPostProcessor实现BeanPostProcessor接口,在Bean初始化之前拦截标识@NacosValue属性的Bean并缓存在placeholderNacosValueTargetMap

for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap.entrySet()) {

String key = environment.resolvePlaceholders(entry.getKey());

String newValue = environment.getProperty(key);

if (newValue == null) {

continue;

}

List<NacosValueTarget> beanPropertyList = entry.getValue();

for (NacosValueTarget target : beanPropertyList) {

String md5String = MD5.getInstance().getMD5String(newValue);

// 比较@NacosValue标识的变量的旧值和新值的MD5值,不一致表示标量值需要更新

boolean isUpdate = !target.lastMD5.equals(md5String);

if (isUpdate) {

// 原变量值MD5更新

target.updateLastMD5(md5String);

// 直接通过反射机制设置@NacosValue注解的属性值

if (target.method == null) {

setField(target, newValue);

}

else {

//方法回调设置@NacosValue注解的属性值

setMethod(target, newValue);

}

}

}

}

}

// 反射机制设置Field值

private void setField(final NacosValueTarget nacosValueTarget, final String propertyValue) {

final Object bean = nacosValueTarget.bean;

Field field = nacosValueTarget.field;

String fieldName = field.getName();

try {

ReflectionUtils.makeAccessible(field);

field.set(bean, convertIfNecessary(field, propertyValue));

if (logger.isDebugEnabled()) {

logger.debug("Update value of the {}" + " (field) in {} (bean) with {}", fieldName, nacosValueTarget.beanName, propertyValue);

}

}

catch (Throwable e) {

if (logger.isErrorEnabled()) {

logger.error("Can"t update value of the " + fieldName + " (field) in " + nacosValueTarget.beanName + " (bean)", e);

}

}

}

// 反射机制方法调用设置属性值

private void setMethod(NacosValueTarget nacosValueTarget, String propertyValue) {

Method method = nacosValueTarget.method;

ReflectionUtils.makeAccessible(method);

try {

method.invoke(nacosValueTarget.bean,

convertIfNecessary(method, propertyValue));

if (logger.isDebugEnabled()) {

logger.debug("Update value with {} (method) in {} (bean) with {}",

method.getName(), nacosValueTarget.beanName, propertyValue);

}

}

catch (Throwable e) {

if (logger.isErrorEnabled()) {

logger.error("Can"t update value with " + method.getName()

+ " (method) in " + nacosValueTarget.beanName + " (bean)", e);

}

}

}

总结:@NacosValue借助于BeanPostProcessor缓存使用@NacosValue注解的Bean相关信息,当配置中心配置变更时通过长轮询检测到,缓存监听器发送ConfigChangeEvent事件,

NacosValueAnnotationBeanPostProcessor进行事件监听,匹配@NacosValue的属性值MD5与新获取的MD5值不一致,则通过反射机制动态更新Bean的@NacosValue标识的属性,从而实现热加载

以上是 nacos分布式配置中心工作原理源码分析 的全部内容, 来源链接: utcz.com/z/517798.html

回到顶部