Golang etcd服务注册与发现
本文内容纲要:Golang etcd服务注册与发现
//sevice.go
package discoveryimport (
"context"
"errors"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
l4g "github.com/alecthomas/log4go"
)
type Service struct {
closeChan chan struct{} //关闭通道
client *clientv3.Client //etcd v3 client
leaseID clientv3.LeaseID //etcd 租约id
key string //键
val string //值
wg sync.WaitGroup
}
// NewService 构造一个注册服务
func NewService(etcdEndpoints []string, key string, val string) (*Service, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
DialTimeout: 2 * time.Second,
})
if nil != err {
return nil, err
}
s := &Service{
client: cli,
closeChan: make(chan struct{}),
key: key,
val: val,
}
return s, nil
}
// Start 开启注册
// @param - ttlSecond 租期(秒)
func (s *Service) Start(ttlSecond int64) error {
// minimum lease TTL is 5-second
resp, err := s.client.Grant(context.TODO(), ttlSecond)
if err != nil {
panic(err)
}
s.leaseID = resp.ID
_, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID))
if err != nil {
panic(err)
}
ch, err1 := s.client.KeepAlive(context.TODO(), s.leaseID)
if nil != err1 {
panic(err)
}
l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val)
s.wg.Add(1)
defer s.wg.Done()
for {
select {
case <-s.closeChan:
return s.revoke()
case <-s.client.Ctx().Done():
return errors.New("server closed")
case ka, ok := <-ch:
if !ok {
l4g.Warn("[discovery] Service Start keep alive channel closed")
return s.revoke()
} else {
l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)
}
}
}
return nil
}
// Stop 停止
func (s *Service) Stop() {
close(s.closeChan)
s.wg.Wait()
s.client.Close()
}
func (s *Service) revoke() error {
_, err := s.client.Revoke(context.TODO(), s.leaseID)
if err != nil {
l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())
} else {
l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)
}
return err
}
//watch.go
package discoveryimport (
"context"
"os"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
l4g "github.com/alecthomas/log4go"
"google.golang.org/grpc/grpclog"
)
type GroupManager struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
once sync.Once
}
func NewGroupManager() *GroupManager {
ret := new(GroupManager)
ret.ctx, ret.cancel = context.WithCancel(context.Background())
return ret
}
func (this *GroupManager) Close() {
this.once.Do(this.cancel)
}
func (this *GroupManager) Wait() {
this.wg.Wait()
}
func (this *GroupManager) Add(delta int) {
this.wg.Add(delta)
}
func (this *GroupManager) Done() {
this.wg.Done()
}
func (this *GroupManager) Chan() <-chan struct{} {
return this.ctx.Done()
}
type Target interface {
Set(string, string)
Create(string, string)
Modify(string, string)
Delete(string)
}
type Config struct {
Servers []string
DailTimeout int64
RequestTimeout int64
Prefix bool
Target string
}
func Watch(gm *GroupManager, cfg *Config, target Target) {
defer gm.Done()
cli, err := clientv3.New(clientv3.Config{
Endpoints: cfg.Servers,
DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,
})
if err != nil {
panic(err.Error())
return
}
defer cli.Close() // make sure to close the client
l4g.Info("[discovery] start watch %s", cfg.Target)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)
var resp *clientv3.GetResponse
if cfg.Prefix {
resp, err = cli.Get(ctx, cfg.Target, clientv3.WithPrefix())
} else {
resp, err = cli.Get(ctx, cfg.Target)
}
cancel()
if err != nil {
panic(err.Error())
}
for _, ev := range resp.Kvs {
target.Set(string(string(ev.Key)), string(ev.Value))
}
var rch clientv3.WatchChan
if cfg.Prefix {
rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))
} else {
rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithRev(resp.Header.Revision+1))
}
for {
select {
case <-gm.Chan():
l4g.Info("[discovery] watch %s close", cfg.Target)
return
case wresp := <-rch:
err := wresp.Err()
if err != nil {
l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())
gm.Close()
return
}
l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)
for _, ev := range wresp.Events {
if ev.IsCreate() {
target.Create(string(ev.Kv.Key), string(ev.Kv.Value))
} else if ev.IsModify() {
target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))
} else if ev.Type == mvccpb.DELETE {
target.Delete(string(ev.Kv.Key))
} else {
l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)
}
}
}
}
}
本文内容总结:Golang etcd服务注册与发现
原文链接:https://www.cnblogs.com/mrblue/p/9722682.html
以上是 Golang etcd服务注册与发现 的全部内容, 来源链接: utcz.com/z/297035.html