zk中实现分布式锁服务

编程

什么场景下需要实现分布式锁" title="分布式锁">分布式锁?

Q: 例如多台客户端修改zk中配置文件,如何保证数据的一致性

解决方案

zk中分布式锁实现步骤 创建锁,获取锁,删除锁

具体实现

开始在zk中create 一个persistent类型znode,例如名字叫 /locks/write_lock

其他客户端需要在修改文件之前,执行如下步骤,看是否能够获取修改文件的锁,如果获取成功,可以修改文件内容

1客户端创建一个(临时顺序节点)sequence|ephemeral类型znode,名字是lockid开头,创建的znode是

/Locks/write_lock/lockid0000000001

2调用getChildren()不要设置watcher获取/locks/write_lock下的znode列表(这里不设置锁,是为了避免羊群效应,接收到一些无关通知,给server带来很大压力)

3判断创建的节点是不是排序后子节点列表中最小的那个节点,如果是,表示已经获取到了锁,业务完成之后,删除这个节点,如果不是,获取锁失败

4调用exists()方法判断比自己创建节点序列号小1的znode节点是否存在(也就是获取znode节点列表中最小的znode),并且监听该节点状态变化,如果exists()返回false,表示锁已经释放,从2步骤开始继续判断

5如果4中exists()返回true,等待zk通知,接收到通知后继续从2开始执行检查

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

import java.io.IOException;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

/**

* @date 2019/11/8 15:14

* description:

*/

public class ZookeeperLock implements Watcher {

public static final String XIE = "/";

private ZooKeeper zk = null;

private final String hosts = "127.0.0.1:2181";

private static final int SESSION_TIMEOUT = 5000;

private String root = "/locks";

private String waitNode;

private int waitTime = 100;

private String myzkNode;

private CountDownLatch latch;

public ZookeeperLock() throws IOException, KeeperException, InterruptedException {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

Stat stat = zk.exists(root, false);

if (stat == null) {

//创建一个永久根节点

zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

}

public boolean tryLock() {

String splitStr = "lock_";

//获取下面所有子节点

List<String> children = null;

try {

myzkNode = zk.create(root + XIE + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

children = zk.getChildren(root, false);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

//对子节点排序

Collections.sort(children);

//只存在一个要获取锁的节点,没有竞争

if (myzkNode.equals(root + XIE + children.get(0))) {

return true;

}

//监控比自己小的节点

String childNode = myzkNode.substring(myzkNode.lastIndexOf(XIE) + 1);

waitNode = children.get(Collections.binarySearch(children, childNode) - 1);

return false;

}

private boolean waitLock(String waitNode, int waitTime) throws KeeperException, InterruptedException {

Stat stat = zk.exists(root + XIE + waitNode, true);

if (stat != null) {

this.latch = new CountDownLatch(1);

latch.await(waitTime, TimeUnit.MILLISECONDS);

this.latch = null;

}

return true;

}

/**

* 获取锁

*/

public void lock() {

if (tryLock()) {

System.out.println("Thread" + Thread.currentThread().getName() + "***************************************hold lock!");

return;

} else {

try {

waitLock(waitNode, waitTime);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* 释放锁

*/

public void unlock() {

try {

zk.delete(myzkNode, -1);

zk.close();

System.out.println("Thread" + Thread.currentThread().getName() + "*********************************unlock success!" + myzkNode);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

@Override

public void process(WatchedEvent event) {

if (this.latch != null) {

this.latch.countDown();

}

}

public static void main(String[] args) {

ExecutorService service = Executors.newCachedThreadPool();

//模拟并发数为10

final Semaphore semaphore = new Semaphore(10);

for (int i = 0; i < 100; i++) {

Runnable runnable = () -> {

try {

semaphore.acquire();

ZookeeperLock zkLock = new ZookeeperLock();

zkLock.lock();

//do something

Thread.sleep(1000);

zkLock.unlock();

semaphore.release();

} catch (Exception e) {

e.printStackTrace();

}

};

service.execute(runnable);

}

service.shutdown();

}

}

 

 

 

以上是 zk中实现分布式锁服务 的全部内容, 来源链接: utcz.com/z/510541.html

回到顶部