golang中使用etcd

本文内容纲要:

- with用法

- 读取前缀

- Delete

- 续租:

- 永不过期的租约

- watch

- op取代get put delete方法

- 事务txn实现分布式锁

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

)

// main shows the minimum needed to talk to etcd: build a client
// configuration, create the client, and release it again on exit.
func main() {
	// Client configuration: a single endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: time.Second * 5,
	}

	// Create the client; report the error and stop if that fails.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Release the connection and the client's background goroutines on exit.
	defer client.Close()
}

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main writes "/cron/jobs/job1" with Put and, because WithPrevKV is passed,
// prints the key-value pair that the write replaced (if there was one).
func main() {
	// Client configuration: a single endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: time.Second * 5,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	// KV is the API subset used to read and write key-value pairs.
	kv := clientv3.NewKV(client)

	putResp, err := kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV())
	if err != nil {
		fmt.Println(err)
		return
	}

	// Revision of the store after this write.
	fmt.Println("Revision:", putResp.Header.Revision)

	// PrevKv is nil when the key did not exist before this Put.
	if putResp.PrevKv != nil {
		fmt.Println("key:", string(putResp.PrevKv.Key))
		fmt.Println("Value:", string(putResp.PrevKv.Value))
		// Version is an int64. Print it as a number: string(int64) would
		// yield the rune with that code point, not its decimal text.
		fmt.Println("Version:", putResp.PrevKv.Version)
	}
}

Revision: 10

key: /cron/jobs/job1

Value: hello

Version:

get

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main reads the single key "/cron/jobs/job1" and prints the returned
// key-value pairs.
func main() {
	// Client configuration: a single endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: time.Second * 5,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	// KV is the API subset used to read and write key-value pairs.
	kv := clientv3.NewKV(client)

	getResp, err := kv.Get(context.TODO(), "/cron/jobs/job1")
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(getResp.Kvs)
}

[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" ]

with用法

// Obtain the KV API used to read and write etcd key-value pairs.
kv = clientv3.NewKV(client)

// WithCountOnly asks only for the number of matching keys; the sample output
// below shows Kvs coming back empty while Count carries the result.
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", clientv3.WithCountOnly()); err != nil {
	fmt.Println(err)
	return
}

fmt.Println(getResp.Kvs, getResp.Count)

[] 1

读取前缀

// Obtain the KV API used to read and write etcd key-value pairs.
kv = clientv3.NewKV(client)

// WithPrefix turns the key into a prefix match, so every key that starts
// with "/cron/jobs/" is returned.
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
	fmt.Println(err)
	return
}

fmt.Println(getResp.Kvs)

[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye"

key:"/cron/jobs/job2" create_revision:12 mod_revision:12 version:1 value:"byehhhhhh" ]

Delete

// Obtain the KV API used to read and write etcd key-value pairs.
kv = clientv3.NewKV(client)

// WithPrevKV makes the response carry the pairs as they were before deletion.
if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job2", clientv3.WithPrevKV()); err != nil {
	fmt.Println(err)
	return
}

// Ranging over an empty PrevKvs is a no-op, so no separate length check is
// needed.
for idx, kvpair = range delResp.PrevKvs {
	// idx is presumably declared outside this snippet (it is assigned with
	// "="); reference it so the compiler counts it as used.
	_ = idx
	fmt.Println("删除了", string(kvpair.Key), string(kvpair.Value))
}

byehhhhhh

删除多个key

// Delete several keys in one request: WithPrefix treats "/cron/jobs" as a
// prefix rather than an exact key.
// NOTE(review): without a trailing "/" this presumably also matches keys such
// as "/cron/jobsX" — confirm that is intended.
delResp,err = kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())

续租:

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main attaches a key to a 10-second lease and then polls the key every two
// seconds until it disappears, demonstrating lease-based expiry.
func main() {
	// Client configuration: a single endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: time.Second * 5,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	// Lease is the API subset for granting and managing leases.
	lease := clientv3.NewLease(client)

	// Request a lease with a 10-second TTL.
	leaseGrantResp, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := leaseGrantResp.ID

	// KV is the API subset used to read and write key-value pairs.
	kv := clientv3.NewKV(client)

	// Bind the key to the lease so it is removed when the lease expires.
	putResp, err := kv.Put(context.TODO(), "cron/lock/job1", "v5", clientv3.WithLease(leaseID))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入成功", putResp.Header.Revision)

	// Poll until the key is gone.
	for {
		getResp, err := kv.Get(context.TODO(), "cron/lock/job1")
		if err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("还没过期:", getResp.Kvs)
		time.Sleep(time.Second * 2)
	}
}

写入成功 24

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]

kv过期了

永不过期的租约

ImageImage

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main attaches a key to a 10-second lease and keeps the lease alive with
// KeepAlive, so the key stays present for as long as renewals succeed.
func main() {
	// Client configuration: a single endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: time.Second * 5,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	// Lease is the API subset for granting and managing leases.
	lease := clientv3.NewLease(client)

	// Request a lease with a 10-second TTL.
	leaseGrantResp, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := leaseGrantResp.ID

	// KV is the API subset used to read and write key-value pairs.
	kv := clientv3.NewKV(client)

	// Start automatic renewal of the lease.
	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Consume renewal responses. The channel is closed once the lease can no
	// longer be kept alive, which ends the range loop; testing the channel
	// variable itself for nil would never detect that, because only the
	// received value becomes nil on close.
	// NOTE(review): the client appears to renew at roughly one third of the
	// TTL rather than every second — confirm against the clientv3 docs.
	go func() {
		for range keepRespChan {
			fmt.Println("收到自动续租的应答")
		}
		fmt.Println("租约已失效了")
	}()

	// Bind the key to the lease; it lives as long as the lease does.
	putResp, err := kv.Put(context.TODO(), "cron/lock/job1", "v5", clientv3.WithLease(leaseID))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入成功", putResp.Header.Revision)

	// Poll until the key is gone.
	for {
		getResp, err := kv.Get(context.TODO(), "cron/lock/job1")
		if err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("还没过期:", getResp.Kvs)
		time.Sleep(time.Second * 2)
	}
}

View Code

写入成功 38

收到自动续租的应答

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

收到自动续租的应答

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

收到自动续租的应答

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]

收到自动续租的应答

watch

  监听 kv 的变化,常用于集群中的配置下发和状态同步,非常有价值

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

"github.com/coreos/etcd/mvcc/mvccpb"

)

// main watches "/cron/jobs/job7" for five seconds while a background
// goroutine repeatedly puts and deletes that key, printing every change
// event the watcher delivers.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	// KV is the API subset used to read and write key-value pairs.
	kv := clientv3.NewKV(client)

	// Simulate changes to the key: put it, delete it, wait a second, repeat.
	// Failures are reported instead of being silently dropped.
	go func() {
		for {
			if _, err := kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7"); err != nil {
				fmt.Println(err)
			}
			if _, err := kv.Delete(context.TODO(), "/cron/jobs/job7"); err != nil {
				fmt.Println(err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Read the current value first, then watch for subsequent changes.
	getResp, err := kv.Get(context.TODO(), "/cron/jobs/job7")
	if err != nil {
		fmt.Println(err)
		return
	}

	// The key may or may not exist at this moment, depending on where the
	// writer goroutine is in its put/delete cycle.
	if len(getResp.Kvs) != 0 {
		fmt.Println("当前值:", string(getResp.Kvs[0].Value))
	}

	// Start watching from the revision right after the one the Get observed,
	// so no change made after the read is missed.
	watchStartRevision := getResp.Header.Revision + 1

	// Create the watcher and make sure it is closed on exit.
	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()

	fmt.Println("从该版本向后监听:", watchStartRevision)

	// Cancel the watch after five seconds; cancellation closes the response
	// channel and thereby ends the loop below.
	ctx, cancelFunc := context.WithCancel(context.TODO())
	defer cancelFunc()
	time.AfterFunc(5*time.Second, cancelFunc)

	watchRespChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

	// Handle change events.
	for watchResp := range watchRespChan {
		// A response can carry an error instead of events (for example when
		// the requested revision has been compacted); stop in that case.
		if err := watchResp.Err(); err != nil {
			fmt.Println(err)
			return
		}
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
			}
		}
	}
}

当前值: i am job7

从该版本向后监听: 72

删除了 key:"/cron/jobs/job7" mod_revision:72

修改为: key:"/cron/jobs/job7" create_revision:73 mod_revision:73 version:1 value:"i am job7"

删除了 key:"/cron/jobs/job7" mod_revision:74

修改为: key:"/cron/jobs/job7" create_revision:75 mod_revision:75 version:1 value:"i am job7"

删除了 key:"/cron/jobs/job7" mod_revision:76

修改为: key:"/cron/jobs/job7" create_revision:77 mod_revision:77 version:1 value:"i am job7"

删除了 key:"/cron/jobs/job7" mod_revision:78

修改为: key:"/cron/jobs/job7" create_revision:79 mod_revision:79 version:1 value:"i am job7"

删除了 key:"/cron/jobs/job7" mod_revision:80

修改为: key:"/cron/jobs/job7" create_revision:81 mod_revision:81 version:1 value:"i am job7"

删除了 key:"/cron/jobs/job7" mod_revision:82

op取代get put delete方法

ImageImage

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main performs a put and a get through the generic Op/Do interface instead
// of calling kv.Put and kv.Get directly.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection and the client's background goroutines on exit.
	defer client.Close()

	kv := clientv3.NewKV(client)

	// Build a put operation and execute it with Do. kv.Do(op) is the generic
	// counterpart of kv.Put, kv.Get and kv.Delete.
	putOp := clientv3.OpPut("/cron/jobs/job8", "123123123")
	opResp, err := kv.Do(context.TODO(), putOp)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入Revision:", opResp.Put().Header.Revision)

	// Build a get operation for the same key and execute it.
	getOp := clientv3.OpGet("/cron/jobs/job8")
	if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
		fmt.Println(err)
		return
	}

	// Another client may have deleted the key between the two operations, so
	// do not index Kvs blindly.
	kvs := opResp.Get().Kvs
	if len(kvs) == 0 {
		fmt.Println("key不存在:", "/cron/jobs/job8")
		return
	}

	fmt.Println("数据Revision:", kvs[0].ModRevision)
	fmt.Println("数据value:", string(kvs[0].Value))
}

View Code

事务txn实现分布式锁

package main

import (

"github.com/coreos/etcd/clientv3"

"time"

"fmt"

"context"

)

// main implements a simple distributed lock with a lease and a transaction:
//
//  1. Lock: grant a lease, keep it alive, and use a transaction to create
//     the lock key only if it does not exist yet.
//  2. Do the work while holding the lock.
//  3. Unlock: stop the keep-alive and revoke the lease, which removes the
//     key bound to it.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Deferred first so it runs last, after the lease has been revoked.
	defer client.Close()

	// 1. Lock.
	lease := clientv3.NewLease(client)

	// Request a lease with a 5-second TTL so the lock expires automatically
	// if this process dies.
	leaseGrantResp, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := leaseGrantResp.ID

	// Context used to stop the automatic renewal.
	ctx, cancelFunc := context.WithCancel(context.TODO())

	// On exit (deferred calls run in reverse order): revoke the lease, which
	// deletes the lock key bound to it, then stop the renewal.
	defer cancelFunc()
	defer func() {
		if _, err := lease.Revoke(context.TODO(), leaseID); err != nil {
			fmt.Println(err)
		}
	}()

	// Renew the lease automatically until ctx is cancelled.
	keepRespChan, err := lease.KeepAlive(ctx, leaseID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Consume renewal responses. The channel is closed when renewal stops
	// (lease lost or ctx cancelled), which ends the range loop. Ranging also
	// guarantees keepResp is never nil when its ID is read.
	go func() {
		for keepResp := range keepRespChan {
			fmt.Println("收到自动续租应答:", keepResp.ID)
		}
		fmt.Println("租约已经失效了")
	}()

	kv := clientv3.NewKV(client)

	// Transaction: if the key has never been created (create revision 0),
	// put it bound to our lease; otherwise read the current holder's value.
	txnResp, err := kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
		Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet("/cron/lock/job9")).
		Commit()
	if err != nil {
		fmt.Println(err)
		return
	}

	// Succeeded is false when the comparison failed, i.e. the key already
	// exists and someone else holds the lock.
	if !txnResp.Succeeded {
		fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2. Do the work while holding the lock.
	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)

	// 3. Unlock: the deferred calls above revoke the lease and stop the
	// renewal, and the key bound to the lease is removed with it.
}

执行结果:

Image

本文内容总结:with用法,读取前缀,Delete,续租:,永不过期的租约,watch,op取代get put delete方法,事务txn实现分布式锁,

原文链接:https://www.cnblogs.com/sunlong88/p/11295424.html

以上是 golang中使用etcd 的全部内容, 来源链接: utcz.com/z/296987.html

回到顶部