golang中使用etcd
本文内容纲要:
- with用法
- 读取前缀
- Delete
- 续租:
- 永不过期的租约
- watch
- op取代get put delete方法
- 事务txn实现分布式锁
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
)
// main creates an etcd v3 client from a fixed configuration and releases it
// again before the program exits. It performs no reads or writes; it only
// demonstrates how a connection is set up.
func main() {
	// Client configuration: one endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client; report the error and stop if that fails.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the client's connections on exit. This also counts as a use of
	// client, so no dummy statement is needed to satisfy the compiler.
	defer client.Close()
}
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main connects to etcd, writes "/cron/jobs/job1" = "bye" and prints the
// revision of the write together with the key, value and version of the
// entry that was overwritten (if there was one).
func main() {
	// Client configuration: one endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	// KV handle used to read and write etcd key-value pairs.
	kv := clientv3.NewKV(client)

	// WithPrevKV asks the server to return the pair that this Put replaces.
	putResp, err := kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV())
	if err != nil {
		fmt.Println(err)
		return
	}

	// Revision of the store after this write.
	fmt.Println("Revision:", putResp.Header.Revision)

	// PrevKv is nil when the key did not exist before this Put.
	if prev := putResp.PrevKv; prev != nil {
		fmt.Println("key:", string(prev.Key))
		fmt.Println("Value:", string(prev.Value))
		// Version is an integer counter. Print it as a number: converting it
		// with string() would yield the rune with that code point rather than
		// its decimal text.
		fmt.Println("Version:", prev.Version)
	}
}
Revision: 10
key: /cron/jobs/job1
Value: hello
Version:
get
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main reads "/cron/jobs/job1" from etcd and prints the key-value records
// contained in the response.
func main() {
	// One endpoint, 5-second dial timeout.
	cfg := clientv3.Config{
		DialTimeout: 5 * time.Second,
		Endpoints:   []string{"192.168.1.188:2379"},
	}

	// Create the client; bail out if the client cannot be created.
	cli, dialErr := clientv3.New(cfg)
	if dialErr != nil {
		fmt.Println(dialErr)
		return
	}

	// Fetch the single key through a KV handle built on the client.
	resp, getErr := clientv3.NewKV(cli).Get(context.TODO(), "/cron/jobs/job1")
	if getErr != nil {
		fmt.Println(getErr)
		return
	}

	fmt.Println(resp.Kvs)
}
[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" ]
with用法
// Fragment: kv, getResp and err are declared outside this snippet.
// KV handle used to read and write etcd key-value pairs: kv = clientv3.NewKV(client)
// NOTE(review): the assignment above sits inside the comment line, so it is not
// executed here; confirm kv is initialised before this fragment runs.
// Query the key with the WithCountOnly option.
getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1",clientv3.WithCountOnly())
if err != nil {
fmt.Println(err)
return
}
// Print both the returned pairs and the match count (the sample run that
// follows shows an empty pair list and a count of 1).
fmt.Println(getResp.Kvs,getResp.Count)
[] 1
读取前缀
// Fragment: kv, getResp and err are declared outside this snippet.
// KV handle used to read and write etcd key-value pairs: kv = clientv3.NewKV(client)
// NOTE(review): the assignment above sits inside the comment line, so it is not
// executed here; confirm kv is initialised before this fragment runs.
// Read by prefix: "/cron/jobs/" is passed together with WithPrefix.
getResp,err = kv.Get(context.TODO(),"/cron/jobs/",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
// Print every returned pair (the sample run that follows lists job1 and job2).
fmt.Println(getResp.Kvs)
[key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye"
key:"/cron/jobs/job2" create_revision:12 mod_revision:12 version:1 value:"byehhhhhh" ]
Delete
// Fragment: kv, delResp, err, idx and kvpair are declared outside this snippet.
// KV handle used to read and write etcd key-value pairs: kv = clientv3.NewKV(client)
// NOTE(review): the assignment above sits inside the comment line, so it is not
// executed here; confirm kv is initialised before this fragment runs.
// Delete one key; WithPrevKV is passed so the response carries PrevKvs.
delResp,err = kv.Delete(context.TODO(),"/cron/jobs/job2",clientv3.WithPrevKV())
if err != nil{
fmt.Println(err)
return
}else{
// Only report when the response actually contains previous pairs.
if len(delResp.PrevKvs) > 0 {
for idx,kvpair = range delResp.PrevKvs{
// Self-assignment with no effect; presumably present only so that idx
// counts as used for the compiler — TODO confirm against the declaration.
idx = idx
fmt.Println("删除了",string(kvpair.Key),string(kvpair.Value))
}
}
}
删除了 /cron/jobs/job2 byehhhhhh
删除多个key
// Fragment: delete with the WithPrefix option, using "/cron/jobs" as the prefix.
// kv, delResp and err are declared outside this snippet.
delResp,err = kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
续租:
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main stores a key under a 10-second lease and then polls the key every two
// seconds until a read no longer returns it.
func main() {
	// Key that is bound to the lease and polled below.
	const lockKey = "cron/lock/job1"

	// Create the client: one endpoint, 5-second dial timeout.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	// Request a lease with a TTL of 10 seconds.
	grant, err := clientv3.NewLease(cli).Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Write the key with the lease attached so that it expires with the lease.
	store := clientv3.NewKV(cli)
	written, err := store.Put(context.TODO(), lockKey, "v5", clientv3.WithLease(grant.ID))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入成功", written.Header.Revision)

	// Poll until the key is gone; a read error aborts the program.
	for {
		current, err := store.Get(context.TODO(), lockKey)
		if err != nil {
			fmt.Println(err)
			return
		}
		if current.Count == 0 {
			break
		}
		fmt.Println("还没过期:", current.Kvs)
		time.Sleep(2 * time.Second)
	}
	fmt.Println("kv过期了")
}
写入成功 24
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
kv过期了
永不过期的租约
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main stores a key under a 10-second lease that is renewed automatically,
// so the key stays alive for as long as renewal keeps working. A background
// goroutine reports every renewal response; the main goroutine polls the key
// every two seconds and stops once it is gone or a read fails.
func main() {
	// Client configuration: one endpoint and a 5-second dial timeout.
	config := clientv3.Config{
		Endpoints:   []string{"192.168.1.188:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	// Request a lease with a TTL of 10 seconds.
	lease := clientv3.NewLease(client)
	leaseGrantResp, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := leaseGrantResp.ID

	// KV handle used for the Put and the polling Gets below.
	kv := clientv3.NewKV(client)

	// Start automatic renewal of the lease.
	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Consume renewal responses. Ranging over the channel ends when the
	// channel is closed, which is how the end of renewal is signalled. Testing
	// the channel variable against nil can never detect that, and a plain
	// receive on a closed channel returns immediately forever.
	go func() {
		for range keepRespChan {
			fmt.Println("收到自动续租的应答")
		}
		fmt.Println("租约已失效了")
	}()

	// Write the key with the lease attached so that it expires with the lease.
	putResp, err := kv.Put(context.TODO(), "cron/lock/job1", "v5", clientv3.WithLease(leaseID))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入成功", putResp.Header.Revision)

	// Periodically check whether the key has expired.
	for {
		getResp, err := kv.Get(context.TODO(), "cron/lock/job1")
		if err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			fmt.Println("kv过期了")
			break
		}
		fmt.Println("还没过期:", getResp.Kvs)
		time.Sleep(2 * time.Second)
	}
}
View Code
写入成功 38
收到自动续租的应答
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自动续租的应答
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自动续租的应答
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
收到自动续租的应答
watch
监听 kv 的变化,常用于集群中的配置下发和状态同步,非常有价值。
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// main demonstrates watching a key. A background goroutine keeps putting and
// deleting "/cron/jobs/job7" once per second; the main goroutine reads the
// current value, then watches the key starting at the revision after that
// read and prints each PUT and DELETE event. The watch is cancelled after
// five seconds, which ends the event loop and the program.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// Simulate changes to the key. Failures are reported instead of being
	// dropped so that a silent watch can be told apart from a failing writer.
	go func() {
		for {
			if _, err := kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7"); err != nil {
				fmt.Println(err)
			}
			if _, err := kv.Delete(context.TODO(), "/cron/jobs/job7"); err != nil {
				fmt.Println(err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	// Read the current value first, then watch for subsequent changes.
	getResp, err := kv.Get(context.TODO(), "/cron/jobs/job7")
	if err != nil {
		fmt.Println(err)
		return
	}

	// The key may or may not exist at this moment; print it only if it does.
	if len(getResp.Kvs) != 0 {
		fmt.Println("当前值:", string(getResp.Kvs[0].Value))
	}

	// Start watching from the revision right after the one the Get observed.
	watchStartRevision := getResp.Header.Revision + 1

	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()

	fmt.Println("从该版本向后监听:", watchStartRevision)

	// Cancel the watch after five seconds; the deferred call releases the
	// context on the early-return path as well.
	ctx, cancelFunc := context.WithCancel(context.TODO())
	defer cancelFunc()
	time.AfterFunc(5*time.Second, cancelFunc)

	watchRespChan := watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

	// Handle change events until the watch channel is closed.
	for watchResp := range watchRespChan {
		// A response can carry an error instead of events; stop on it.
		if err := watchResp.Err(); err != nil {
			fmt.Println(err)
			return
		}
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
			}
		}
	}
}
当前值: i am job7
从该版本向后监听: 72
删除了 key:"/cron/jobs/job7" mod_revision:72
修改为: key:"/cron/jobs/job7" create_revision:73 mod_revision:73 version:1 value:"i am job7"
删除了 key:"/cron/jobs/job7" mod_revision:74
修改为: key:"/cron/jobs/job7" create_revision:75 mod_revision:75 version:1 value:"i am job7"
删除了 key:"/cron/jobs/job7" mod_revision:76
修改为: key:"/cron/jobs/job7" create_revision:77 mod_revision:77 version:1 value:"i am job7"
删除了 key:"/cron/jobs/job7" mod_revision:78
修改为: key:"/cron/jobs/job7" create_revision:79 mod_revision:79 version:1 value:"i am job7"
删除了 key:"/cron/jobs/job7" mod_revision:80
修改为: key:"/cron/jobs/job7" create_revision:81 mod_revision:81 version:1 value:"i am job7"
删除了 key:"/cron/jobs/job7" mod_revision:82
op取代get put delete方法
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main shows the Op-based API: it builds a put operation and a get operation
// for "/cron/jobs/job8", runs each through kv.Do, and prints the revision of
// the write followed by the mod revision and value that were read back.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	kv := clientv3.NewKV(client)

	// Build the put operation and execute it. kv.Do(op) is the generic
	// entry point that is used here in place of kv.Put / kv.Get / kv.Delete.
	putOp := clientv3.OpPut("/cron/jobs/job8", "123123123")
	opResp, err := kv.Do(context.TODO(), putOp)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入Revision:", opResp.Put().Header.Revision)

	// Build the get operation and execute it.
	getOp := clientv3.OpGet("/cron/jobs/job8")
	if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
		fmt.Println(err)
		return
	}

	// The key can be removed by another client between the two operations;
	// guard the index so that an empty result does not panic.
	kvs := opResp.Get().Kvs
	if len(kvs) == 0 {
		fmt.Println("key不存在:", "/cron/jobs/job8")
		return
	}

	fmt.Println("数据Revision:", kvs[0].ModRevision)
	fmt.Println("数据value:", string(kvs[0].Value))
}
View Code
事务txn实现分布式锁
package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
// main implements a simple distributed lock on top of a lease and a
// transaction:
//
//  1. acquire: grant a 5-second lease, keep it renewed, and use a transaction
//     to create "/cron/lock/job9" under that lease only if the key does not
//     exist yet;
//  2. work: simulate a task by sleeping for five seconds;
//  3. release: on return the deferred calls revoke the lease and stop the
//     renewal, then close the client.
func main() {
	// Client configuration.
	config := clientv3.Config{
		Endpoints:   []string{"36.111.184.221:2379"},
		DialTimeout: 5 * time.Second,
	}

	// Create the client.
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()

	// 1. Acquire the lock.
	lease := clientv3.NewLease(client)

	// Request a lease with a TTL of 5 seconds.
	leaseGrantResp, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := leaseGrantResp.ID

	// Context used to stop the automatic renewal.
	ctx, cancelFunc := context.WithCancel(context.TODO())

	// On return: revoke the lease first (deferred calls run in reverse
	// order), then stop the renewal.
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseID)

	// Renew the lease automatically until ctx is cancelled.
	keepRespChan, err := lease.KeepAlive(ctx, leaseID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Consume renewal responses. Ranging over the channel stops once it is
	// closed, so a nil response is never dereferenced.
	go func() {
		for keepResp := range keepRespChan {
			fmt.Println("收到自动续租应答:", keepResp.ID)
		}
		fmt.Println("租约已经失效了")
	}()

	// If the key does not exist (create revision 0), put it under the lease;
	// otherwise read the current holder's value.
	kv := clientv3.NewKV(client)
	txnResp, err := kv.Txn(context.TODO()).
		If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
		Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet("/cron/lock/job9")).
		Commit()
	if err != nil {
		fmt.Println(err)
		return
	}

	// Succeeded is false when the comparison failed, i.e. the Else branch ran
	// and the key is already held by someone else.
	if !txnResp.Succeeded {
		fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2. Do the work while holding the lock.
	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)

	// 3. Release: the deferred Revoke drops the lease, which is expected to
	// remove the key bound to it.
}
执行结果:
本文内容总结:with用法,读取前缀,Delete,续租:,永不过期的租约,watch,op取代get put delete方法,事务txn实现分布式锁,
原文链接:https://www.cnblogs.com/sunlong88/p/11295424.html
以上是 golang中使用etcd 的全部内容, 来源链接: utcz.com/z/296987.html