聊聊nacossdkgo的HostReactor
序
本文主要研究一下nacos-sdk-go的HostReactor
HostReactor
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
type HostReactor struct { serviceInfoMap cache.ConcurrentMap
cacheDir string
updateThreadNum int
serviceProxy NamingProxy
pushReceiver PushReceiver
subCallback SubscribeCallback
updateTimeMap cache.ConcurrentMap
updateCacheWhenEmpty bool
}
- HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性
NewHostReactor
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor { if updateThreadNum <= 0 {
updateThreadNum = Default_Update_Thread_Num
}
hr := HostReactor{
serviceProxy: serviceProxy,
cacheDir: cacheDir,
updateThreadNum: updateThreadNum,
serviceInfoMap: cache.NewConcurrentMap(),
subCallback: subCallback,
updateTimeMap: cache.NewConcurrentMap(),
updateCacheWhenEmpty: updateCacheWhenEmpty,
}
pr := NewPushRecevier(&hr)
hr.pushReceiver = *pr
if !notLoadCacheAtStart {
hr.loadCacheFromDisk()
}
go hr.asyncUpdateService()
return hr
}
- NewHostReactor方法创建HostReactor,然后通过NewPushRecevier创建pushReceiver,对于notLoadCacheAtStart为false的则执行loadCacheFromDisk,之后异步执行asyncUpdateService
loadCacheFromDisk
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
func (hr *HostReactor) loadCacheFromDisk() { serviceMap := cache.ReadServicesFromFile(hr.cacheDir)
if serviceMap == nil || len(serviceMap) == 0 {
return
}
for k, v := range serviceMap {
hr.serviceInfoMap.Set(k, v)
}
}
- loadCacheFromDisk方法通过cache.ReadServicesFromFile(hr.cacheDir)获取serviceMap
asyncUpdateService
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
func (hr *HostReactor) asyncUpdateService() { sema := utils.NewSemaphore(hr.updateThreadNum)
for {
for _, v := range hr.serviceInfoMap.Items() {
service := v.(model.Service)
lastRefTime, ok := hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))
if !ok {
lastRefTime = uint64(0)
}
if uint64(utils.CurrentMillis())-lastRefTime.(uint64) > service.CacheMillis {
sema.Acquire()
go func() {
hr.updateServiceNow(service.Name, service.Clusters)
sema.Release()
}()
}
}
time.Sleep(1 * time.Second)
}
}
- asyncUpdateService方法遍历serviceInfoMap,通过hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))获取lastRefTime,然后判断是否超过service.CacheMillis,超过的haul则执行sema.Acquire(),异步hr.updateServiceNow(service.Name, service.Clusters),最后执行sema.Release()
updateServiceNow
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) { result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
if err != nil {
log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s err:%s
", serviceName, clusters, err.Error())
return
}
if result == "" {
log.Printf("[ERROR]:query list is empty!servieName:%s cluster:%s
", serviceName, clusters)
return
}
hr.ProcessServiceJson(result)
}
- updateServiceNow方法通过hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)获取json,然后通过hr.ProcessServiceJson(result)解析json
ProcessServiceJson
nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go
func (hr *HostReactor) ProcessServiceJson(result string) { service := utils.JsonToService(result)
if service == nil {
return
}
cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)
oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
if ok && !hr.updateCacheWhenEmpty {
//if instance list is empty,not to update cache
if service.Hosts == nil || len(service.Hosts) == 0 {
log.Printf("[ERROR]:do not have useful host, ignore it, name:%s
", service.Name)
return
}
}
hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))
hr.serviceInfoMap.Set(cacheKey, *service)
if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
if !ok {
log.Println("[INFO] service not found in cache " + cacheKey)
} else {
log.Printf("[INFO] service key:%s was updated to:%s
", cacheKey, utils.ToJsonString(service))
}
cache.WriteServicesToFile(*service, hr.cacheDir)
hr.subCallback.ServiceChanged(service)
}
}
- ProcessServiceJson方法通过utils.JsonToService(result)将json解析为model.Service,然后通过utils.GetServiceCacheKey(service.Name, service.Clusters)构建cacheKey,之后更新hr.updateTimeMap、hr.serviceInfoMap;对于缓存不存在或者缓存存在变更的则执行cache.WriteServicesToFile(*service, hr.cacheDir),然后触发hr.subCallback.ServiceChanged(service)
小结
HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性
doc
- host_reator
以上是 聊聊nacossdkgo的HostReactor 的全部内容, 来源链接: utcz.com/z/517790.html