聊聊nacossdkgo的NamingClient

编程

本文主要研究一下nacos-sdk-go的NamingClient

NamingClient

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

type NamingClient struct {

nacos_client.INacosClient

hostReactor HostReactor

serviceProxy NamingProxy

subCallback SubscribeCallback

beatReactor BeatReactor

indexMap cache.ConcurrentMap

}

  • NamingClient定义了hostReactor、serviceProxy、subCallback、beatReactor、indexMap属性

NewNamingClient

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func NewNamingClient(nc nacos_client.INacosClient) (NamingClient, error) {

naming := NamingClient{}

clientConfig, err :=

nc.GetClientConfig()

if err != nil {

return naming, err

}

serverConfig, err := nc.GetServerConfig()

if err != nil {

return naming, err

}

httpAgent, err := nc.GetHttpAgent()

if err != nil {

return naming, err

}

err = logger.InitLog(clientConfig.LogDir)

if err != nil {

return naming, err

}

naming.subCallback = NewSubscribeCallback()

naming.serviceProxy, err = NewNamingProxy(clientConfig, serverConfig, httpAgent)

if err != nil {

return naming, err

}

naming.hostReactor = NewHostReactor(naming.serviceProxy, clientConfig.CacheDir+string(os.PathSeparator)+"naming",

clientConfig.UpdateThreadNum, clientConfig.NotLoadCacheAtStart, naming.subCallback, clientConfig.UpdateCacheWhenEmpty)

naming.beatReactor = NewBeatReactor(naming.serviceProxy, clientConfig.BeatInterval)

naming.indexMap = cache.NewConcurrentMap()

return naming, nil

}

  • NewNamingClient方法创建NamingClient,并设置其subCallback、serviceProxy、hostReactor、beatReactor、indexMap属性

RegisterInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 注册服务实例

func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

instance := model.Instance{

Ip: param.Ip,

Port: param.Port,

Metadata: param.Metadata,

ClusterName: param.ClusterName,

Healthy: param.Healthy,

Enable: param.Enable,

Weight: param.Weight,

Ephemeral: param.Ephemeral,

}

beatInfo := model.BeatInfo{

Ip: param.Ip,

Port: param.Port,

Metadata: param.Metadata,

ServiceName: utils.GetGroupName(param.ServiceName, param.GroupName),

Cluster: param.ClusterName,

Weight: param.Weight,

Period: utils.GetDurationWithDefault(param.Metadata, constant.HEART_BEAT_INTERVAL, time.Second*5),

}

_, err := sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)

if err != nil {

return false, err

}

if instance.Ephemeral {

sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)

}

return true, nil

}

  • RegisterInstance方法根据RegisterInstanceParam构建instance,然后通过sc.serviceProxy.RegisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)去注册;如果instance是Ephemeral类型的,则执行sc.beatReactor.AddBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), beatInfo)

DeregisterInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 注销服务实例

func (sc *NamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

sc.beatReactor.RemoveBeatInfo(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port)

_, err := sc.serviceProxy.DeregisterInstance(utils.GetGroupName(param.ServiceName, param.GroupName), param.Ip, param.Port, param.Cluster, param.Ephemeral)

if err != nil {

return false, err

}

return true, nil

}

  • DeregisterInstance方法执行sc.beatReactor.RemoveBeatInfo及sc.serviceProxy.DeregisterInstance

GetService

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 获取服务列表

func (sc *NamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))

return service, nil

}

  • GetService方法通过sc.hostReactor.GetServiceInfo来查询service信息

GetAllServicesInfo

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) ([]model.Service, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

if param.NameSpace == "" {

param.NameSpace = constant.DEFAULT_NAMESPACE_ID

}

service := sc.hostReactor.GetAllServiceInfo(param.NameSpace, param.GroupName, strings.Join(param.Clusters, ","))

return service, nil

}

  • GetAllServicesInfo方法通过sc.hostReactor.GetAllServiceInfo来查询所有的service信息

SelectAllInstances

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))

if service.Hosts == nil || len(service.Hosts) == 0 {

return []model.Instance{}, errors.New("instance list is empty!")

}

return service.Hosts, nil

}

  • SelectAllInstances方法通过sc.hostReactor.GetServiceInfo获取service信息,然后返回service.Hosts

SelectInstances

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))

return sc.selectInstances(service, param.HealthyOnly)

}

func (sc *NamingClient) selectInstances(service model.Service, healthy bool) ([]model.Instance, error) {

if service.Hosts == nil || len(service.Hosts) == 0 {

return []model.Instance{}, errors.New("instance list is empty!")

}

hosts := service.Hosts

var result []model.Instance

for _, host := range hosts {

if host.Healthy == healthy && host.Enable && host.Weight > 0 {

result = append(result, host)

}

}

return result, nil

}

  • SelectInstances方法通过sc.hostReactor.GetServiceInfo获取service信息,然后再通过sc.selectInstances(service, param.HealthyOnly)获取健康的instance

SelectOneHealthyInstance

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

service := sc.hostReactor.GetServiceInfo(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))

return sc.selectOneHealthyInstances(service)

}

func (sc *NamingClient) selectOneHealthyInstances(service model.Service) (*model.Instance, error) {

if service.Hosts == nil || len(service.Hosts) == 0 {

return nil, errors.New("instance list is empty!")

}

hosts := service.Hosts

var result []model.Instance

mw := 0

for _, host := range hosts {

if host.Healthy && host.Enable && host.Weight > 0 {

cw := int(math.Ceil(host.Weight))

if cw > mw {

mw = cw

}

result = append(result, host)

}

}

if len(result) == 0 {

return nil, errors.New("healthy instance list is empty!")

}

randomInstances := random(result, mw)

key := utils.GetServiceCacheKey(service.Name, service.Clusters)

i, indexOk := sc.indexMap.Get(key)

var index int

if !indexOk {

index = rand.Intn(len(randomInstances))

} else {

index = i.(int)

index += 1

if index >= len(randomInstances) {

index = index % len(randomInstances)

}

}

sc.indexMap.Set(key, index)

return &randomInstances[index], nil

}

  • SelectOneHealthyInstance方法通过sc.hostReactor.GetServiceInfo获取service信息,再通过sc.selectOneHealthyInstances(service)选择一个健康的instance

Subscribe

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

// 服务监听

func (sc *NamingClient) Subscribe(param *vo.SubscribeParam) error {

if param.GroupName == "" {

param.GroupName = constant.DEFAULT_GROUP

}

serviceParam := vo.GetServiceParam{

ServiceName: param.ServiceName,

GroupName: param.GroupName,

Clusters: param.Clusters,

}

sc.subCallback.AddCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)

_, err := sc.GetService(serviceParam)

if err != nil {

return err

}

return nil

}

  • Subscribe方法通过sc.subCallback.AddCallbackFuncs来注册callback

Unsubscribe

nacos-sdk-go-v0.3.2/clients/naming_client/naming_client.go

//取消服务监听

func (sc *NamingClient) Unsubscribe(param *vo.SubscribeParam) error {

sc.subCallback.RemoveCallbackFuncs(utils.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","), &param.SubscribeCallback)

return nil

}

  • Unsubscribe方法则通过sc.subCallback.RemoveCallbackFuncs来取消callback

小结

nacos-sdk-go的NamingClient提供了RegisterInstance、DeregisterInstance、GetService、GetAllServicesInfo、SelectAllInstances、SelectInstances、SelectOneHealthyInstance、Subscribe、Unsubscribe方法

doc

  • naming_client

以上是 聊聊nacossdkgo的NamingClient 的全部内容, 来源链接: utcz.com/z/517785.html

回到顶部