Golang etcd服务注册与发现

本文内容纲要:Golang etcd服务注册与发现

//service.go

package discovery

import (

"context"

"errors"

"sync"

"time"

"github.com/coreos/etcd/clientv3"

l4g "github.com/alecthomas/log4go"

)

// Service registers one key/value pair in etcd under a lease and keeps that
// lease alive for as long as Start is running.
type Service struct {

closeChan chan struct{} // closed by Stop to tell Start to finish

client *clientv3.Client // etcd v3 client

leaseID clientv3.LeaseID // ID of the etcd lease granted in Start

key string // key written to etcd

val string // value written to etcd

wg sync.WaitGroup // lets Stop wait until Start has returned

}

// NewService builds a Service that will register key/val in the etcd cluster
// reachable through etcdEndpoints.
//
// The etcd client is created with a 2-second dial timeout. If the client
// cannot be created, the error is returned and the Service is nil.
// Nothing is written to etcd until Start is called.
func NewService(etcdEndpoints []string, key string, val string) (*Service, error) {
	cfg := clientv3.Config{
		Endpoints:   etcdEndpoints,
		DialTimeout: 2 * time.Second,
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		return nil, err
	}

	return &Service{
		closeChan: make(chan struct{}),
		client:    client,
		key:       key,
		val:       val,
	}, nil
}

// Start registers the service in etcd and then blocks, keeping the
// registration alive.
//
// It grants a lease of ttlSecond seconds, writes key/val under that lease and
// consumes keep-alive responses until one of the following happens:
//   - Stop is called: the lease is revoked and the revoke result is returned;
//   - the client's context is done: a "server closed" error is returned;
//   - the keep-alive channel is closed: the lease is revoked and the revoke
//     result is returned.
//
// Errors from Grant, Put and KeepAlive are returned to the caller.
//
// Because Start blocks, callers that need to do other work should run it in
// its own goroutine.
func (s *Service) Start(ttlSecond int64) error {
	// minimum lease TTL is 5-second
	resp, err := s.client.Grant(context.TODO(), ttlSecond)
	if err != nil {
		return err
	}
	s.leaseID = resp.ID

	// Attach the key to the lease so that it disappears when the lease
	// expires or is revoked.
	if _, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID)); err != nil {
		return err
	}

	ch, err := s.client.KeepAlive(context.TODO(), s.leaseID)
	if err != nil {
		return err
	}

	l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val)

	// Stop waits on wg before closing the client, so the loop below (and the
	// final revoke) always runs against a live client.
	s.wg.Add(1)
	defer s.wg.Done()

	for {
		select {
		case <-s.closeChan:
			return s.revoke()
		case <-s.client.Ctx().Done():
			return errors.New("server closed")
		case ka, ok := <-ch:
			if !ok {
				l4g.Warn("[discovery] Service Start keep alive channel closed")
				return s.revoke()
			}
			l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)
		}
	}
}

// Stop tells Start to finish, waits for it to return and then closes the
// etcd client. A failure to close the client is logged.
//
// Stop must be called at most once: it closes closeChan, and closing an
// already-closed channel panics.
func (s *Service) Stop() {
	close(s.closeChan)

	// Wait for Start to leave its loop before tearing down the client it uses.
	s.wg.Wait()

	if err := s.client.Close(); err != nil {
		l4g.Warn("[discovery] Service Stop close client key:[%s] error:[%s]", s.key, err.Error())
	}
}

// revoke revokes the lease stored in s.leaseID and logs the outcome.
// It returns the error reported by the etcd client, or nil on success.
func (s *Service) revoke() error {
	if _, err := s.client.Revoke(context.TODO(), s.leaseID); err != nil {
		l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())
		return err
	}

	l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)
	return nil
}

//watch.go

package discovery

import (
	"context"
	"os"
	"sync"
	"time"

	l4g "github.com/alecthomas/log4go"
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"google.golang.org/grpc/grpclog"
)

// GroupManager couples a WaitGroup with a cancellable context so that a set
// of goroutines can be told to stop and then waited for.
type GroupManager struct {
	wg     sync.WaitGroup     // counts the goroutines registered through Add
	ctx    context.Context    // its Done channel is the stop signal
	cancel context.CancelFunc // cancels ctx
	once   sync.Once          // ensures cancel is invoked at most once via Close
}

// NewGroupManager returns a GroupManager whose stop signal has not fired yet.
func NewGroupManager() *GroupManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &GroupManager{ctx: ctx, cancel: cancel}
}

// Close fires the stop signal. Only the first call has any effect.
func (gm *GroupManager) Close() {
	gm.once.Do(func() { gm.cancel() })
}

// Wait blocks until the internal WaitGroup counter drops to zero.
func (gm *GroupManager) Wait() {
	gm.wg.Wait()
}

// Add adds delta to the internal WaitGroup counter.
func (gm *GroupManager) Add(delta int) {
	gm.wg.Add(delta)
}

// Done decrements the internal WaitGroup counter by one.
func (gm *GroupManager) Done() {
	gm.wg.Done()
}

// Chan returns a channel that is closed once Close has been called.
func (gm *GroupManager) Chan() <-chan struct{} {
	return gm.ctx.Done()
}

// Target receives the key/value state that Watch observes in etcd.
type Target interface {

// Set is called by Watch with the key and value of every entry returned by
// its initial Get.
Set(string, string)

// Create is called by Watch with the key and value of a watch event for
// which IsCreate reports true.
Create(string, string)

// Modify is called by Watch with the key and value of a watch event for
// which IsModify reports true.
Modify(string, string)

// Delete is called by Watch with the key of a DELETE watch event.
Delete(string)

}

// Config holds the settings read by Watch.
type Config struct {

Servers []string // etcd endpoints handed to the client

DailTimeout int64 // client dial timeout in seconds (the name is spelled "Dail", not "Dial")

RequestTimeout int64 // timeout in seconds for Watch's initial Get

Prefix bool // when true, Target is used as a key prefix for both Get and Watch

Target string // key (or key prefix, see Prefix) to read and watch

}

// Watch loads the current content of cfg.Target into target and then forwards
// every later etcd change to it.
//
// Steps:
//  1. Create an etcd client from cfg.Servers / cfg.DailTimeout.
//  2. Get cfg.Target (as a prefix when cfg.Prefix is set), bounded by
//     cfg.RequestTimeout seconds, and call target.Set for each entry.
//  3. Watch cfg.Target starting at the revision right after that Get and call
//     target.Create / Modify / Delete for each event.
//
// Watch returns when gm's stop signal fires. It also returns, after calling
// gm.Close, when the watch reports an error or the watch channel is closed.
// It panics if the client cannot be created or the initial Get fails.
//
// Watch calls gm.Done when it returns, so the caller must have accounted for
// it with gm.Add before starting it.
func Watch(gm *GroupManager, cfg *Config, target Target) {
	defer gm.Done()

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   cfg.Servers,
		DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,
	})
	if err != nil {
		panic(err.Error())
	}
	defer cli.Close() // make sure to close the client

	l4g.Info("[discovery] start watch %s", cfg.Target)

	// The initial Get and the Watch share the prefix option.
	var opts []clientv3.OpOption
	if cfg.Prefix {
		opts = append(opts, clientv3.WithPrefix())
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)
	resp, err := cli.Get(ctx, cfg.Target, opts...)
	cancel()
	if err != nil {
		panic(err.Error())
	}

	for _, kv := range resp.Kvs {
		target.Set(string(kv.Key), string(kv.Value))
	}

	// Start watching at the revision right after the snapshot read above, so
	// no change between the Get and the Watch is missed or seen twice.
	opts = append(opts, clientv3.WithRev(resp.Header.Revision+1))
	rch := cli.Watch(context.Background(), cfg.Target, opts...)

	for {
		select {
		case <-gm.Chan():
			l4g.Info("[discovery] watch %s close", cfg.Target)
			return

		case wresp, ok := <-rch:
			if !ok {
				// A closed channel keeps yielding zero-value responses with a
				// nil Err(); without this check the loop would spin forever.
				l4g.Info("[discovery] watch %s channel closed", cfg.Target)
				gm.Close()
				return
			}

			if err := wresp.Err(); err != nil {
				l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())
				gm.Close()
				return
			}

			l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)

			for _, ev := range wresp.Events {
				switch {
				case ev.IsCreate():
					target.Create(string(ev.Kv.Key), string(ev.Kv.Value))
				case ev.IsModify():
					target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))
				case ev.Type == mvccpb.DELETE:
					target.Delete(string(ev.Kv.Key))
				default:
					l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)
				}
			}
		}
	}
}

本文内容总结:Golang etcd服务注册与发现

原文链接:https://www.cnblogs.com/mrblue/p/9722682.html

以上是 Golang etcd服务注册与发现 的全部内容, 来源链接: utcz.com/z/297035.html

回到顶部