nacos分布式配置中心工作原理源码分析
nacos 分布式配置中心
个人阿里巴巴nacos改造项目:alibaba_nacos
1. 服务启动配置加载
源码:NacosPropertySourceLocator
工作流程:Nacos自定义PropertySourceLocator,用于服务启动时从配置中心获取数据,然后添加到本地运行环境,从而实现配置的加载
1.0 配置加载逻辑
方法:com.alibaba.cloud.nacos.client.NacosPropertySourceLocator.locate
@Overridepublic PropertySource<?> locate(Environment env) {
// 借助于NACOS属性配置创建ConfigService,用于配置中心交互API
ConfigService configService = nacosConfigProperties.configServiceInstance();
if (null == configService) {
log.warn("no instance of config service found, can"t load config from nacos");
return null;
}
// 创建NACOS属性源建造器
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, nacosConfigProperties.getTimeout());
// 定义DataID前缀,优先级大于NACOS配置名称
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
// 配置名称与dataId前缀未明确指定默认采用spring.application.name
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
// 构造属性源集合类,用于存放不同的配置
CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);
//NAOCOS配置加载顺序:共享配置 --> 扩展配置 --> 自身配置(后面优先级高)
loadSharedConfiguration(composite);
loadExtConfiguration(composite);
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
nacos配置客户端参考配置:
server:
  port: 8081
spring:
  profiles:
    active: dev
  application:
    name: nacos_config_client
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        namespace: ${spring.profiles.active} # 命名空间,默认public,注意必须是命名空间ID
        group: ${spring.application.name}
        prefix: ${spring.application.name} # 文件前缀
        file-extension: properties # 文件后缀
        username: iot-app
        password: iot-app
        ## 注意: 共享dataID 组名必须为 DEFAULT_GROUP
        sharedDataIds: shared.properties
        refreshableDataIds: shared.properties
        extConfig:
          - { dataId: common.properties, group: common, refresh: true }
1.1 加载共享配置
/**
 * Loads every shared dataId listed in the Nacos properties into the composite source.
 * Shared configuration is always loaded from the group {@code DEFAULT_GROUP}.
 *
 * @param compositePropertySource composite that receives the loaded sources
 */
private void loadSharedConfiguration(CompositePropertySource compositePropertySource) {
    final String sharedIds = nacosConfigProperties.getSharedDataIds();
    final String refreshableIds = nacosConfigProperties.getRefreshableDataIds();
    // Nothing to do when no shared dataIds are configured.
    if (sharedIds == null || sharedIds.trim().isEmpty()) {
        return;
    }

    final String[] sharedIdList = sharedIds.split(SHARED_CONFIG_SEPARATOR_CHAR);
    checkDataIdFileExtension(sharedIdList);

    for (int idx = 0; idx < sharedIdList.length; idx++) {
        final String sharedId = sharedIdList[idx];
        // The file extension is whatever follows the last dot of the dataId.
        final int lastDot = sharedId.lastIndexOf(".");
        loadNacosDataIfPresent(
                compositePropertySource,
                sharedId,
                "DEFAULT_GROUP",
                sharedId.substring(lastDot + 1),
                checkDataIdIsRefreshable(refreshableIds, sharedId));
    }
}
1.2 加载扩展配置
/**
 * Loads the extension configurations declared in the Nacos properties into the composite source.
 *
 * @param compositePropertySource composite that receives the loaded sources
 */
private void loadExtConfiguration(CompositePropertySource compositePropertySource) {
    final List<NacosConfigProperties.Config> declared = nacosConfigProperties.getExtConfig();
    if (!CollectionUtils.isEmpty(declared)) {
        checkExtConfiguration(declared);
        for (NacosConfigProperties.Config ext : declared) {
            final String extDataId = ext.getDataId();
            // The file extension is whatever follows the last DOT of the dataId.
            final int lastDot = extDataId.lastIndexOf(DOT);
            final String extension = extDataId.substring(lastDot + 1);
            loadNacosDataIfPresent(compositePropertySource, extDataId, ext.getGroup(), extension, ext.isRefresh());
        }
    }
}
1.3 加载项目私有配置
/**
 * Loads the application's own configuration: first the bare prefix, then prefix + extension,
 * then one dataId per active profile. All of them are loaded as refreshable.
 *
 * @param compositePropertySource composite that receives the loaded sources
 * @param dataIdPrefix            prefix used to build the dataIds
 * @param properties              Nacos properties supplying file extension and group
 * @param environment             environment supplying the active profiles
 */
private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) {
    final String extension = properties.getFileExtension();
    final String group = properties.getGroup();
    final String suffix = DOT + extension;

    // 1) plain prefix
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, group, extension, true);
    // 2) prefix with file suffix, loaded after (and therefore ranked above) the plain prefix
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + suffix, group, extension, true);
    // 3) profile-specific dataIds, loaded last
    final String[] activeProfiles = environment.getActiveProfiles();
    for (int idx = 0; idx < activeProfiles.length; idx++) {
        final String profileDataId = dataIdPrefix + SEP1 + activeProfiles[idx] + suffix;
        loadNacosDataIfPresent(compositePropertySource, profileDataId, group, extension, true);
    }
}
1.4 底层加载配置逻辑
/**
 * Loads one dataId/group pair and puts the resulting source at the front of the composite.
 * Does nothing when the dataId or the group is null or blank.
 *
 * @param composite     composite that receives the loaded source
 * @param dataId        Nacos dataId to load
 * @param group         Nacos group of the dataId
 * @param fileExtension extension handed on to the property-source loader
 * @param isRefreshable whether the source is loaded as refreshable
 */
private void loadNacosDataIfPresent(final CompositePropertySource composite, final String dataId, final String group, String fileExtension,
        boolean isRefreshable) {
    if (null == dataId || dataId.trim().isEmpty() || null == group || group.trim().isEmpty()) {
        return;
    }
    // Each later load is added first, so sources loaded later rank above earlier ones
    // (application-specific > extension > shared).
    this.addFirstPropertySource(
            composite,
            this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable),
            false);
}
加载Nacos配置数据
/**
 * Returns the property source for a dataId: taken from the repository when a refresh has
 * already happened and the source is not refreshable, otherwise built by the builder.
 *
 * @param dataId        Nacos dataId
 * @param group         Nacos group
 * @param fileExtension extension passed to the builder
 * @param isRefreshable whether the source is refreshable
 * @return the property source from the repository or a newly built one
 */
private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) {
    final boolean alreadyRefreshed = NacosContextRefresher.getRefreshCount() != 0;
    if (alreadyRefreshed && !isRefreshable) {
        // Non-refreshable sources are served from the repository after the first refresh.
        return NacosPropertySourceRepository.getNacosPropertySource(dataId);
    }
    // Otherwise let the builder fetch the data.
    return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}
//底层接口:com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder.loadNacosData,配置数据会缓存NacosPropertySourceRepository
private Properties loadNacosData(String dataId, String group, String fileExtension) {
String data = null;
try {
// http远程访问配置中心,获取配置数据
data = configService.getConfig(dataId, group, timeout);
if (StringUtils.isEmpty(data)) {
log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]",dataId, group);
return EMPTY_PROPERTIES;
}
log.info(String.format("Loading nacos data, dataId: "%s", group: "%s", data: %s", dataId, group, data));
Properties properties = NacosDataParserHandler.getInstance().parseNacosData(data, fileExtension);
return properties == null ? EMPTY_PROPERTIES : properties;
}
catch (NacosException e) {
log.error("get data from Nacos error,dataId:{}, ", dataId, e);
}
catch (Exception e) {
log.error("parse data from Nacos error,dataId:{},data:{},", dataId, data, e);
}
return EMPTY_PROPERTIES;
}
2. 服务启动后配置热加载
热加载配置方式:@RefreshScope(spring远程支持)、@NacosValue(nacos支持)
区别:
- @RefreshScope: Bean先被清除,再次被访问后创建
- @NacosValue: 直接通过反射机制修改NacosValue修饰的Bean属性
2.1 Http长轮询实现配置实时同步
接口:LongPollingRunnable
/**
 * Polling task bound to one taskId. Each run: syncs local failover config for the cache entries
 * of this task, obtains the changed group keys, pulls the changed content into the local cache,
 * triggers the listener MD5 check, and finally re-submits itself to the executor.
 */
private class LongPollingRunnable implements Runnable { private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> localCacheDataList = new ArrayList<>();
List<String> inInitializingCacheList = new ArrayList<>();
try {
// check failover config: check the JVM cache against the local config-file cache
for (CacheData localCacheData : cacheMap.get().values()) {
if (localCacheData.getTaskId() == taskId) {
localCacheDataList.add(localCacheData);
try {
syncDiskCacheToJvmCacheConfig(localCacheData);
// When the local cached config file is in use, check the MD5 and sync it to the listeners
if (localCacheData.isUseLocalConfigInfo()) {
localCacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("get local config info error", e);
}
}
}
// check server config: compare the local cache MD5 with the server config to find the changed
// cache coordinates, e.g. shared.properties+DEFAULT_GROUP+dev (long-polling check).
// Original author's note on long polling: the Nacos client normally sends a config-check request
// every 30s; if the config center sees that client and server differ (config updated) it responds
// immediately, otherwise it returns a normal response after about 30s.
// Original author's note: because of network latency the client HTTP timeout should be slightly
// longer than 30s and the server response slightly shorter, otherwise a normal 30s server response
// delayed by the network would make the client time out.
// Code: long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); i.e. timeout = polling time * 1.5
List<String> changedGroupKeys = checkUpdateDataIds(localCacheDataList, inInitializingCacheList);
log.info("get changedGroupKeys ===> {}", changedGroupKeys);
// Fetch the changed configuration from the Nacos server
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
// The tenant is only present when the key has a third component
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = getServerConfig(dataId, group, tenant, 6000L);
// Update the local cache: ct[0] is stored as content, ct[1] (when non-null) as type
// NOTE(review): the lookup result is dereferenced without a null check — confirm the entry cannot have been removed meanwhile
CacheData localCacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
localCacheData.setContent(ct[0]);
if (null != ct[1]) {
localCacheData.setType(ct[1]);
}
log.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
httpAgent.getName(), dataId, group, tenant, localCacheData.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
httpAgent.getName(), dataId, group, tenant);
log.error(message, ioe);
}
}
for (CacheData cacheData : localCacheDataList) {
// Runs when the entry is not flagged as initializing, or its key is listed in inInitializingCacheList
if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// After updating from the config center, check the listener cache; per the original author's note
// this is what propagates the update to the Spring properties
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// Re-submit this task so polling continues
executorService.execute(this);
} catch (Throwable e) {
// If the polling task fails, its next execution is delayed by taskPenaltyTime as a penalty
log.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
2.2 缓存监听器:cacheData.checkListenerMd5();
注意:一个缓存对应1个监听器,相当于1个配置文件就有1个监听器监听处理
缓存配置监听器注册接口:com.alibaba.cloud.nacos.refresh.NacosContextRefresher.registerNacosListener
底层代码:
/**
 * Notifies one listener about new configuration content. The notification job runs on the
 * listener's own executor when it provides one, otherwise synchronously on the calling thread.
 * Exceptions raised by the job or by its submission are logged, not propagated.
 *
 * @param dataId       dataId of the changed configuration
 * @param group        group of the changed configuration
 * @param content      new configuration content
 * @param type         configuration type, passed to the change-data parser
 * @param md5          MD5 recorded on the listener wrap after a successful callback
 * @param listenerWrap wrapper holding the listener and its last content / last MD5
 */
private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) {
final ConfigListener configListener = listenerWrap.configListener;
Runnable job = () -> {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = configListener.getClass().getClassLoader();
try {
// Shared-config listener: fill in the dataId/group context first
if (configListener instanceof AbstractSharedConfigListener) {
AbstractSharedConfigListener adapter = (AbstractSharedConfigListener) configListener;
adapter.fillContext(dataId, group);
log.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// Before the callback, switch the thread context class loader to the listener's (webapp) class loader,
// so SPI lookups inside the callback do not fail or resolve wrongly (only an issue with multi-app deployments).
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse configResponse = new ConfigResponse();
configResponse.setDataId(dataId);
configResponse.setGroup(group);
configResponse.setContent(content);
configFilterChainManager.doFilter(null, configResponse);
String tempContent = configResponse.getContent();
// Call back the listener with the filtered content; per the original author's note the default
// listener publishes a RefreshEvent so that Spring reloads the configuration
configListener.receiveConfigInfo(tempContent);
// compare lastContent and content: supports listeners that want a change event (used for @NacosValue)
if (configListener instanceof AbstractConfigChangeConfigListener) {
Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(data);
((AbstractConfigChangeConfigListener) configListener).receiveConfigChange(configChangeEvent);
listenerWrap.lastContent = content;
}
// Record the MD5 of the content the listener was last called with
listenerWrap.lastCallMd5 = md5;
log.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, configListener);
} catch (NacosException nacosException) {
log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, configListener, nacosException.getErrCode(), nacosException.getErrMsg());
} catch (Throwable t) {
// NOTE(review): logs t.getCause(), which may be null and drops t itself — confirm whether logging t was intended
log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, configListener, t.getCause());
} finally {
// Always restore the original context class loader
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
final long startNotify = System.currentTimeMillis();
try {
// If the listener provides an executor, hand the job to it; otherwise run it synchronously
if (null != configListener.getExecutor()) {
configListener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
log.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
md5, configListener, t.getCause());
}
// When the job was handed to an executor, this measures only the submission time
final long finishNotify = System.currentTimeMillis();
log.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, configListener);
}
2.3 监听器实现配置热更新:@RefreshScope
RefreshScope:自定义Bean的工作区域,spring默认有单例、原型,还支持扩展Bean Scope, RefreshScope是其中一种
RefreshScope作用:可在Bean运行期间动态刷新
// 默认匿名监听器ConfigListener configListener = listenerMap.computeIfAbsent(dataId, i -> new ConfigListener() {
@Override
public void receiveConfigInfo(String configInfo) {
refreshCountIncrement();
String md5 = "";
if (!StringUtils.isEmpty(configInfo)) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = new BigInteger(1, md.digest(configInfo.getBytes(StandardCharsets.UTF_8))).toString(16);
}
catch (NoSuchAlgorithmException e) {
log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
}
}
refreshHistory.add(dataId, md5);
//发布刷新事件
applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
}
}
@Override
public Executor getExecutor() {
return null;
}
});
RefreshEvent事件监听处理:org.springframework.cloud.endpoint.event.RefreshEventListener.onApplicationEvent
// 刷新事件处理public void handle(RefreshEvent event) {
if (this.ready.get()) { // don"t handle events before app is ready
log.debug("Event received " + event.getEventDesc());
// 刷新具体逻辑
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
// org.springframework.cloud.context.refresh.ContextRefresher.refresh
/**
 * Refreshes the environment first, then refreshes everything held by the refresh scope.
 *
 * @return the keys reported by the environment refresh
 */
public synchronized Set<String> refresh() {
    // Per the original author's note, this makes NacosPropertySourceLocator pull the data from Nacos again.
    final Set<String> changedKeys = refreshEnvironment();
    // Then refresh the refresh scope.
    this.scope.refreshAll();
    return changedKeys;
}
// Target of this.scope.refreshAll():
// org.springframework.cloud.context.scope.refresh.RefreshScope.refreshAll
/**
 * Destroys the scope's current contents via the superclass, then publishes a
 * RefreshScopeRefreshedEvent on the context.
 */
public void refreshAll() {
    super.destroy();
    // Per the original author's note, Spring registers no listener for this event by default;
    // components with special handling (e.g. Eureka) can listen for it.
    final RefreshScopeRefreshedEvent refreshedEvent = new RefreshScopeRefreshedEvent();
    this.context.publishEvent(refreshedEvent);
}
// Superclass destroy: org.springframework.cloud.context.scope.GenericScope.destroy()
/**
 * Clears the bean cache and destroys every wrapper it held, each under the write lock
 * registered for the wrapper's name. Runtime exceptions are collected; after all wrappers
 * were processed the first one (if any) is rethrown, otherwise the scope's error map is cleared.
 */
@Override
public void destroy() {
    final List<Throwable> failures = new ArrayList<>();
    // Clearing the cache removes all beans managed by this scope.
    final Collection<BeanLifecycleWrapper> evicted = this.cache.clear();
    for (BeanLifecycleWrapper evictedWrapper : evicted) {
        try {
            final Lock writeLock = this.locks.get(evictedWrapper.getName()).writeLock();
            writeLock.lock();
            try {
                evictedWrapper.destroy();
            }
            finally {
                writeLock.unlock();
            }
        }
        catch (RuntimeException failure) {
            failures.add(failure);
        }
    }
    if (failures.size() > 0) {
        throw wrapIfNecessary(failures.get(0));
    }
    this.errors.clear();
}
RefreshScope Bean获取逻辑
/**
 * Returns the scoped bean for the given name.
 *
 * @param name          bean name
 * @param objectFactory factory used to create the bean
 * @return the bean obtained from the cached wrapper
 * @throws RuntimeException rethrown after being recorded in the scope's error map
 */
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
    // The cache stores a wrapper holding the bean name and the objectFactory
    // (the wrapper carries both the creation method and the cached bean instance).
    // NOTE(review): presumably cache.put returns the wrapper now associated with the name
    // (existing or new) — confirm against the cache implementation.
    BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
    this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
    try {
        return value.getBean();
    }
    catch (RuntimeException e) {
        this.errors.put(name, e);
        throw e;
    }
}
/**
 * Returns the wrapped bean. When no instance is held yet, one is obtained from the
 * objectFactory while synchronized on the bean name; an existing instance is returned as is.
 *
 * @return the bean instance held by this wrapper
 */
public Object getBean() {
    if (this.bean != null) {
        return this.bean;
    }
    synchronized (this.name) {
        // Re-check inside the synchronized block before creating the bean.
        if (this.bean == null) {
            this.bean = this.objectFactory.getObject();
        }
    }
    return this.bean;
}
总结:@RefreshScope 配置热加载逻辑: 发布RefreshEvent事件,刷新nacos配置,清空RefreshScope范围Bean, 再次访问Bean时创建(先删除后创建)
2.4 监听器实现配置热更新:@NacosValue
工作原理:直接修改受影响的Bean的成员变量的值(反射机制),效率比@RefreshScope高
监听器:com.alibaba.cloud.nacos.refresh.NacosContextRefresher.registerNacosListener
工作流程:配置变更时发送ConfigChangeEvent事件
// Listener that forwards every received config-change event to the application context.
AbstractConfigChangeConfigListener abstractConfigChangeConfigListener =
        new AbstractConfigChangeConfigListener() {
            @Override
            public void receiveConfigChange(ConfigChangeEvent event) {
                applicationContext.publishEvent(event);
            }
        };
ConfigChangeEvent事件监听器:NacosValueAnnotationBeanPostProcessor
/**
 * Re-reads every tracked placeholder from the environment and pushes changed values into the
 * registered targets, using an MD5 comparison to detect a change.
 *
 * @param event the config-change event (not read here; values come from the environment)
 */
@Override
public void onApplicationEvent(ConfigChangeEvent event) {
    // In to this event receiver, the environment has been updated the
    // latest configuration information, pull directly from the environment
    // fix issue #142
    // Original author's note on placeholderNacosValueTargetMap: NacosValueAnnotationBeanPostProcessor
    // implements BeanPostProcessor and, before bean initialization, records the beans that carry
    // @NacosValue members in this map.
    for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap.entrySet()) {
        String key = environment.resolvePlaceholders(entry.getKey());
        String newValue = environment.getProperty(key);
        if (newValue == null) {
            continue;
        }
        // The MD5 depends only on newValue, so compute it once per key instead of once per target.
        String md5String = MD5.getInstance().getMD5String(newValue);
        List<NacosValueTarget> beanPropertyList = entry.getValue();
        for (NacosValueTarget target : beanPropertyList) {
            // A differing MD5 between the old and new value means the member needs updating.
            boolean isUpdate = !target.lastMD5.equals(md5String);
            if (isUpdate) {
                // Remember the MD5 of the value being applied.
                target.updateLastMD5(md5String);
                if (target.method == null) {
                    // No method recorded: set the field via reflection.
                    setField(target, newValue);
                }
                else {
                    // Otherwise invoke the recorded method with the new value.
                    setMethod(target, newValue);
                }
            }
        }
    }
}
// 反射机制设置Field值
private void setField(final NacosValueTarget nacosValueTarget, final String propertyValue) {
final Object bean = nacosValueTarget.bean;
Field field = nacosValueTarget.field;
String fieldName = field.getName();
try {
ReflectionUtils.makeAccessible(field);
field.set(bean, convertIfNecessary(field, propertyValue));
if (logger.isDebugEnabled()) {
logger.debug("Update value of the {}" + " (field) in {} (bean) with {}", fieldName, nacosValueTarget.beanName, propertyValue);
}
}
catch (Throwable e) {
if (logger.isErrorEnabled()) {
logger.error("Can"t update value of the " + fieldName + " (field) in " + nacosValueTarget.beanName + " (bean)", e);
}
}
}
// 反射机制方法调用设置属性值
private void setMethod(NacosValueTarget nacosValueTarget, String propertyValue) {
Method method = nacosValueTarget.method;
ReflectionUtils.makeAccessible(method);
try {
method.invoke(nacosValueTarget.bean,
convertIfNecessary(method, propertyValue));
if (logger.isDebugEnabled()) {
logger.debug("Update value with {} (method) in {} (bean) with {}",
method.getName(), nacosValueTarget.beanName, propertyValue);
}
}
catch (Throwable e) {
if (logger.isErrorEnabled()) {
logger.error("Can"t update value with " + method.getName()
+ " (method) in " + nacosValueTarget.beanName + " (bean)", e);
}
}
}
总结:@NacosValue借助于BeanPostProcessor缓存使用@NacosValue注解的Bean相关信息,当配置中心配置变更时通过长轮询检测到,缓存监听器发送ConfigChangeEvent事件,
NacosValueAnnotationBeanPostProcessor进行事件监听,匹配@NacosValue的属性值MD5与新获取的MD5值不一致,则通过反射机制动态更新Bean的@NacosValue标识的属性,从而实现热加载
以上是 nacos分布式配置中心工作原理源码分析 的全部内容, 来源链接: utcz.com/z/517798.html