聊聊nacossdkgo的BeatReactor

编程

本文主要研究一下nacos-sdk-go的BeatReactor

BeatReactor

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

type BeatReactor struct {

beatMap cache.ConcurrentMap

serviceProxy NamingProxy

clientBeatInterval int64

beatThreadCount int

beatThreadSemaphore *nsema.Semaphore

beatRecordMap cache.ConcurrentMap

}

  • BeatReactor定义了beatMap、serviceProxy、clientBeatInterval、beatThreadCount、beatThreadSemaphore、beatRecordMap属性

NewBeatReactor

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func NewBeatReactor(serviceProxy NamingProxy, clientBeatInterval int64) BeatReactor {

br := BeatReactor{}

if clientBeatInterval <= 0 {

clientBeatInterval = 5 * 1000

}

br.beatMap = cache.NewConcurrentMap()

br.serviceProxy = serviceProxy

br.clientBeatInterval = clientBeatInterval

br.beatThreadCount = Default_Beat_Thread_Num

br.beatRecordMap = cache.NewConcurrentMap()

br.beatThreadSemaphore = nsema.NewSemaphore(br.beatThreadCount)

return br

}

  • NewBeatReactor方法创建了BeatReactor,并初始化其属性

AddBeatInfo

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) {

log.Printf("[INFO] adding beat: <%s> to beat map.

", utils.ToJsonString(beatInfo))

k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port)

br.beatMap.Set(k, &beatInfo)

go br.sendInstanceBeat(k, &beatInfo)

}

  • AddBeatInfo方法通过buildKey构建key,然后将其放进去beatMap,之后异步执行br.sendInstanceBeat

RemoveBeatInfo

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) {

log.Printf("[INFO] remove beat: %s@%s:%d from beat map.

", serviceName, ip, port)

k := buildKey(serviceName, ip, port)

data, exist := br.beatMap.Get(k)

if exist {

beatInfo := data.(*model.BeatInfo)

beatInfo.Stopped = true

}

br.beatMap.Remove(k)

}

  • RemoveBeatInfo方法将指定key从beatMap移除,对于已经在beatMap中的设置其Stopped为true

sendInstanceBeat

nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go

func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {

for {

br.beatThreadSemaphore.Acquire()

//如果当前实例注销,则进行停止心跳

if beatInfo.Stopped {

log.Printf("[INFO] intance[%s] stop heartBeating

", k)

br.beatThreadSemaphore.Release()

return

}

//进行心跳通信

beatInterval, err := br.serviceProxy.SendBeat(*beatInfo)

if err != nil {

log.Printf("[ERROR]:beat to server return error:%s

", err.Error())

br.beatThreadSemaphore.Release()

t := time.NewTimer(beatInfo.Period)

<-t.C

continue

}

if beatInterval > 0 {

beatInfo.Period = time.Duration(time.Millisecond.Nanoseconds() * beatInterval)

}

br.beatRecordMap.Set(k, utils.CurrentMillis())

br.beatThreadSemaphore.Release()

t := time.NewTimer(beatInfo.Period)

<-t.C

}

}

  • sendInstanceBeat方法先执行br.beatThreadSemaphore.Acquire(),之后通过br.serviceProxy.SendBeat(*beatInfo)来发送beat,之后执行br.beatThreadSemaphore.Release(),然后新创建time.NewTimer(beatInfo.Period)

小结

BeatReactor定义了beatMap、serviceProxy、clientBeatInterval、beatThreadCount、beatThreadSemaphore、beatRecordMap属性;它提供了NewBeatReactor、AddBeatInfo、RemoveBeatInfo方法

doc

  • beat_reactor

以上是 聊聊nacossdkgo的BeatReactor 的全部内容, 来源链接: utcz.com/z/517826.html

回到顶部