聊聊nacos-sdk-go的HostReactor

编程

本文主要研究一下nacos-sdk-go的HostReactor

HostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

// HostReactor keeps a local cache of service information and refreshes it
// through a NamingProxy.
type HostReactor struct {
	// serviceInfoMap stores model.Service values keyed by the service cache key.
	serviceInfoMap cache.ConcurrentMap
	// cacheDir is the directory handed to cache.ReadServicesFromFile and
	// cache.WriteServicesToFile.
	cacheDir string
	// updateThreadNum sizes the semaphore that bounds concurrent refreshes.
	updateThreadNum int
	// serviceProxy is queried (QueryList) to fetch the latest service data.
	serviceProxy NamingProxy
	// pushReceiver supplies the port passed along with every QueryList call.
	pushReceiver PushReceiver
	// subCallback is notified via ServiceChanged when a service's hosts change.
	subCallback SubscribeCallback
	// updateTimeMap stores, per service cache key, the uint64 millisecond
	// timestamp of the last accepted update.
	updateTimeMap cache.ConcurrentMap
	// updateCacheWhenEmpty, when false, stops an empty host list from
	// overwriting an entry that is already cached.
	updateCacheWhenEmpty bool
}

  • HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性

NewHostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

// NewHostReactor builds a HostReactor, attaches a push receiver to it,
// optionally preloads the service cache from cacheDir, and starts the
// background refresh loop.
//
// A non-positive updateThreadNum is replaced by Default_Update_Thread_Num.
// The disk cache is loaded unless notLoadCacheAtStart is true.
//
// NOTE(review): the push receiver and the refresh goroutine are given the
// address of the local value, while the caller receives a copy of it. Whether
// later changes stay visible to both depends on how cache.ConcurrentMap and
// PushReceiver share state — confirm before relying on it.
func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
	if updateThreadNum <= 0 {
		updateThreadNum = Default_Update_Thread_Num
	}

	var hr HostReactor
	hr.serviceProxy = serviceProxy
	hr.cacheDir = cacheDir
	hr.updateThreadNum = updateThreadNum
	hr.serviceInfoMap = cache.NewConcurrentMap()
	hr.subCallback = subCallback
	hr.updateTimeMap = cache.NewConcurrentMap()
	hr.updateCacheWhenEmpty = updateCacheWhenEmpty

	// The receiver is created against this reactor and then stored by value.
	hr.pushReceiver = *NewPushRecevier(&hr)

	if !notLoadCacheAtStart {
		hr.loadCacheFromDisk()
	}

	go hr.asyncUpdateService()
	return hr
}

  • NewHostReactor方法创建HostReactor,然后通过NewPushRecevier创建pushReceiver,对于notLoadCacheAtStart为false的则执行loadCacheFromDisk,之后异步执行asyncUpdateService

loadCacheFromDisk

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

// loadCacheFromDisk reads the services persisted under hr.cacheDir and copies
// every entry into hr.serviceInfoMap under its original key.
func (hr *HostReactor) loadCacheFromDisk() {
	// Ranging over a nil or empty result is a no-op, so no separate
	// emptiness check is required.
	for key, svc := range cache.ReadServicesFromFile(hr.cacheDir) {
		hr.serviceInfoMap.Set(key, svc)
	}
}

  • loadCacheFromDisk方法通过cache.ReadServicesFromFile(hr.cacheDir)获取serviceMap,若serviceMap为空则直接返回,否则遍历serviceMap并通过hr.serviceInfoMap.Set(k, v)写入serviceInfoMap

asyncUpdateService

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

// asyncUpdateService loops forever, scanning hr.serviceInfoMap once per second
// and refreshing every service whose last recorded update is older than its
// CacheMillis. Concurrent refreshes are bounded by a semaphore of size
// hr.updateThreadNum.
func (hr *HostReactor) asyncUpdateService() {
	limiter := utils.NewSemaphore(hr.updateThreadNum)
	for {
		for _, item := range hr.serviceInfoMap.Items() {
			svc := item.(model.Service)

			// A service with no recorded update time is treated as last
			// refreshed at time zero.
			var lastRef uint64
			if ts, found := hr.updateTimeMap.Get(utils.GetServiceCacheKey(svc.Name, svc.Clusters)); found {
				lastRef = ts.(uint64)
			}

			if uint64(utils.CurrentMillis())-lastRef <= svc.CacheMillis {
				continue
			}

			// Acquire before spawning so the scan itself blocks when all
			// permits are in use.
			limiter.Acquire()
			go func(name, clusters string) {
				hr.updateServiceNow(name, clusters)
				limiter.Release()
			}(svc.Name, svc.Clusters)
		}
		time.Sleep(1 * time.Second)
	}
}

  • asyncUpdateService方法遍历serviceInfoMap,通过hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))获取lastRefTime,然后判断是否超过service.CacheMillis,超过的话则执行sema.Acquire(),异步hr.updateServiceNow(service.Name, service.Clusters),最后执行sema.Release()

updateServiceNow

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) {

result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)

if err != nil {

log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s err:%s

", serviceName, clusters, err.Error())

return

}

if result == "" {

log.Printf("[ERROR]:query list is empty!servieName:%s cluster:%s

", serviceName, clusters)

return

}

hr.ProcessServiceJson(result)

}

  • updateServiceNow方法通过hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)获取json,然后通过hr.ProcessServiceJson(result)解析json

ProcessServiceJson

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) ProcessServiceJson(result string) {

service := utils.JsonToService(result)

if service == nil {

return

}

cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)

oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)

if ok && !hr.updateCacheWhenEmpty {

//if instance list is empty,not to update cache

if service.Hosts == nil || len(service.Hosts) == 0 {

log.Printf("[ERROR]:do not have useful host, ignore it, name:%s

", service.Name)

return

}

}

hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))

hr.serviceInfoMap.Set(cacheKey, *service)

if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {

if !ok {

log.Println("[INFO] service not found in cache " + cacheKey)

} else {

log.Printf("[INFO] service key:%s was updated to:%s

", cacheKey, utils.ToJsonString(service))

}

cache.WriteServicesToFile(*service, hr.cacheDir)

hr.subCallback.ServiceChanged(service)

}

}

  • ProcessServiceJson方法通过utils.JsonToService(result)将json解析为model.Service,然后通过utils.GetServiceCacheKey(service.Name, service.Clusters)构建cacheKey,之后更新hr.updateTimeMap、hr.serviceInfoMap;对于缓存不存在或者缓存存在变更的则执行cache.WriteServicesToFile(*service, hr.cacheDir),然后触发hr.subCallback.ServiceChanged(service)

小结

HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性

doc

  • host_reator

以上是 聊聊nacos-sdk-go的HostReactor 的全部内容, 来源链接: utcz.com/z/517790.html

回到顶部